Refactoring reference counting of req_app_link.

The reason for the change is that the req_app_link reference count
was incorrect if the application crashed at start; in this case,
the nxt_request_app_link_update_peer() function was never called.

This closes #332 issue on GitHub.
This commit is contained in:
Max Romanov
2019-11-26 17:14:53 +03:00
parent 19b974674c
commit 4eecf1cb6a

View File

@@ -95,16 +95,16 @@ nxt_request_app_link_inc_use(nxt_request_app_link_t *req_app_link)
} }
nxt_inline void nxt_inline void
nxt_request_app_link_dec_use(nxt_request_app_link_t *req_app_link) nxt_request_app_link_chk_use(nxt_request_app_link_t *req_app_link, int i)
{ {
#if (NXT_DEBUG) #if (NXT_DEBUG)
int c; int c;
c = nxt_atomic_fetch_add(&req_app_link->use_count, -1); c = nxt_atomic_fetch_add(&req_app_link->use_count, i);
nxt_assert(c > 1); nxt_assert((c + i) > 0);
#else #else
(void) nxt_atomic_fetch_add(&req_app_link->use_count, -1); (void) nxt_atomic_fetch_add(&req_app_link->use_count, i);
#endif #endif
} }
@@ -600,8 +600,6 @@ nxt_request_app_link_update_peer(nxt_task_t *task,
nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data,
req_app_link->app_port->pid); req_app_link->app_port->pid);
} }
nxt_request_app_link_use(task, req_app_link, -1);
} }
@@ -3732,8 +3730,6 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info, cancelled = nxt_router_msg_cancel(task, &req_app_link->msg_info,
req_app_link->stream); req_app_link->stream);
if (cancelled) { if (cancelled) {
nxt_request_app_link_inc_use(req_app_link);
res = nxt_router_app_port(task, req_rpc_data->app, req_app_link); res = nxt_router_app_port(task, req_rpc_data->app, req_app_link);
if (res == NXT_OK) { if (res == NXT_OK) {
@@ -3751,6 +3747,8 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_app_prepare_request(task, req_app_link); nxt_router_app_prepare_request(task, req_app_link);
} }
nxt_request_app_link_use(task, req_app_link, -1);
msg->port_msg.last = 0; msg->port_msg.last = 0;
return; return;
@@ -4015,6 +4013,8 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
#endif #endif
nxt_router_app_prepare_request(task, req_app_link); nxt_router_app_prepare_request(task, req_app_link);
nxt_request_app_link_use(task, req_app_link, -1);
} }
@@ -4148,8 +4148,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
re_ra->stream); re_ra->stream);
if (cancelled) { if (cancelled) {
nxt_request_app_link_inc_use(re_ra);
state.req_app_link = re_ra; state.req_app_link = re_ra;
state.app = app; state.app = app;
@@ -4217,19 +4215,38 @@ re_ra_cancelled:
if (re_ra != NULL) { if (re_ra != NULL) {
if (nxt_router_port_post_select(task, &state) == NXT_OK) { if (nxt_router_port_post_select(task, &state) == NXT_OK) {
/*
* There should be call nxt_request_app_link_inc_use(re_ra),
* but we need to decrement use then. So, let's skip both.
*/
nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request, nxt_router_app_process_request,
&task->thread->engine->task, app, re_ra); &task->thread->engine->task, app, re_ra);
} else {
/*
* This call should be unconditional, but we want to spare
* couple of CPU ticks to postpone the head death of the universe.
*/
nxt_request_app_link_use(task, re_ra, -1);
} }
} }
if (req_app_link != NULL) { if (req_app_link != NULL) {
nxt_request_app_link_use(task, req_app_link, -1); /*
* Here we do the same trick as described above,
* but without conditions.
* Skip required nxt_request_app_link_inc_use(req_app_link).
*/
nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request, nxt_router_app_process_request,
&task->thread->engine->task, app, req_app_link); &task->thread->engine->task, app, req_app_link);
/* ... skip nxt_request_app_link_use(task, req_app_link, -1) too. */
goto adjust_use; goto adjust_use;
} }
@@ -4477,6 +4494,7 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
static void static void
nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state) nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{ {
int ra_use_delta;
nxt_app_t *app; nxt_app_t *app;
nxt_bool_t can_start_process; nxt_bool_t can_start_process;
nxt_request_app_link_t *req_app_link; nxt_request_app_link_t *req_app_link;
@@ -4485,11 +4503,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
app = state->app; app = state->app;
state->failed_port_use_delta = 0; state->failed_port_use_delta = 0;
ra_use_delta = -nxt_queue_chk_remove(&req_app_link->link_app_requests);
if (nxt_queue_chk_remove(&req_app_link->link_app_requests))
{
nxt_request_app_link_dec_use(req_app_link);
}
if (nxt_queue_chk_remove(&req_app_link->link_port_pending)) if (nxt_queue_chk_remove(&req_app_link->link_port_pending))
{ {
@@ -4498,7 +4512,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_queue_remove(&req_app_link->link_app_pending); nxt_queue_remove(&req_app_link->link_app_pending);
req_app_link->link_app_pending.next = NULL; req_app_link->link_app_pending.next = NULL;
nxt_request_app_link_dec_use(req_app_link); ra_use_delta--;
} }
state->failed_port = req_app_link->app_port; state->failed_port = req_app_link->app_port;
@@ -4538,7 +4552,7 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
&req_app_link->link_app_requests); &req_app_link->link_app_requests);
} }
nxt_request_app_link_inc_use(req_app_link); ra_use_delta++;
nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests", nxt_debug(task, "req_app_link stream #%uD enqueue to app->requests",
req_app_link->stream); req_app_link->stream);
@@ -4569,6 +4583,8 @@ nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
} }
} }
nxt_request_app_link_chk_use(req_app_link, ra_use_delta);
fail: fail:
state->shared_ra = req_app_link; state->shared_ra = req_app_link;
@@ -4596,7 +4612,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
nxt_request_app_link_error(state->req_app_link, 500, nxt_request_app_link_error(state->req_app_link, 500,
"Failed to allocate shared req<->app link"); "Failed to allocate shared req<->app link");
nxt_request_app_link_use(task, state->req_app_link, -1);
return NXT_ERROR; return NXT_ERROR;
} }
@@ -4625,7 +4640,6 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
if (nxt_slow_path(res != NXT_OK)) { if (nxt_slow_path(res != NXT_OK)) {
nxt_request_app_link_error(req_app_link, 500, nxt_request_app_link_error(req_app_link, 500,
"Failed to start app process"); "Failed to start app process");
nxt_request_app_link_use(task, req_app_link, -1);
return NXT_ERROR; return NXT_ERROR;
} }
@@ -4686,12 +4700,9 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_request_app_link_init(task, req_app_link, req_rpc_data); nxt_request_app_link_init(task, req_app_link, req_rpc_data);
res = nxt_router_app_port(task, app, req_app_link); res = nxt_router_app_port(task, app, req_app_link);
if (res != NXT_OK) {
return;
}
req_app_link = req_rpc_data->req_app_link; req_app_link = req_rpc_data->req_app_link;
if (res == NXT_OK) {
port = req_app_link->app_port; port = req_app_link->app_port;
nxt_assert(port != NULL); nxt_assert(port != NULL);
@@ -4699,6 +4710,9 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid); nxt_port_rpc_ex_set_peer(task, engine->port, req_rpc_data, port->pid);
nxt_router_app_prepare_request(task, req_app_link); nxt_router_app_prepare_request(task, req_app_link);
}
nxt_request_app_link_use(task, req_app_link, -1);
} }
@@ -5172,8 +5186,6 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
pending_ra->stream); pending_ra->stream);
if (cancelled) { if (cancelled) {
nxt_request_app_link_inc_use(pending_ra);
state.req_app_link = pending_ra; state.req_app_link = pending_ra;
state.app = app; state.app = app;
@@ -5186,12 +5198,14 @@ nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
nxt_thread_mutex_unlock(&app->mutex); nxt_thread_mutex_unlock(&app->mutex);
if (pending_ra != NULL if (pending_ra != NULL) {
&& nxt_router_port_post_select(task, &state) == NXT_OK) if (nxt_router_port_post_select(task, &state) == NXT_OK) {
{
nxt_router_app_prepare_request(task, pending_ra); nxt_router_app_prepare_request(task, pending_ra);
} }
nxt_request_app_link_use(task, pending_ra, -1);
}
nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid); nxt_debug(task, "send quit to app '%V' pid %PI", &app->name, port->pid);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL); nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);