Introducing process use counter.
This helps to decouple process removal from port memory pool cleanups.
This commit is contained in:
@@ -257,11 +257,14 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
|
|||||||
|
|
||||||
port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN);
|
port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN);
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
|
nxt_process_use(task, process, -1);
|
||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_process_port_add(task, process, port);
|
nxt_process_port_add(task, process, port);
|
||||||
|
|
||||||
|
nxt_process_use(task, process, -1);
|
||||||
|
|
||||||
ret = nxt_port_socket_init(task, port, 0);
|
ret = nxt_port_socket_init(task, port, 0);
|
||||||
if (nxt_slow_path(ret != NXT_OK)) {
|
if (nxt_slow_path(ret != NXT_OK)) {
|
||||||
nxt_port_use(task, port, -1);
|
nxt_port_use(task, port, -1);
|
||||||
@@ -511,12 +514,14 @@ nxt_main_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
|
|||||||
|
|
||||||
port = nxt_port_new(task, 0, 0, init->type);
|
port = nxt_port_new(task, 0, 0, init->type);
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
nxt_runtime_process_remove(task, process);
|
nxt_process_use(task, process, -1);
|
||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_process_port_add(task, process, port);
|
nxt_process_port_add(task, process, port);
|
||||||
|
|
||||||
|
nxt_process_use(task, process, -1);
|
||||||
|
|
||||||
ret = nxt_port_socket_init(task, port, 0);
|
ret = nxt_port_socket_init(task, port, 0);
|
||||||
if (nxt_slow_path(ret != NXT_OK)) {
|
if (nxt_slow_path(ret != NXT_OK)) {
|
||||||
nxt_port_use(task, port, -1);
|
nxt_port_use(task, port, -1);
|
||||||
@@ -760,7 +765,7 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
|
|||||||
if (process) {
|
if (process) {
|
||||||
init = process->init;
|
init = process->init;
|
||||||
|
|
||||||
nxt_runtime_process_remove(task, process);
|
nxt_process_close_ports(task, process);
|
||||||
|
|
||||||
if (!nxt_exiting) {
|
if (!nxt_exiting) {
|
||||||
nxt_runtime_process_each(rt, process) {
|
nxt_runtime_process_each(rt, process) {
|
||||||
|
|||||||
@@ -112,7 +112,11 @@ nxt_port_release(nxt_task_t *task, nxt_port_t *port)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (port->link.next != NULL) {
|
if (port->link.next != NULL) {
|
||||||
|
nxt_assert(port->process != NULL);
|
||||||
|
|
||||||
nxt_process_port_remove(port);
|
nxt_process_port_remove(port);
|
||||||
|
|
||||||
|
nxt_process_use(task, port->process, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_mp_release(port->mem_pool, NULL);
|
nxt_mp_release(port->mem_pool, NULL);
|
||||||
@@ -265,11 +269,14 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid,
|
port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid,
|
||||||
new_port_msg->type);
|
new_port_msg->type);
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
|
nxt_process_use(task, process, -1);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_process_port_add(task, process, port);
|
nxt_process_port_add(task, process, port);
|
||||||
|
|
||||||
|
nxt_process_use(task, process, -1);
|
||||||
|
|
||||||
port->pair[0] = -1;
|
port->pair[0] = -1;
|
||||||
port->pair[1] = msg->fd;
|
port->pair[1] = msg->fd;
|
||||||
port->max_size = new_port_msg->max_size;
|
port->max_size = new_port_msg->max_size;
|
||||||
@@ -298,7 +305,7 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
|
|
||||||
nxt_assert(nxt_runtime_is_main(rt));
|
nxt_assert(nxt_runtime_is_main(rt));
|
||||||
|
|
||||||
process = nxt_runtime_process_get(rt, msg->port_msg.pid);
|
process = nxt_runtime_process_find(rt, msg->port_msg.pid);
|
||||||
if (nxt_slow_path(process == NULL)) {
|
if (nxt_slow_path(process == NULL)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -452,7 +459,7 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
process = nxt_runtime_process_find(rt, pid);
|
process = nxt_runtime_process_find(rt, pid);
|
||||||
|
|
||||||
if (process) {
|
if (process) {
|
||||||
nxt_runtime_process_remove(task, process);
|
nxt_process_close_ports(task, process);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
|
|||||||
if (!p->ready) {
|
if (!p->ready) {
|
||||||
nxt_debug(task, "remove not ready process %PI", p->pid);
|
nxt_debug(task, "remove not ready process %PI", p->pid);
|
||||||
|
|
||||||
nxt_runtime_process_remove(task, p);
|
nxt_process_close_ports(task, p);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
nxt_port_mmaps_destroy(p->incoming, 0);
|
nxt_port_mmaps_destroy(p->incoming, 0);
|
||||||
@@ -586,29 +586,30 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
nxt_process_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
|
|
||||||
{
|
|
||||||
nxt_process_t *process;
|
|
||||||
|
|
||||||
process = obj;
|
|
||||||
|
|
||||||
process->port_cleanups--;
|
|
||||||
|
|
||||||
if (process->port_cleanups == 0) {
|
|
||||||
nxt_runtime_process_remove(task, process);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void
|
void
|
||||||
nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
|
nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
|
||||||
{
|
{
|
||||||
|
nxt_assert(port->process == NULL);
|
||||||
|
|
||||||
port->process = process;
|
port->process = process;
|
||||||
nxt_queue_insert_tail(&process->ports, &port->link);
|
nxt_queue_insert_tail(&process->ports, &port->link);
|
||||||
|
|
||||||
nxt_mp_cleanup(port->mem_pool, nxt_process_port_mp_cleanup, task, process,
|
nxt_process_use(task, process, 1);
|
||||||
NULL);
|
}
|
||||||
process->port_cleanups++;
|
|
||||||
|
|
||||||
|
void
|
||||||
|
nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
|
||||||
|
{
|
||||||
|
nxt_port_t *port;
|
||||||
|
|
||||||
|
nxt_process_port_each(process, port) {
|
||||||
|
|
||||||
|
nxt_port_close(task, port);
|
||||||
|
|
||||||
|
nxt_runtime_port_remove(task, port);
|
||||||
|
|
||||||
|
} nxt_process_port_loop;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -622,6 +623,7 @@ nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
|
|||||||
nxt_thread_mutex_unlock(&process->cp_mutex);
|
nxt_thread_mutex_unlock(&process->cp_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
|
nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
|
||||||
{
|
{
|
||||||
@@ -632,6 +634,7 @@ nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
|
|||||||
nxt_thread_mutex_unlock(&process->cp_mutex);
|
nxt_thread_mutex_unlock(&process->cp_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
nxt_port_t *
|
nxt_port_t *
|
||||||
nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid,
|
nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid,
|
||||||
nxt_port_id_t port_id)
|
nxt_port_id_t port_id)
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ typedef struct {
|
|||||||
nxt_queue_t ports; /* of nxt_port_t */
|
nxt_queue_t ports; /* of nxt_port_t */
|
||||||
nxt_bool_t ready;
|
nxt_bool_t ready;
|
||||||
nxt_bool_t registered;
|
nxt_bool_t registered;
|
||||||
nxt_uint_t port_cleanups;
|
nxt_int_t use_count;
|
||||||
|
|
||||||
nxt_process_init_t *init;
|
nxt_process_init_t *init;
|
||||||
|
|
||||||
@@ -87,6 +87,7 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process,
|
|||||||
#define nxt_process_port_loop \
|
#define nxt_process_port_loop \
|
||||||
nxt_queue_loop
|
nxt_queue_loop
|
||||||
|
|
||||||
|
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
|
||||||
|
|
||||||
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
|
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
|
||||||
|
|
||||||
|
|||||||
@@ -553,7 +553,7 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
nxt_runtime_process_each(rt, process) {
|
nxt_runtime_process_each(rt, process) {
|
||||||
|
|
||||||
nxt_runtime_process_remove(task, process);
|
nxt_process_close_ports(task, process);
|
||||||
|
|
||||||
} nxt_runtime_process_loop;
|
} nxt_runtime_process_loop;
|
||||||
|
|
||||||
@@ -1580,6 +1580,8 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
|
|||||||
nxt_thread_mutex_create(&process->outgoing_mutex);
|
nxt_thread_mutex_create(&process->outgoing_mutex);
|
||||||
nxt_thread_mutex_create(&process->cp_mutex);
|
nxt_thread_mutex_create(&process->cp_mutex);
|
||||||
|
|
||||||
|
process->use_count = 1;
|
||||||
|
|
||||||
return process;
|
return process;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1590,7 +1592,7 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
|
|||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_lvlhsh_each_t lhe;
|
nxt_lvlhsh_each_t lhe;
|
||||||
|
|
||||||
nxt_assert(process->port_cleanups == 0);
|
nxt_assert(process->use_count == 0);
|
||||||
nxt_assert(process->registered == 0);
|
nxt_assert(process->registered == 0);
|
||||||
|
|
||||||
nxt_port_mmaps_destroy(process->incoming, 1);
|
nxt_port_mmaps_destroy(process->incoming, 1);
|
||||||
@@ -1685,7 +1687,11 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
|
|||||||
nxt_thread_log_debug("process %PI found", pid);
|
nxt_thread_log_debug("process %PI found", pid);
|
||||||
|
|
||||||
nxt_thread_mutex_unlock(&rt->processes_mutex);
|
nxt_thread_mutex_unlock(&rt->processes_mutex);
|
||||||
return lhq.value;
|
|
||||||
|
process = lhq.value;
|
||||||
|
process->use_count++;
|
||||||
|
|
||||||
|
return process;
|
||||||
}
|
}
|
||||||
|
|
||||||
process = nxt_runtime_process_new(rt);
|
process = nxt_runtime_process_new(rt);
|
||||||
@@ -1812,28 +1818,20 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
|
|||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
nxt_runtime_process_remove(nxt_task_t *task, nxt_process_t *process)
|
nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
|
||||||
{
|
{
|
||||||
nxt_port_t *port;
|
|
||||||
nxt_runtime_t *rt;
|
nxt_runtime_t *rt;
|
||||||
|
|
||||||
|
process->use_count += i;
|
||||||
|
|
||||||
|
if (process->use_count == 0) {
|
||||||
rt = task->thread->runtime;
|
rt = task->thread->runtime;
|
||||||
|
|
||||||
if (process->port_cleanups == 0) {
|
|
||||||
if (process->registered == 1) {
|
if (process->registered == 1) {
|
||||||
nxt_runtime_process_remove_pid(rt, process->pid);
|
nxt_runtime_process_remove_pid(rt, process->pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_runtime_process_destroy(rt, process);
|
nxt_runtime_process_destroy(rt, process);
|
||||||
|
|
||||||
} else {
|
|
||||||
nxt_process_port_each(process, port) {
|
|
||||||
|
|
||||||
nxt_port_close(task, port);
|
|
||||||
|
|
||||||
nxt_runtime_port_remove(task, port);
|
|
||||||
|
|
||||||
} nxt_process_port_loop;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process);
|
|||||||
|
|
||||||
nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid);
|
nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid);
|
||||||
|
|
||||||
void nxt_runtime_process_remove(nxt_task_t *task, nxt_process_t *process);
|
void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
|
||||||
|
|
||||||
nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
|
nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
|
||||||
nxt_lvlhsh_each_t *lhe);
|
nxt_lvlhsh_each_t *lhe);
|
||||||
|
|||||||
Reference in New Issue
Block a user