Process port refactoring.
- Introduced nxt_runtime_process_port_create(). - Moved nxt_process_use() into nxt_process.c from nxt_runtime.c. - Renamed nxt_runtime_process_remove_pid() as nxt_runtime_process_remove(). - Some public functions transformed to static. This closes #327 issue on GitHub.
This commit is contained in:
@@ -397,31 +397,19 @@ nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
|
||||
nxt_port_t *port;
|
||||
nxt_process_t *process;
|
||||
|
||||
process = nxt_runtime_process_get(rt, nxt_pid);
|
||||
if (nxt_slow_path(process == NULL)) {
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
port = nxt_port_new(task, 0, nxt_pid, NXT_PROCESS_MAIN);
|
||||
port = nxt_runtime_process_port_create(task, rt, nxt_pid, 0,
|
||||
NXT_PROCESS_MAIN);
|
||||
if (nxt_slow_path(port == NULL)) {
|
||||
nxt_process_use(task, process, -1);
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
nxt_process_port_add(task, process, port);
|
||||
|
||||
nxt_process_use(task, process, -1);
|
||||
process = port->process;
|
||||
|
||||
ret = nxt_port_socket_init(task, port, 0);
|
||||
if (nxt_slow_path(ret != NXT_OK)) {
|
||||
nxt_port_use(task, port, -1);
|
||||
return ret;
|
||||
}
|
||||
|
||||
nxt_runtime_port_add(task, port);
|
||||
|
||||
nxt_port_use(task, port, -1);
|
||||
|
||||
/*
|
||||
* A main process port. A write port is not closed
|
||||
* since it should be inherited by worker processes.
|
||||
|
||||
@@ -238,7 +238,6 @@ void
|
||||
nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||
{
|
||||
nxt_port_t *port;
|
||||
nxt_process_t *process;
|
||||
nxt_runtime_t *rt;
|
||||
nxt_port_msg_new_port_t *new_port_msg;
|
||||
|
||||
@@ -261,22 +260,13 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||
return;
|
||||
}
|
||||
|
||||
process = nxt_runtime_process_get(rt, new_port_msg->pid);
|
||||
if (nxt_slow_path(process == NULL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
port = nxt_port_new(task, new_port_msg->id, new_port_msg->pid,
|
||||
new_port_msg->type);
|
||||
port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
|
||||
new_port_msg->id,
|
||||
new_port_msg->type);
|
||||
if (nxt_slow_path(port == NULL)) {
|
||||
nxt_process_use(task, process, -1);
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_process_port_add(task, process, port);
|
||||
|
||||
nxt_process_use(task, process, -1);
|
||||
|
||||
nxt_fd_nonblocking(task, msg->fd);
|
||||
|
||||
port->pair[0] = -1;
|
||||
@@ -286,10 +276,6 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||
|
||||
port->socket.task = task;
|
||||
|
||||
nxt_runtime_port_add(task, port);
|
||||
|
||||
nxt_port_use(task, port, -1);
|
||||
|
||||
nxt_port_write_enable(task, port);
|
||||
|
||||
msg->u.new_port = port;
|
||||
|
||||
@@ -783,6 +783,17 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
|
||||
{
|
||||
process->use_count += i;
|
||||
|
||||
if (process->use_count == 0) {
|
||||
nxt_runtime_process_release(task->thread->runtime, process);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
|
||||
{
|
||||
|
||||
@@ -114,6 +114,8 @@ NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process,
|
||||
|
||||
nxt_process_type_t nxt_process_type(nxt_process_t *process);
|
||||
|
||||
void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
|
||||
|
||||
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
|
||||
|
||||
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
|
||||
|
||||
@@ -37,10 +37,10 @@ static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
|
||||
static void nxt_runtime_thread_pool_init(void);
|
||||
static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
|
||||
void *data);
|
||||
static void nxt_runtime_process_destroy(nxt_runtime_t *rt,
|
||||
static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
|
||||
static void nxt_runtime_process_remove(nxt_runtime_t *rt,
|
||||
nxt_process_t *process);
|
||||
static nxt_process_t *nxt_runtime_process_remove_pid(nxt_runtime_t *rt,
|
||||
nxt_pid_t pid);
|
||||
static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
|
||||
|
||||
|
||||
nxt_int_t
|
||||
@@ -1298,11 +1298,15 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
void
|
||||
nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
{
|
||||
nxt_port_t *port;
|
||||
|
||||
if (process->registered == 1) {
|
||||
nxt_runtime_process_remove(rt, process);
|
||||
}
|
||||
|
||||
nxt_assert(process->use_count == 0);
|
||||
nxt_assert(process->registered == 0);
|
||||
|
||||
@@ -1385,7 +1389,7 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
|
||||
}
|
||||
|
||||
|
||||
nxt_process_t *
|
||||
static nxt_process_t *
|
||||
nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
|
||||
{
|
||||
nxt_process_t *process;
|
||||
@@ -1495,13 +1499,13 @@ nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
|
||||
}
|
||||
|
||||
|
||||
static nxt_process_t *
|
||||
nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
|
||||
static void
|
||||
nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
{
|
||||
nxt_process_t *process;
|
||||
nxt_pid_t pid;
|
||||
nxt_lvlhsh_query_t lhq;
|
||||
|
||||
process = NULL;
|
||||
pid = process->pid;
|
||||
|
||||
nxt_runtime_process_lhq_pid(&lhq, &pid);
|
||||
|
||||
@@ -1527,27 +1531,6 @@ nxt_runtime_process_remove_pid(nxt_runtime_t *rt, nxt_pid_t pid)
|
||||
}
|
||||
|
||||
nxt_thread_mutex_unlock(&rt->processes_mutex);
|
||||
|
||||
return process;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i)
|
||||
{
|
||||
nxt_runtime_t *rt;
|
||||
|
||||
process->use_count += i;
|
||||
|
||||
if (process->use_count == 0) {
|
||||
rt = task->thread->runtime;
|
||||
|
||||
if (process->registered == 1) {
|
||||
nxt_runtime_process_remove_pid(rt, process->pid);
|
||||
}
|
||||
|
||||
nxt_runtime_process_destroy(rt, process);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1560,7 +1543,37 @@ nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_port_t *
|
||||
nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
|
||||
nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
|
||||
{
|
||||
nxt_port_t *port;
|
||||
nxt_process_t *process;
|
||||
|
||||
process = nxt_runtime_process_get(rt, pid);
|
||||
if (nxt_slow_path(process == NULL)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
port = nxt_port_new(task, id, pid, type);
|
||||
if (nxt_slow_path(port == NULL)) {
|
||||
nxt_process_use(task, process, -1);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nxt_process_port_add(task, process, port);
|
||||
|
||||
nxt_process_use(task, process, -1);
|
||||
|
||||
nxt_runtime_port_add(task, port);
|
||||
|
||||
nxt_port_use(task, port, -1);
|
||||
|
||||
return port;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
|
||||
{
|
||||
nxt_int_t res;
|
||||
|
||||
@@ -93,22 +93,20 @@ nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
|
||||
|
||||
nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
|
||||
|
||||
nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
|
||||
|
||||
void nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process);
|
||||
|
||||
nxt_process_t *nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid);
|
||||
|
||||
void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
|
||||
|
||||
nxt_process_t *nxt_runtime_process_first(nxt_runtime_t *rt,
|
||||
nxt_lvlhsh_each_t *lhe);
|
||||
|
||||
void nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process);
|
||||
|
||||
#define nxt_runtime_process_next(rt, lhe) \
|
||||
nxt_lvlhsh_each(&rt->processes, lhe)
|
||||
|
||||
|
||||
void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
|
||||
nxt_port_t *nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
|
||||
nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type);
|
||||
|
||||
void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user