Filtering process to keep connection.
- Main process should be connected to all other processes. - Controller should be connected to Router. - Router should be connected to Controller and all Workers. - Workers should be connected to Router worker thread ports only. This filtering helps to avoid unnecessary communication and various errors during massive application workers stop / restart.
This commit is contained in:
@@ -764,6 +764,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
|
|||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_runtime_t *rt;
|
nxt_runtime_t *rt;
|
||||||
nxt_process_t *process;
|
nxt_process_t *process;
|
||||||
|
nxt_process_type_t ptype;
|
||||||
nxt_process_init_t *init;
|
nxt_process_init_t *init;
|
||||||
|
|
||||||
rt = task->thread->runtime;
|
rt = task->thread->runtime;
|
||||||
@@ -773,6 +774,8 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
|
|||||||
if (process) {
|
if (process) {
|
||||||
init = process->init;
|
init = process->init;
|
||||||
|
|
||||||
|
ptype = nxt_process_type(process);
|
||||||
|
|
||||||
nxt_process_close_ports(task, process);
|
nxt_process_close_ports(task, process);
|
||||||
|
|
||||||
if (!nxt_exiting) {
|
if (!nxt_exiting) {
|
||||||
@@ -787,6 +790,10 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
|
|||||||
|
|
||||||
port = nxt_process_port_first(process);
|
port = nxt_process_port_first(process);
|
||||||
|
|
||||||
|
if (nxt_proc_remove_notify_martix[ptype][port->type] == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
|
buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
|
||||||
sizeof(pid));
|
sizeof(pid));
|
||||||
buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
|
buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
|
||||||
|
|||||||
@@ -177,7 +177,7 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
nxt_inline void
|
||||||
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
|
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
|
||||||
nxt_port_t *new_port, uint32_t stream)
|
nxt_port_t *new_port, uint32_t stream)
|
||||||
{
|
{
|
||||||
@@ -195,10 +195,7 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
|
|||||||
|
|
||||||
port = nxt_process_port_first(process);
|
port = nxt_process_port_first(process);
|
||||||
|
|
||||||
if (port->type == NXT_PROCESS_MAIN
|
if (nxt_proc_conn_martix[port->type][new_port->type]) {
|
||||||
|| port->type == NXT_PROCESS_CONTROLLER
|
|
||||||
|| port->type == NXT_PROCESS_ROUTER)
|
|
||||||
{
|
|
||||||
(void) nxt_port_send_port(task, port, new_port, stream);
|
(void) nxt_port_send_port(task, port, new_port, stream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -227,8 +227,6 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
|
|||||||
|
|
||||||
void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
|
void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_handlers_t *handlers);
|
nxt_port_handlers_t *handlers);
|
||||||
void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
|
|
||||||
nxt_port_t *port, uint32_t stream);
|
|
||||||
nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
|
nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_t *new_port, uint32_t stream);
|
nxt_port_t *new_port, uint32_t stream);
|
||||||
void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
|
void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
|
||||||
|
|||||||
@@ -18,13 +18,31 @@ nxt_pid_t nxt_pid;
|
|||||||
/* An original parent process pid. */
|
/* An original parent process pid. */
|
||||||
nxt_pid_t nxt_ppid;
|
nxt_pid_t nxt_ppid;
|
||||||
|
|
||||||
|
nxt_bool_t nxt_proc_conn_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
|
||||||
|
{ 0, 0, 0, 0, 0, 0 },
|
||||||
|
{ 0, 1, 1, 1, 1, 1 },
|
||||||
|
{ 0, 1, 0, 0, 0, 0 },
|
||||||
|
{ 0, 1, 0, 0, 1, 0 },
|
||||||
|
{ 0, 1, 0, 1, 0, 1 },
|
||||||
|
{ 0, 1, 0, 0, 0, 0 },
|
||||||
|
};
|
||||||
|
|
||||||
|
nxt_bool_t nxt_proc_remove_notify_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
|
||||||
|
{ 0, 0, 0, 0, 0, 0 },
|
||||||
|
{ 0, 0, 0, 0, 0, 0 },
|
||||||
|
{ 0, 0, 0, 0, 0, 0 },
|
||||||
|
{ 0, 0, 0, 0, 1, 0 },
|
||||||
|
{ 0, 0, 0, 1, 0, 1 },
|
||||||
|
{ 0, 0, 0, 0, 1, 0 },
|
||||||
|
};
|
||||||
|
|
||||||
nxt_pid_t
|
nxt_pid_t
|
||||||
nxt_process_create(nxt_task_t *task, nxt_process_t *process)
|
nxt_process_create(nxt_task_t *task, nxt_process_t *process)
|
||||||
{
|
{
|
||||||
nxt_pid_t pid;
|
nxt_pid_t pid;
|
||||||
nxt_process_t *p;
|
nxt_process_t *p;
|
||||||
nxt_runtime_t *rt;
|
nxt_runtime_t *rt;
|
||||||
|
nxt_process_type_t ptype;
|
||||||
|
|
||||||
rt = task->thread->runtime;
|
rt = task->thread->runtime;
|
||||||
|
|
||||||
@@ -48,6 +66,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
|
|||||||
|
|
||||||
rt->types = 0;
|
rt->types = 0;
|
||||||
|
|
||||||
|
ptype = process->init->type;
|
||||||
|
|
||||||
nxt_port_reset_next_id();
|
nxt_port_reset_next_id();
|
||||||
|
|
||||||
nxt_event_engine_thread_adopt(task->thread->engine);
|
nxt_event_engine_thread_adopt(task->thread->engine);
|
||||||
@@ -55,16 +75,25 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
|
|||||||
/* Remove not ready processes */
|
/* Remove not ready processes */
|
||||||
nxt_runtime_process_each(rt, p) {
|
nxt_runtime_process_each(rt, p) {
|
||||||
|
|
||||||
|
if (nxt_proc_conn_martix[ptype][nxt_process_type(p)] == 0) {
|
||||||
|
nxt_debug(task, "remove not required process %PI", p->pid);
|
||||||
|
|
||||||
|
nxt_process_close_ports(task, p);
|
||||||
|
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (!p->ready) {
|
if (!p->ready) {
|
||||||
nxt_debug(task, "remove not ready process %PI", p->pid);
|
nxt_debug(task, "remove not ready process %PI", p->pid);
|
||||||
|
|
||||||
nxt_process_close_ports(task, p);
|
nxt_process_close_ports(task, p);
|
||||||
|
|
||||||
} else {
|
continue;
|
||||||
nxt_port_mmaps_destroy(&p->incoming, 0);
|
|
||||||
nxt_port_mmaps_destroy(&p->outgoing, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_port_mmaps_destroy(&p->incoming, 0);
|
||||||
|
nxt_port_mmaps_destroy(&p->outgoing, 0);
|
||||||
|
|
||||||
} nxt_runtime_process_loop;
|
} nxt_runtime_process_loop;
|
||||||
|
|
||||||
nxt_runtime_process_add(task, process);
|
nxt_runtime_process_add(task, process);
|
||||||
@@ -598,6 +627,14 @@ nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
nxt_process_type_t
|
||||||
|
nxt_process_type(nxt_process_t *process)
|
||||||
|
{
|
||||||
|
return nxt_queue_is_empty(&process->ports) ? 0 :
|
||||||
|
(nxt_process_port_first(process))->type;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
|
nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -70,6 +70,10 @@ typedef struct {
|
|||||||
} nxt_process_t;
|
} nxt_process_t;
|
||||||
|
|
||||||
|
|
||||||
|
extern nxt_bool_t nxt_proc_conn_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX];
|
||||||
|
extern nxt_bool_t
|
||||||
|
nxt_proc_remove_notify_martix[NXT_PROCESS_MAX][NXT_PROCESS_MAX];
|
||||||
|
|
||||||
NXT_EXPORT nxt_pid_t nxt_process_create(nxt_task_t *task,
|
NXT_EXPORT nxt_pid_t nxt_process_create(nxt_task_t *task,
|
||||||
nxt_process_t *process);
|
nxt_process_t *process);
|
||||||
NXT_EXPORT nxt_pid_t nxt_process_execute(nxt_task_t *task, char *name,
|
NXT_EXPORT nxt_pid_t nxt_process_execute(nxt_task_t *task, char *name,
|
||||||
@@ -95,6 +99,8 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process,
|
|||||||
#define nxt_process_port_loop \
|
#define nxt_process_port_loop \
|
||||||
nxt_queue_loop
|
nxt_queue_loop
|
||||||
|
|
||||||
|
nxt_process_type_t nxt_process_type(nxt_process_t *process);
|
||||||
|
|
||||||
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
|
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
|
||||||
|
|
||||||
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
|
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
|
||||||
|
|||||||
Reference in New Issue
Block a user