Fixing 'find & add' racing condition in connected ports hash.
Missing error log messages added.
This commit is contained in:
@@ -590,17 +590,6 @@ nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
|
||||
{
|
||||
nxt_thread_mutex_lock(&process->cp_mutex);
|
||||
|
||||
nxt_port_hash_add(&process->connected_ports, port);
|
||||
|
||||
nxt_thread_mutex_unlock(&process->cp_mutex);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
|
||||
{
|
||||
@@ -613,14 +602,17 @@ nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
|
||||
|
||||
|
||||
nxt_port_t *
|
||||
nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid,
|
||||
nxt_port_id_t port_id)
|
||||
nxt_process_connected_port_find_add(nxt_process_t *process, nxt_port_t *port)
|
||||
{
|
||||
nxt_port_t *res;
|
||||
|
||||
nxt_thread_mutex_lock(&process->cp_mutex);
|
||||
|
||||
res = nxt_port_hash_find(&process->connected_ports, pid, port_id);
|
||||
res = nxt_port_hash_find(&process->connected_ports, port->pid, port->id);
|
||||
|
||||
if (nxt_slow_path(res == NULL)) {
|
||||
nxt_port_hash_add(&process->connected_ports, port);
|
||||
}
|
||||
|
||||
nxt_thread_mutex_unlock(&process->cp_mutex);
|
||||
|
||||
|
||||
@@ -105,13 +105,11 @@ void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
|
||||
|
||||
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
|
||||
|
||||
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
|
||||
|
||||
void nxt_process_connected_port_remove(nxt_process_t *process,
|
||||
nxt_port_t *port);
|
||||
|
||||
nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process,
|
||||
nxt_pid_t pid, nxt_port_id_t port_id);
|
||||
nxt_port_t *nxt_process_connected_port_find_add(nxt_process_t *process,
|
||||
nxt_port_t *port);
|
||||
|
||||
void nxt_worker_process_quit_handler(nxt_task_t *task,
|
||||
nxt_port_recv_msg_t *msg);
|
||||
|
||||
@@ -735,12 +735,15 @@ nxt_request_app_link_use(nxt_task_t *task, nxt_request_app_link_t *req_app_link,
|
||||
|
||||
|
||||
nxt_inline void
|
||||
nxt_request_app_link_error(nxt_request_app_link_t *req_app_link, int code,
|
||||
const char *str)
|
||||
nxt_request_app_link_error(nxt_task_t *task, nxt_app_t *app,
|
||||
nxt_request_app_link_t *req_app_link, const char *str)
|
||||
{
|
||||
req_app_link->app_port = NULL;
|
||||
req_app_link->err_code = code;
|
||||
req_app_link->err_code = 500;
|
||||
req_app_link->err_str = str;
|
||||
|
||||
nxt_alert(task, "app \"%V\" internal error: %s on #%uD",
|
||||
&app->name, str, req_app_link->stream);
|
||||
}
|
||||
|
||||
|
||||
@@ -3909,7 +3912,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
nxt_debug(task, "app '%V' %p abort next stream #%uD",
|
||||
&app->name, app, req_app_link->stream);
|
||||
|
||||
nxt_request_app_link_error(req_app_link, 500,
|
||||
nxt_request_app_link_error(task, app, req_app_link,
|
||||
"Failed to start application process");
|
||||
nxt_request_app_link_use(task, req_app_link, -1);
|
||||
}
|
||||
@@ -4665,7 +4668,7 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
|
||||
nxt_port_use(task, state->port, -1);
|
||||
}
|
||||
|
||||
nxt_request_app_link_error(state->req_app_link, 500,
|
||||
nxt_request_app_link_error(task, app, state->req_app_link,
|
||||
"Failed to allocate shared req<->app link");
|
||||
|
||||
return NXT_ERROR;
|
||||
@@ -4693,7 +4696,7 @@ nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
|
||||
res = nxt_router_start_app_process(task, app);
|
||||
|
||||
if (nxt_slow_path(res != NXT_OK)) {
|
||||
nxt_request_app_link_error(req_app_link, 500,
|
||||
nxt_request_app_link_error(task, app, req_app_link,
|
||||
"Failed to start app process");
|
||||
|
||||
return NXT_ERROR;
|
||||
@@ -4808,25 +4811,26 @@ nxt_router_app_prepare_request(nxt_task_t *task,
|
||||
|
||||
apr_action = NXT_APR_REQUEST_FAILED;
|
||||
|
||||
c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
|
||||
reply_port->id);
|
||||
c_port = nxt_process_connected_port_find_add(port->process, reply_port);
|
||||
|
||||
if (nxt_slow_path(c_port != reply_port)) {
|
||||
res = nxt_port_send_port(task, port, reply_port, 0);
|
||||
|
||||
if (nxt_slow_path(res != NXT_OK)) {
|
||||
nxt_request_app_link_error(req_app_link, 500,
|
||||
nxt_request_app_link_error(task, port->app, req_app_link,
|
||||
"Failed to send reply port to application");
|
||||
|
||||
nxt_process_connected_port_remove(port->process, reply_port);
|
||||
|
||||
goto release_port;
|
||||
}
|
||||
|
||||
nxt_process_connected_port_add(port->process, reply_port);
|
||||
}
|
||||
|
||||
buf = nxt_router_prepare_msg(task, req_app_link->request, port,
|
||||
nxt_app_msg_prefix[port->app->type]);
|
||||
|
||||
if (nxt_slow_path(buf == NULL)) {
|
||||
nxt_request_app_link_error(req_app_link, 500,
|
||||
nxt_request_app_link_error(task, port->app, req_app_link,
|
||||
"Failed to prepare message for application");
|
||||
goto release_port;
|
||||
}
|
||||
@@ -4850,7 +4854,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
|
||||
&req_app_link->msg_info.tracking,
|
||||
req_app_link->stream);
|
||||
if (nxt_slow_path(res != NXT_OK)) {
|
||||
nxt_request_app_link_error(req_app_link, 500,
|
||||
nxt_request_app_link_error(task, port->app, req_app_link,
|
||||
"Failed to get tracking area");
|
||||
goto release_port;
|
||||
}
|
||||
@@ -4868,7 +4872,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
|
||||
&req_app_link->msg_info.tracking);
|
||||
|
||||
if (nxt_slow_path(res != NXT_OK)) {
|
||||
nxt_request_app_link_error(req_app_link, 500,
|
||||
nxt_request_app_link_error(task, port->app, req_app_link,
|
||||
"Failed to send message to application");
|
||||
goto release_port;
|
||||
}
|
||||
|
||||
@@ -4312,6 +4312,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||
|
||||
rc = nxt_unit_port_hash_add(&lib->ports, &new_port->port);
|
||||
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||
nxt_unit_alert(ctx, "add_port: %d,%d hash_add failed",
|
||||
port->id.pid, port->id.id);
|
||||
|
||||
goto unlock;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user