Libunit: closing active requests on quit.
This commit is contained in:
@@ -307,6 +307,8 @@ Unit::close_handler(nxt_unit_request_info_t *req)
|
|||||||
} catch (exception &e) {
|
} catch (exception &e) {
|
||||||
nxt_unit_req_warn(req, "close_handler: %s", e.str);
|
nxt_unit_req_warn(req, "close_handler: %s", e.str);
|
||||||
|
|
||||||
|
nxt_unit_request_done(req, NXT_UNIT_ERROR);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1643,8 +1643,6 @@ nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (recv_msg->last) {
|
if (recv_msg->last) {
|
||||||
req_impl->websocket = 0;
|
|
||||||
|
|
||||||
if (cb->close_handler) {
|
if (cb->close_handler) {
|
||||||
nxt_unit_req_debug(req, "close_handler");
|
nxt_unit_req_debug(req, "close_handler");
|
||||||
|
|
||||||
@@ -1737,8 +1735,6 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
|
|||||||
nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
|
nxt_unit_request_hash_find(req->ctx, req_impl->stream, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
req_impl->websocket = 0;
|
|
||||||
|
|
||||||
while (req_impl->outgoing_buf != NULL) {
|
while (req_impl->outgoing_buf != NULL) {
|
||||||
nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
|
nxt_unit_mmap_buf_free(req_impl->outgoing_buf);
|
||||||
}
|
}
|
||||||
@@ -5708,9 +5704,12 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
|
|||||||
static void
|
static void
|
||||||
nxt_unit_quit(nxt_unit_ctx_t *ctx)
|
nxt_unit_quit(nxt_unit_ctx_t *ctx)
|
||||||
{
|
{
|
||||||
nxt_port_msg_t msg;
|
nxt_port_msg_t msg;
|
||||||
nxt_unit_impl_t *lib;
|
nxt_unit_impl_t *lib;
|
||||||
nxt_unit_ctx_impl_t *ctx_impl;
|
nxt_unit_ctx_impl_t *ctx_impl;
|
||||||
|
nxt_unit_callbacks_t *cb;
|
||||||
|
nxt_unit_request_info_t *req;
|
||||||
|
nxt_unit_request_info_impl_t *req_impl;
|
||||||
|
|
||||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||||
@@ -5721,10 +5720,30 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
|
|||||||
|
|
||||||
ctx_impl->online = 0;
|
ctx_impl->online = 0;
|
||||||
|
|
||||||
if (lib->callbacks.quit != NULL) {
|
cb = &lib->callbacks;
|
||||||
lib->callbacks.quit(ctx);
|
|
||||||
|
if (cb->quit != NULL) {
|
||||||
|
cb->quit(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_queue_each(req_impl, &ctx_impl->active_req,
|
||||||
|
nxt_unit_request_info_impl_t, link)
|
||||||
|
{
|
||||||
|
req = &req_impl->req;
|
||||||
|
|
||||||
|
nxt_unit_req_warn(req, "active request on ctx quit");
|
||||||
|
|
||||||
|
if (cb->close_handler) {
|
||||||
|
nxt_unit_req_debug(req, "close_handler");
|
||||||
|
|
||||||
|
cb->close_handler(req);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
nxt_unit_request_done(req, NXT_UNIT_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
} nxt_queue_loop;
|
||||||
|
|
||||||
if (ctx != &lib->main_ctx.ctx) {
|
if (ctx != &lib->main_ctx.ctx) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ static int nxt_python_asgi_run(nxt_unit_ctx_t *ctx);
|
|||||||
static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
|
static void nxt_py_asgi_remove_reader(nxt_unit_ctx_t *ctx,
|
||||||
nxt_unit_port_t *port);
|
nxt_unit_port_t *port);
|
||||||
static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
|
static void nxt_py_asgi_request_handler(nxt_unit_request_info_t *req);
|
||||||
|
static void nxt_py_asgi_close_handler(nxt_unit_request_info_t *req);
|
||||||
|
|
||||||
static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
|
static PyObject *nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req);
|
||||||
static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
|
static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
|
||||||
@@ -179,7 +180,7 @@ nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
|
|||||||
init->callbacks.request_handler = nxt_py_asgi_request_handler;
|
init->callbacks.request_handler = nxt_py_asgi_request_handler;
|
||||||
init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
|
init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
|
||||||
init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
|
init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
|
||||||
init->callbacks.close_handler = nxt_py_asgi_websocket_close_handler;
|
init->callbacks.close_handler = nxt_py_asgi_close_handler;
|
||||||
init->callbacks.quit = nxt_py_asgi_quit;
|
init->callbacks.quit = nxt_py_asgi_quit;
|
||||||
init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
|
init->callbacks.shm_ack_handler = nxt_py_asgi_shm_ack_handler;
|
||||||
init->callbacks.add_port = nxt_py_asgi_add_port;
|
init->callbacks.add_port = nxt_py_asgi_add_port;
|
||||||
@@ -551,6 +552,18 @@ release_asgi:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_py_asgi_close_handler(nxt_unit_request_info_t *req)
|
||||||
|
{
|
||||||
|
if (req->request->websocket_handshake) {
|
||||||
|
nxt_py_asgi_websocket_close_handler(req);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
nxt_py_asgi_http_close_handler(req);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static PyObject *
|
static PyObject *
|
||||||
nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
|
nxt_py_asgi_create_http_scope(nxt_unit_request_info_t *req)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -58,6 +58,7 @@ int nxt_py_asgi_http_init(void);
|
|||||||
PyObject *nxt_py_asgi_http_create(nxt_unit_request_info_t *req);
|
PyObject *nxt_py_asgi_http_create(nxt_unit_request_info_t *req);
|
||||||
void nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req);
|
void nxt_py_asgi_http_data_handler(nxt_unit_request_info_t *req);
|
||||||
int nxt_py_asgi_http_drain(nxt_queue_link_t *lnk);
|
int nxt_py_asgi_http_drain(nxt_queue_link_t *lnk);
|
||||||
|
void nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req);
|
||||||
|
|
||||||
int nxt_py_asgi_websocket_init(void);
|
int nxt_py_asgi_websocket_init(void);
|
||||||
PyObject *nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req);
|
PyObject *nxt_py_asgi_websocket_create(nxt_unit_request_info_t *req);
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ typedef struct {
|
|||||||
uint64_t content_length;
|
uint64_t content_length;
|
||||||
uint64_t bytes_sent;
|
uint64_t bytes_sent;
|
||||||
int complete;
|
int complete;
|
||||||
|
int closed;
|
||||||
PyObject *send_body;
|
PyObject *send_body;
|
||||||
Py_ssize_t send_body_off;
|
Py_ssize_t send_body_off;
|
||||||
} nxt_py_asgi_http_t;
|
} nxt_py_asgi_http_t;
|
||||||
@@ -94,6 +95,7 @@ nxt_py_asgi_http_create(nxt_unit_request_info_t *req)
|
|||||||
http->content_length = -1;
|
http->content_length = -1;
|
||||||
http->bytes_sent = 0;
|
http->bytes_sent = 0;
|
||||||
http->complete = 0;
|
http->complete = 0;
|
||||||
|
http->closed = 0;
|
||||||
http->send_body = NULL;
|
http->send_body = NULL;
|
||||||
http->send_body_off = 0;
|
http->send_body_off = 0;
|
||||||
}
|
}
|
||||||
@@ -115,7 +117,13 @@ nxt_py_asgi_http_receive(PyObject *self, PyObject *none)
|
|||||||
|
|
||||||
nxt_unit_req_debug(req, "asgi_http_receive");
|
nxt_unit_req_debug(req, "asgi_http_receive");
|
||||||
|
|
||||||
msg = nxt_py_asgi_http_read_msg(http);
|
if (nxt_slow_path(http->closed || nxt_unit_response_is_sent(req))) {
|
||||||
|
msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
msg = nxt_py_asgi_http_read_msg(http);
|
||||||
|
}
|
||||||
|
|
||||||
if (nxt_slow_path(msg == NULL)) {
|
if (nxt_slow_path(msg == NULL)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -561,6 +569,48 @@ fail:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void
|
||||||
|
nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
|
||||||
|
{
|
||||||
|
PyObject *msg, *future, *res;
|
||||||
|
nxt_py_asgi_http_t *http;
|
||||||
|
|
||||||
|
http = req->data;
|
||||||
|
|
||||||
|
nxt_unit_req_debug(req, "asgi_http_close_handler");
|
||||||
|
|
||||||
|
http->closed = 1;
|
||||||
|
|
||||||
|
if (http->receive_future == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
msg = nxt_py_asgi_new_msg(req, nxt_py_http_disconnect_str);
|
||||||
|
if (nxt_slow_path(msg == NULL)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (msg == Py_None) {
|
||||||
|
Py_DECREF(msg);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
future = http->receive_future;
|
||||||
|
http->receive_future = NULL;
|
||||||
|
|
||||||
|
res = PyObject_CallMethodObjArgs(future, nxt_py_set_result_str, msg, NULL);
|
||||||
|
if (nxt_slow_path(res == NULL)) {
|
||||||
|
nxt_unit_req_alert(req, "'set_result' call failed");
|
||||||
|
nxt_python_print_exception();
|
||||||
|
}
|
||||||
|
|
||||||
|
Py_XDECREF(res);
|
||||||
|
Py_DECREF(future);
|
||||||
|
|
||||||
|
Py_DECREF(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static PyObject *
|
static PyObject *
|
||||||
nxt_py_asgi_http_done(PyObject *self, PyObject *future)
|
nxt_py_asgi_http_done(PyObject *self, PyObject *future)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user