Resolving a racing condition while adding ports on the app's side.

An earlier attempt (ad6265786871) to resolve this condition on the
router's side added a new issue: the app could get a request before
acquiring a port.
This commit is contained in:
Max Romanov
2020-04-10 16:21:58 +03:00
parent c7f5c1c664
commit 58cc13ab29
5 changed files with 48 additions and 13 deletions

View File

@@ -50,7 +50,11 @@ func add_port(p *port) {
port_registry_.m = make(map[port_key]*port) port_registry_.m = make(map[port_key]*port)
} }
old := port_registry_.m[p.key]
if old == nil {
port_registry_.m[p.key] = p port_registry_.m[p.key] = p
}
port_registry_.Unlock() port_registry_.Unlock()
} }

View File

@@ -590,6 +590,17 @@ nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
} }
void
nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
{
nxt_thread_mutex_lock(&process->cp_mutex);
nxt_port_hash_add(&process->connected_ports, port);
nxt_thread_mutex_unlock(&process->cp_mutex);
}
void void
nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
{ {
@@ -602,7 +613,7 @@ nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
nxt_port_t * nxt_port_t *
nxt_process_connected_port_find_add(nxt_process_t *process, nxt_port_t *port) nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port)
{ {
nxt_port_t *res; nxt_port_t *res;
@@ -610,10 +621,6 @@ nxt_process_connected_port_find_add(nxt_process_t *process, nxt_port_t *port)
res = nxt_port_hash_find(&process->connected_ports, port->pid, port->id); res = nxt_port_hash_find(&process->connected_ports, port->pid, port->id);
if (nxt_slow_path(res == NULL)) {
nxt_port_hash_add(&process->connected_ports, port);
}
nxt_thread_mutex_unlock(&process->cp_mutex); nxt_thread_mutex_unlock(&process->cp_mutex);
return res; return res;

View File

@@ -105,10 +105,12 @@ void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process); void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
void nxt_process_connected_port_remove(nxt_process_t *process, void nxt_process_connected_port_remove(nxt_process_t *process,
nxt_port_t *port); nxt_port_t *port);
nxt_port_t *nxt_process_connected_port_find_add(nxt_process_t *process, nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process,
nxt_port_t *port); nxt_port_t *port);
void nxt_worker_process_quit_handler(nxt_task_t *task, void nxt_worker_process_quit_handler(nxt_task_t *task,

View File

@@ -4811,7 +4811,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
apr_action = NXT_APR_REQUEST_FAILED; apr_action = NXT_APR_REQUEST_FAILED;
c_port = nxt_process_connected_port_find_add(port->process, reply_port); c_port = nxt_process_connected_port_find(port->process, reply_port);
if (nxt_slow_path(c_port != reply_port)) { if (nxt_slow_path(c_port != reply_port)) {
res = nxt_port_send_port(task, port, reply_port, 0); res = nxt_port_send_port(task, port, reply_port, 0);
@@ -4820,10 +4820,10 @@ nxt_router_app_prepare_request(nxt_task_t *task,
nxt_request_app_link_error(task, port->app, req_app_link, nxt_request_app_link_error(task, port->app, req_app_link,
"Failed to send reply port to application"); "Failed to send reply port to application");
nxt_process_connected_port_remove(port->process, reply_port);
goto release_port; goto release_port;
} }
nxt_process_connected_port_add(port->process, reply_port);
} }
buf = nxt_router_prepare_msg(task, req_app_link->request, port, buf = nxt_router_prepare_msg(task, req_app_link->request, port,

View File

@@ -4282,16 +4282,38 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
int rc; int rc;
nxt_unit_impl_t *lib; nxt_unit_impl_t *lib;
nxt_unit_process_t *process; nxt_unit_process_t *process;
nxt_unit_port_impl_t *new_port; nxt_unit_port_impl_t *new_port, *old_port;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
pthread_mutex_lock(&lib->mutex);
old_port = nxt_unit_port_hash_find(&lib->ports, &port->id, 0);
if (nxt_slow_path(old_port != NULL)) {
nxt_unit_debug(ctx, "add_port: duplicate %d,%d in_fd %d out_fd %d",
port->id.pid, port->id.id,
port->in_fd, port->out_fd);
if (port->in_fd != -1) {
close(port->in_fd);
port->in_fd = -1;
}
if (port->out_fd != -1) {
close(port->out_fd);
port->out_fd = -1;
}
pthread_mutex_unlock(&lib->mutex);
return NXT_UNIT_OK;
}
nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d", nxt_unit_debug(ctx, "add_port: %d,%d in_fd %d out_fd %d",
port->id.pid, port->id.id, port->id.pid, port->id.id,
port->in_fd, port->out_fd); port->in_fd, port->out_fd);
pthread_mutex_lock(&lib->mutex);
process = nxt_unit_process_get(ctx, port->id.pid); process = nxt_unit_process_get(ctx, port->id.pid);
if (nxt_slow_path(process == NULL)) { if (nxt_slow_path(process == NULL)) {
rc = NXT_UNIT_ERROR; rc = NXT_UNIT_ERROR;