Triggering RPC error for all handlers on port close.
This is required to avoid crashes and memory leaks on Unit exit.
This commit is contained in:
@@ -86,6 +86,8 @@ nxt_port_close(nxt_task_t *task, nxt_port_t *port)
|
|||||||
port->id, port->type);
|
port->id, port->type);
|
||||||
|
|
||||||
if (port->pair[0] != -1) {
|
if (port->pair[0] != -1) {
|
||||||
|
nxt_port_rpc_close(task, port);
|
||||||
|
|
||||||
nxt_fd_close(port->pair[0]);
|
nxt_fd_close(port->pair[0]);
|
||||||
port->pair[0] = -1;
|
port->pair[0] = -1;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -461,3 +461,41 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
|
|||||||
|
|
||||||
nxt_port_use(task, port, -1);
|
nxt_port_use(task, port, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static nxt_buf_t nxt_port_close_dummy_buf;
|
||||||
|
|
||||||
|
void
|
||||||
|
nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port)
|
||||||
|
{
|
||||||
|
nxt_lvlhsh_each_t lhe;
|
||||||
|
nxt_port_rpc_reg_t *reg;
|
||||||
|
nxt_port_recv_msg_t msg;
|
||||||
|
|
||||||
|
for ( ;; ) {
|
||||||
|
nxt_memzero(&lhe, sizeof(nxt_lvlhsh_each_t));
|
||||||
|
|
||||||
|
lhe.proto = &lvlhsh_rpc_reg_proto;
|
||||||
|
|
||||||
|
reg = nxt_lvlhsh_each(&port->rpc_streams, &lhe);
|
||||||
|
if (reg == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.fd = -1;
|
||||||
|
msg.buf = &nxt_port_close_dummy_buf;
|
||||||
|
msg.port = port;
|
||||||
|
msg.port_msg.stream = reg->stream;
|
||||||
|
msg.port_msg.pid = nxt_pid;
|
||||||
|
msg.port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
|
||||||
|
msg.port_msg.last = 1;
|
||||||
|
msg.port_msg.mmap = 0;
|
||||||
|
msg.port_msg.nf = 0;
|
||||||
|
msg.port_msg.mf = 0;
|
||||||
|
msg.port_msg.tracking = 0;
|
||||||
|
msg.size = sizeof(msg.port_msg);
|
||||||
|
msg.cancelled = 0;
|
||||||
|
msg.u.data = NULL;
|
||||||
|
|
||||||
|
nxt_port_rpc_handler(task, &msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ void nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
|||||||
void nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port,
|
void nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_pid_t peer);
|
nxt_pid_t peer);
|
||||||
void nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream);
|
void nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream);
|
||||||
|
void nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port);
|
||||||
|
|
||||||
|
|
||||||
#endif /* _NXT_PORT_RPC_H_INCLUDED_ */
|
#endif /* _NXT_PORT_RPC_H_INCLUDED_ */
|
||||||
|
|||||||
Reference in New Issue
Block a user