Python: request processing in multiple threads.
This closes #459 issue on GitHub.
This commit is contained in:
@@ -67,15 +67,16 @@ static PyTypeObject nxt_py_asgi_http_type = {
|
||||
static Py_ssize_t nxt_py_asgi_http_body_buf_size = 32 * 1024 * 1024;
|
||||
|
||||
|
||||
nxt_int_t
|
||||
nxt_py_asgi_http_init(nxt_task_t *task)
|
||||
int
|
||||
nxt_py_asgi_http_init(void)
|
||||
{
|
||||
if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_http_type) != 0)) {
|
||||
nxt_alert(task, "Python failed to initialize the 'http' type object");
|
||||
return NXT_ERROR;
|
||||
nxt_unit_alert(NULL,
|
||||
"Python failed to initialize the 'http' type object");
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
|
||||
return NXT_OK;
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
|
||||
@@ -106,6 +107,7 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
|
||||
{
|
||||
PyObject *msg, *future;
|
||||
nxt_py_asgi_http_t *http;
|
||||
nxt_py_asgi_ctx_data_t *ctx_data;
|
||||
nxt_unit_request_info_t *req;
|
||||
|
||||
http = (nxt_py_asgi_http_t *) self;
|
||||
@@ -118,7 +120,9 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
|
||||
ctx_data = req->ctx->data;
|
||||
|
||||
future = PyObject_CallObject(ctx_data->loop_create_future, NULL);
|
||||
if (nxt_slow_path(future == NULL)) {
|
||||
nxt_unit_req_alert(req, "Python failed to create Future object");
|
||||
nxt_python_print_exception();
|
||||
@@ -130,7 +134,7 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
|
||||
}
|
||||
|
||||
if (msg != Py_None) {
|
||||
return nxt_py_asgi_set_result_soon(req, future, msg);
|
||||
return nxt_py_asgi_set_result_soon(req, ctx_data, future, msg);
|
||||
}
|
||||
|
||||
http->receive_future = future;
|
||||
@@ -329,11 +333,12 @@ nxt_py_asgi_http_response_start(nxt_py_asgi_http_t *http, PyObject *dict)
|
||||
static PyObject *
|
||||
nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
|
||||
{
|
||||
int rc;
|
||||
char *body_str;
|
||||
ssize_t sent;
|
||||
PyObject *body, *more_body, *future;
|
||||
Py_ssize_t body_len, body_off;
|
||||
int rc;
|
||||
char *body_str;
|
||||
ssize_t sent;
|
||||
PyObject *body, *more_body, *future;
|
||||
Py_ssize_t body_len, body_off;
|
||||
nxt_py_asgi_ctx_data_t *ctx_data;
|
||||
|
||||
body = PyDict_GetItem(dict, nxt_py_body_str);
|
||||
if (nxt_slow_path(body != NULL && !PyBytes_Check(body))) {
|
||||
@@ -371,6 +376,8 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
|
||||
|
||||
body_off = 0;
|
||||
|
||||
ctx_data = http->req->ctx->data;
|
||||
|
||||
while (body_len > 0) {
|
||||
sent = nxt_unit_response_write_nb(http->req, body_str, body_len, 0);
|
||||
if (nxt_slow_path(sent < 0)) {
|
||||
@@ -382,7 +389,8 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
|
||||
"out of shared memory, %d",
|
||||
(int) body_len);
|
||||
|
||||
future = PyObject_CallObject(nxt_py_loop_create_future, NULL);
|
||||
future = PyObject_CallObject(ctx_data->loop_create_future,
|
||||
NULL);
|
||||
if (nxt_slow_path(future == NULL)) {
|
||||
nxt_unit_req_alert(http->req,
|
||||
"Python failed to create Future object");
|
||||
@@ -396,7 +404,7 @@ nxt_py_asgi_http_response_body(nxt_py_asgi_http_t *http, PyObject *dict)
|
||||
Py_INCREF(http->send_body);
|
||||
http->send_body_off = body_off;
|
||||
|
||||
nxt_queue_insert_tail(&nxt_py_asgi_drain_queue, &http->link);
|
||||
nxt_py_asgi_drain_wait(http->req, &http->link);
|
||||
|
||||
http->send_future = future;
|
||||
Py_INCREF(http->send_future);
|
||||
|
||||
Reference in New Issue
Block a user