Runtime processes protected with mutex.
This commit is contained in:
@@ -406,7 +406,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
|
|||||||
|
|
||||||
port = nxt_port_new(0, 0, init->type);
|
port = nxt_port_new(0, 0, init->type);
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
nxt_runtime_process_destroy(rt, process);
|
nxt_runtime_process_remove(rt, process);
|
||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -658,7 +658,9 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
|
|||||||
if (!nxt_exiting) {
|
if (!nxt_exiting) {
|
||||||
nxt_runtime_process_each(rt, process)
|
nxt_runtime_process_each(rt, process)
|
||||||
{
|
{
|
||||||
if (process->pid == nxt_pid) {
|
if (process->pid == nxt_pid ||
|
||||||
|
process->pid == pid ||
|
||||||
|
nxt_queue_is_empty(&process->ports)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -106,28 +106,6 @@ nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
|
||||||
nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type,
|
|
||||||
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
|
|
||||||
{
|
|
||||||
nxt_port_t *port;
|
|
||||||
nxt_process_t *process;
|
|
||||||
|
|
||||||
nxt_runtime_process_each(rt, process)
|
|
||||||
{
|
|
||||||
if (nxt_pid != process->pid) {
|
|
||||||
nxt_process_port_each(process, port) {
|
|
||||||
|
|
||||||
(void) nxt_port_socket_write(task, port, type,
|
|
||||||
fd, stream, 0, b);
|
|
||||||
|
|
||||||
} nxt_process_port_loop;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
nxt_runtime_process_loop;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||||
{
|
{
|
||||||
@@ -276,6 +254,8 @@ nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
|
|
||||||
rt = task->thread->runtime;
|
rt = task->thread->runtime;
|
||||||
|
|
||||||
|
nxt_assert(nxt_runtime_is_master(rt));
|
||||||
|
|
||||||
process = nxt_runtime_process_get(rt, msg->port_msg.pid);
|
process = nxt_runtime_process_get(rt, msg->port_msg.pid);
|
||||||
if (nxt_slow_path(process == NULL)) {
|
if (nxt_slow_path(process == NULL)) {
|
||||||
return;
|
return;
|
||||||
@@ -290,9 +270,7 @@ nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
|
|
||||||
nxt_debug(task, "process %PI ready", msg->port_msg.pid);
|
nxt_debug(task, "process %PI ready", msg->port_msg.pid);
|
||||||
|
|
||||||
if (nxt_runtime_is_master(rt)) {
|
nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
|
||||||
nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -310,7 +288,7 @@ nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
process = nxt_runtime_process_get(rt, msg->port_msg.pid);
|
process = nxt_runtime_process_find(rt, msg->port_msg.pid);
|
||||||
if (nxt_slow_path(process == NULL)) {
|
if (nxt_slow_path(process == NULL)) {
|
||||||
nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
|
nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
|
||||||
msg->port_msg.pid);
|
msg->port_msg.pid);
|
||||||
@@ -415,14 +393,14 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
nxt_runtime_t *rt;
|
nxt_runtime_t *rt;
|
||||||
nxt_process_t *process;
|
nxt_process_t *process;
|
||||||
|
|
||||||
nxt_debug(task, "port remove pid handler");
|
|
||||||
|
|
||||||
buf = msg->buf;
|
buf = msg->buf;
|
||||||
|
|
||||||
nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
|
nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
|
||||||
|
|
||||||
nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
|
nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
|
||||||
|
|
||||||
|
nxt_debug(task, "port remove pid %PI handler", pid);
|
||||||
|
|
||||||
rt = task->thread->runtime;
|
rt = task->thread->runtime;
|
||||||
|
|
||||||
nxt_port_rpc_remove_peer(task, msg->port, pid);
|
nxt_port_rpc_remove_peer(task, msg->port, pid);
|
||||||
|
|||||||
@@ -160,8 +160,6 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
|
|||||||
|
|
||||||
void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
|
void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_handler_t *handlers);
|
nxt_port_handler_t *handlers);
|
||||||
void nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type,
|
|
||||||
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
|
|
||||||
void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
|
void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
|
||||||
nxt_port_t *port, uint32_t stream);
|
nxt_port_t *port, uint32_t stream);
|
||||||
nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
|
nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
|
||||||
|
|||||||
@@ -379,7 +379,7 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
|
|||||||
nxt_port_mmap_t *port_mmap;
|
nxt_port_mmap_t *port_mmap;
|
||||||
nxt_port_mmap_header_t *hdr;
|
nxt_port_mmap_header_t *hdr;
|
||||||
|
|
||||||
process = nxt_runtime_process_get(task->thread->runtime, spid);
|
process = nxt_runtime_process_find(task->thread->runtime, spid);
|
||||||
if (nxt_slow_path(process == NULL)) {
|
if (nxt_slow_path(process == NULL)) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -577,7 +577,7 @@ nxt_process_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
|
|||||||
process->port_cleanups--;
|
process->port_cleanups--;
|
||||||
|
|
||||||
if (process->port_cleanups == 0) {
|
if (process->port_cleanups == 0) {
|
||||||
nxt_runtime_process_destroy(rt, process);
|
nxt_runtime_process_remove(rt, process);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -57,6 +57,7 @@ typedef struct {
|
|||||||
nxt_pid_t pid;
|
nxt_pid_t pid;
|
||||||
nxt_queue_t ports; /* of nxt_port_t */
|
nxt_queue_t ports; /* of nxt_port_t */
|
||||||
nxt_bool_t ready;
|
nxt_bool_t ready;
|
||||||
|
nxt_bool_t registered;
|
||||||
nxt_uint_t port_cleanups;
|
nxt_uint_t port_cleanups;
|
||||||
|
|
||||||
nxt_process_init_t *init;
|
nxt_process_init_t *init;
|
||||||
|
|||||||
@@ -48,6 +48,11 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
|
|||||||
nxt_runtime_cont_t cont);
|
nxt_runtime_cont_t cont);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
|
||||||
|
nxt_process_t *process);
|
||||||
|
static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
|
||||||
|
nxt_pid_t pid);
|
||||||
|
|
||||||
|
|
||||||
nxt_int_t
|
nxt_int_t
|
||||||
nxt_runtime_create(nxt_task_t *task)
|
nxt_runtime_create(nxt_task_t *task)
|
||||||
@@ -71,6 +76,8 @@ nxt_runtime_create(nxt_task_t *task)
|
|||||||
task->thread->runtime = rt;
|
task->thread->runtime = rt;
|
||||||
rt->mem_pool = mp;
|
rt->mem_pool = mp;
|
||||||
|
|
||||||
|
nxt_thread_mutex_create(&rt->processes_mutex);
|
||||||
|
|
||||||
rt->prefix = nxt_current_directory(mp);
|
rt->prefix = nxt_current_directory(mp);
|
||||||
if (nxt_slow_path(rt->prefix == NULL)) {
|
if (nxt_slow_path(rt->prefix == NULL)) {
|
||||||
goto fail;
|
goto fail;
|
||||||
@@ -557,6 +564,8 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
} nxt_runtime_process_loop;
|
} nxt_runtime_process_loop;
|
||||||
|
|
||||||
|
nxt_thread_mutex_destroy(&rt->processes_mutex);
|
||||||
|
|
||||||
nxt_mp_destroy(rt->mem_pool);
|
nxt_mp_destroy(rt->mem_pool);
|
||||||
|
|
||||||
nxt_debug(task, "exit");
|
nxt_debug(task, "exit");
|
||||||
@@ -1488,10 +1497,11 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
static void
|
||||||
nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
|
nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
|
||||||
{
|
{
|
||||||
nxt_assert(process->port_cleanups == 0);
|
nxt_assert(process->port_cleanups == 0);
|
||||||
|
nxt_assert(process->registered == 0);
|
||||||
|
|
||||||
nxt_port_mmaps_destroy(process->incoming, 1);
|
nxt_port_mmaps_destroy(process->incoming, 1);
|
||||||
nxt_port_mmaps_destroy(process->outgoing, 1);
|
nxt_port_mmaps_destroy(process->outgoing, 1);
|
||||||
@@ -1533,23 +1543,38 @@ static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
nxt_inline void
|
||||||
|
nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
|
||||||
|
{
|
||||||
|
lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
|
||||||
|
lhq->key.length = sizeof(*pid);
|
||||||
|
lhq->key.start = (u_char *) pid;
|
||||||
|
lhq->proto = &lvlhsh_processes_proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
nxt_process_t *
|
nxt_process_t *
|
||||||
nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
|
nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
|
||||||
{
|
{
|
||||||
|
nxt_process_t *process;
|
||||||
nxt_lvlhsh_query_t lhq;
|
nxt_lvlhsh_query_t lhq;
|
||||||
|
|
||||||
lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid));
|
process = NULL;
|
||||||
lhq.key.length = sizeof(pid);
|
|
||||||
lhq.key.start = (u_char *) &pid;
|
nxt_runtime_process_lhq_pid(&lhq, &pid);
|
||||||
lhq.proto = &lvlhsh_processes_proto;
|
|
||||||
|
nxt_thread_mutex_lock(&rt->processes_mutex);
|
||||||
|
|
||||||
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
|
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
|
||||||
return lhq.value;
|
process = lhq.value;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
nxt_thread_log_debug("process %PI not found", pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_thread_log_debug("process %PI not found", pid);
|
nxt_thread_mutex_unlock(&rt->processes_mutex);
|
||||||
|
|
||||||
return NULL;
|
return process;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -1559,13 +1584,14 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
|
|||||||
nxt_process_t *process;
|
nxt_process_t *process;
|
||||||
nxt_lvlhsh_query_t lhq;
|
nxt_lvlhsh_query_t lhq;
|
||||||
|
|
||||||
lhq.key_hash = nxt_murmur_hash2(&pid, sizeof(pid));
|
nxt_runtime_process_lhq_pid(&lhq, &pid);
|
||||||
lhq.key.length = sizeof(pid);
|
|
||||||
lhq.key.start = (u_char *) &pid;
|
nxt_thread_mutex_lock(&rt->processes_mutex);
|
||||||
lhq.proto = &lvlhsh_processes_proto;
|
|
||||||
|
|
||||||
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
|
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
|
||||||
nxt_thread_log_debug("process %PI found", pid);
|
nxt_thread_log_debug("process %PI found", pid);
|
||||||
|
|
||||||
|
nxt_thread_mutex_unlock(&rt->processes_mutex);
|
||||||
return lhq.value;
|
return lhq.value;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1589,6 +1615,8 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
|
|||||||
|
|
||||||
rt->nprocesses++;
|
rt->nprocesses++;
|
||||||
|
|
||||||
|
process->registered = 1;
|
||||||
|
|
||||||
nxt_thread_log_debug("process %PI insert", pid);
|
nxt_thread_log_debug("process %PI insert", pid);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@@ -1597,6 +1625,8 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_thread_mutex_unlock(&rt->processes_mutex);
|
||||||
|
|
||||||
return process;
|
return process;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1607,14 +1637,16 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
|
|||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_lvlhsh_query_t lhq;
|
nxt_lvlhsh_query_t lhq;
|
||||||
|
|
||||||
lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid));
|
nxt_assert(process->registered == 0);
|
||||||
lhq.key.length = sizeof(process->pid);
|
|
||||||
lhq.key.start = (u_char *) &process->pid;
|
nxt_runtime_process_lhq_pid(&lhq, &process->pid);
|
||||||
lhq.proto = &lvlhsh_processes_proto;
|
|
||||||
lhq.replace = 0;
|
lhq.replace = 0;
|
||||||
lhq.value = process;
|
lhq.value = process;
|
||||||
lhq.pool = rt->mem_pool;
|
lhq.pool = rt->mem_pool;
|
||||||
|
|
||||||
|
nxt_thread_mutex_lock(&rt->processes_mutex);
|
||||||
|
|
||||||
switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
|
switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
|
||||||
|
|
||||||
case NXT_OK:
|
case NXT_OK:
|
||||||
@@ -1632,37 +1664,70 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
|
|||||||
|
|
||||||
} nxt_process_port_loop;
|
} nxt_process_port_loop;
|
||||||
|
|
||||||
|
process->registered = 1;
|
||||||
|
|
||||||
|
nxt_thread_log_debug("process %PI added", process->pid);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
nxt_thread_log_debug("process %PI failed to add", process->pid);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_thread_mutex_unlock(&rt->processes_mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
static nxt_process_t *
|
||||||
nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
|
nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
|
||||||
{
|
{
|
||||||
nxt_port_t *port;
|
nxt_process_t *process;
|
||||||
nxt_lvlhsh_query_t lhq;
|
nxt_lvlhsh_query_t lhq;
|
||||||
|
|
||||||
lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid));
|
process = NULL;
|
||||||
lhq.key.length = sizeof(process->pid);
|
|
||||||
lhq.key.start = (u_char *) &process->pid;
|
nxt_runtime_process_lhq_pid(&lhq, &pid);
|
||||||
lhq.proto = &lvlhsh_processes_proto;
|
|
||||||
lhq.replace = 0;
|
|
||||||
lhq.value = process;
|
|
||||||
lhq.pool = rt->mem_pool;
|
lhq.pool = rt->mem_pool;
|
||||||
|
|
||||||
|
nxt_thread_mutex_lock(&rt->processes_mutex);
|
||||||
|
|
||||||
switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
|
switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
|
||||||
|
|
||||||
case NXT_OK:
|
case NXT_OK:
|
||||||
rt->nprocesses--;
|
rt->nprocesses--;
|
||||||
|
|
||||||
if (process->port_cleanups == 0) {
|
process = lhq.value;
|
||||||
nxt_runtime_process_destroy(rt, process);
|
|
||||||
|
process->registered = 0;
|
||||||
|
|
||||||
|
nxt_thread_log_debug("process %PI removed", pid);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
nxt_thread_log_debug("process %PI remove failed", pid);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_thread_mutex_unlock(&rt->processes_mutex);
|
||||||
|
|
||||||
|
return process;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void
|
||||||
|
nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
|
||||||
|
{
|
||||||
|
nxt_port_t *port;
|
||||||
|
|
||||||
|
if (process->port_cleanups == 0) {
|
||||||
|
if (process->registered == 1) {
|
||||||
|
nxt_runtime_process_remove_pid(rt, process->pid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_runtime_process_destroy(rt, process);
|
||||||
|
|
||||||
|
} else {
|
||||||
nxt_process_port_each(process, port) {
|
nxt_process_port_each(process, port) {
|
||||||
|
|
||||||
nxt_runtime_port_remove(rt, port);
|
nxt_runtime_port_remove(rt, port);
|
||||||
@@ -1670,11 +1735,6 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
|
|||||||
nxt_port_release(port);
|
nxt_port_release(port);
|
||||||
|
|
||||||
} nxt_process_port_loop;
|
} nxt_process_port_loop;
|
||||||
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ struct nxt_runtime_s {
|
|||||||
|
|
||||||
nxt_process_t *mprocess;
|
nxt_process_t *mprocess;
|
||||||
size_t nprocesses;
|
size_t nprocesses;
|
||||||
|
nxt_thread_mutex_t processes_mutex;
|
||||||
nxt_lvlhsh_t processes; /* of nxt_process_t */
|
nxt_lvlhsh_t processes; /* of nxt_process_t */
|
||||||
|
|
||||||
nxt_port_t *port_by_type[NXT_PROCESS_MAX];
|
nxt_port_t *port_by_type[NXT_PROCESS_MAX];
|
||||||
@@ -101,8 +102,6 @@ nxt_runtime_is_master(nxt_runtime_t *rt)
|
|||||||
|
|
||||||
nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
|
nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
|
||||||
|
|
||||||
void nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process);
|
|
||||||
|
|
||||||
nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
|
nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
|
||||||
|
|
||||||
void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process);
|
void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process);
|
||||||
|
|||||||
Reference in New Issue
Block a user