Stopping timed out application process.

This commit is contained in:
Max Romanov
2018-04-05 17:19:25 +03:00
parent c0a423aa74
commit d748f74f2b
2 changed files with 89 additions and 9 deletions

View File

@@ -133,6 +133,7 @@ struct nxt_app_parse_ctx_s {
nxt_app_request_t r;
nxt_http_request_t *request;
nxt_timer_t timer;
void *timer_data;
nxt_http_request_parse_t parser;
nxt_http_request_parse_t resp_parser;
nxt_mp_t *mem_pool;

View File

@@ -543,6 +543,7 @@ nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
if (rc->app->timeout != 0) {
rc->ap->timer.handler = nxt_router_app_timeout;
rc->ap->timer_data = rc;
nxt_timer_add(task->thread->engine, &rc->ap->timer,
rc->app->timeout);
}
@@ -707,6 +708,8 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
}
if (rc->ap != NULL) {
rc->ap->timer_data = NULL;
nxt_app_http_req_done(task, rc->ap);
rc->ap = NULL;
@@ -2664,7 +2667,6 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
if (ar->request->error) {
nxt_app_http_req_done(task, ar);
nxt_router_rc_unlink(task, rc);
return;
}
@@ -2677,8 +2679,9 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_rc_unlink(task, rc);
} else {
if (rc->app->timeout != 0) {
if (rc->app != NULL && rc->app->timeout != 0) {
ar->timer.handler = nxt_router_app_timeout;
ar->timer_data = rc;
nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout);
}
}
@@ -2732,10 +2735,9 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
fail:
nxt_app_http_req_done(task, ar);
nxt_router_rc_unlink(task, rc);
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
nxt_router_rc_unlink(task, rc);
}
@@ -4262,16 +4264,93 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
{
nxt_timer_t *timer;
nxt_app_parse_ctx_t *ar;
nxt_app_t *app;
nxt_bool_t cancelled, unlinked;
nxt_port_t *port;
nxt_timer_t *timer;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *pending_ra;
nxt_app_parse_ctx_t *ar;
nxt_req_conn_link_t *rc;
nxt_port_select_state_t state;
timer = obj;
nxt_debug(task, "router app timeout");
ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
rc = ar->timer_data;
app = rc->app;
if (!ar->request->header_sent) {
nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE);
if (app == NULL) {
goto generate_error;
}
port = NULL;
pending_ra = NULL;
if (rc->app_port != NULL) {
port = rc->app_port;
rc->app_port = NULL;
}
if (port == NULL && rc->ra != NULL && rc->ra->app_port != NULL) {
port = rc->ra->app_port;
rc->ra->app_port = NULL;
}
if (port == NULL) {
goto generate_error;
}
nxt_thread_mutex_lock(&app->mutex);
unlinked = nxt_queue_chk_remove(&port->app_link);
if (!nxt_queue_is_empty(&port->pending_requests)) {
lnk = nxt_queue_first(&port->pending_requests);
pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
link_port_pending);
nxt_assert(pending_ra->link_app_pending.next != NULL);
nxt_debug(task, "app '%V' pending request #%uD found",
&app->name, pending_ra->stream);
cancelled = nxt_router_msg_cancel(task, &pending_ra->msg_info,
pending_ra->stream);
if (cancelled) {
nxt_router_ra_inc_use(pending_ra);
state.ra = pending_ra;
state.app = app;
nxt_router_port_select(task, &state);
} else {
pending_ra = NULL;
}
}
nxt_thread_mutex_unlock(&app->mutex);
if (pending_ra != NULL
&& nxt_router_port_post_select(task, &state) == NXT_OK)
{
nxt_router_app_prepare_request(task, pending_ra);
}
nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
nxt_port_use(task, port, unlinked ? -2 : -1);
generate_error:
nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE);
nxt_router_rc_unlink(task, rc);
}