Changed nxt_process_* for reuse.

This enables the reuse of process creation functions.
This commit is contained in:
Tiago Natel de Moura
2021-11-09 15:48:44 +03:00
parent ff6a7053f5
commit 1de660b6df
7 changed files with 255 additions and 261 deletions

View File

@@ -34,11 +34,6 @@ typedef struct {
static nxt_int_t nxt_main_process_port_create(nxt_task_t *task,
nxt_runtime_t *rt);
static void nxt_main_process_title(nxt_task_t *task);
static nxt_int_t nxt_main_process_create(nxt_task_t *task,
const nxt_process_init_t init);
static nxt_int_t nxt_main_start_process(nxt_task_t *task,
nxt_process_t *process);
static nxt_process_t *nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt);
static void nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_main_process_sigquit_handler(nxt_task_t *task, void *obj,
@@ -49,7 +44,7 @@ static void nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_main_process_signal_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid);
static void nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process);
static void nxt_main_port_socket_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
@@ -97,7 +92,7 @@ nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
* nxt_main_port_modules_handler() which starts the controller
* and router processes.
*/
return nxt_main_process_create(task, nxt_discovery_process);
return nxt_process_init_start(task, nxt_discovery_process);
}
@@ -368,11 +363,17 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
process = nxt_main_process_new(task, rt);
process = nxt_process_new(rt);
if (nxt_slow_path(process == NULL)) {
return;
}
process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
if (process->mem_pool == NULL) {
nxt_process_use(task, process, -1);
return;
}
init = nxt_process_init(process);
*init = nxt_app_process;
@@ -475,7 +476,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
process->stream = msg->port_msg.stream;
process->data.app = app_conf;
ret = nxt_main_start_process(task, process);
ret = nxt_process_start(task, process);
if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
return;
}
@@ -617,139 +618,6 @@ nxt_main_process_title(nxt_task_t *task)
}
static nxt_int_t
nxt_main_process_create(nxt_task_t *task, const nxt_process_init_t init)
{
nxt_int_t ret;
nxt_runtime_t *rt;
nxt_process_t *process;
nxt_process_init_t *pinit;
rt = task->thread->runtime;
process = nxt_main_process_new(task, rt);
if (nxt_slow_path(process == NULL)) {
return NXT_ERROR;
}
process->name = init.name;
process->user_cred = &rt->user_cred;
pinit = nxt_process_init(process);
*pinit = init;
ret = nxt_main_start_process(task, process);
if (nxt_slow_path(ret == NXT_ERROR)) {
nxt_process_use(task, process, -1);
}
return ret;
}
static nxt_process_t *
nxt_main_process_new(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_process_t *process;
process = nxt_runtime_process_new(rt);
if (nxt_slow_path(process == NULL)) {
return NULL;
}
process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
if (process->mem_pool == NULL) {
nxt_process_use(task, process, -1);
return NULL;
}
return process;
}
static nxt_int_t
nxt_main_start_process(nxt_task_t *task, nxt_process_t *process)
{
nxt_mp_t *tmp_mp;
nxt_int_t ret;
nxt_pid_t pid;
nxt_port_t *port;
nxt_process_init_t *init;
init = nxt_process_init(process);
port = nxt_port_new(task, 0, 0, init->type);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
nxt_process_port_add(task, process, port);
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
goto free_port;
}
tmp_mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(tmp_mp == NULL)) {
ret = NXT_ERROR;
goto close_port;
}
if (init->prefork) {
ret = init->prefork(task, process, tmp_mp);
if (nxt_slow_path(ret != NXT_OK)) {
goto free_mempool;
}
}
pid = nxt_process_create(task, process);
switch (pid) {
case -1:
ret = NXT_ERROR;
break;
case 0:
/* The child process: return to the event engine work queue loop. */
nxt_process_use(task, process, -1);
ret = NXT_AGAIN;
break;
default:
/* The main process created a new process. */
nxt_process_use(task, process, -1);
nxt_port_read_close(port);
nxt_port_write_enable(task, port);
ret = NXT_OK;
break;
}
free_mempool:
nxt_mp_destroy(tmp_mp);
close_port:
if (nxt_slow_path(ret == NXT_ERROR)) {
nxt_port_close(task, port);
}
free_port:
nxt_port_use(task, port, -1);
return ret;
}
static void
nxt_main_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
{
@@ -879,13 +747,19 @@ fail:
static void
nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
{
int status;
nxt_err_t err;
nxt_pid_t pid;
int status;
nxt_int_t ret;
nxt_err_t err;
nxt_pid_t pid;
nxt_runtime_t *rt;
nxt_process_t *process;
nxt_process_init_t init;
nxt_debug(task, "sigchld handler signo:%d (%s)",
(int) (uintptr_t) obj, data);
rt = task->thread->runtime;
for ( ;; ) {
pid = waitpid(-1, &status, WNOHANG);
@@ -926,7 +800,36 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
pid, WEXITSTATUS(status));
}
nxt_main_cleanup_process(task, pid);
process = nxt_runtime_process_find(rt, pid);
if (process != NULL) {
nxt_main_process_cleanup(task, process);
if (process->state == NXT_PROCESS_STATE_READY) {
process->stream = 0;
}
if (nxt_exiting) {
if (rt->nprocesses <= 1) {
nxt_runtime_quit(task, 0);
}
return;
}
nxt_port_remove_notify_others(task, process);
init = *(nxt_process_init_t *) nxt_process_init(process);
nxt_process_close_ports(task, process);
if (init.restart) {
ret = nxt_process_init_start(task, init);
if (nxt_slow_path(ret == NXT_ERROR)) {
nxt_alert(task, "failed to restart %s", init.name);
}
}
}
}
}
@@ -940,81 +843,11 @@ nxt_main_process_signal_handler(nxt_task_t *task, void *obj, void *data)
static void
nxt_main_cleanup_process(nxt_task_t *task, nxt_pid_t pid)
nxt_main_process_cleanup(nxt_task_t *task, nxt_process_t *process)
{
int stream;
nxt_int_t ret;
nxt_buf_t *buf;
nxt_port_t *port;
const char *name;
nxt_runtime_t *rt;
nxt_process_t *process;
nxt_process_init_t init;
rt = task->thread->runtime;
process = nxt_runtime_process_find(rt, pid);
if (!process) {
return;
}
if (process->isolation.cleanup != NULL) {
process->isolation.cleanup(task, process);
}
name = process->name;
stream = process->stream;
init = *((nxt_process_init_t *) nxt_process_init(process));
if (process->state == NXT_PROCESS_STATE_READY) {
process->stream = 0;
}
nxt_process_close_ports(task, process);
if (nxt_exiting) {
if (rt->nprocesses <= 1) {
nxt_runtime_quit(task, 0);
}
return;
}
nxt_runtime_process_each(rt, process) {
if (process->pid == nxt_pid
|| process->pid == pid
|| nxt_queue_is_empty(&process->ports))
{
continue;
}
port = nxt_process_port_first(process);
if (nxt_proc_remove_notify_matrix[init.type][port->type] == 0) {
continue;
}
buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
sizeof(pid));
if (nxt_slow_path(buf == NULL)) {
continue;
}
buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
stream, 0, buf);
} nxt_runtime_process_loop;
if (init.restart) {
ret = nxt_main_process_create(task, init);
if (nxt_slow_path(ret == NXT_ERROR)) {
nxt_alert(task, "failed to restart %s", name);
}
}
}
@@ -1407,9 +1240,9 @@ fail:
nxt_mp_destroy(mp);
ret = nxt_main_process_create(task, nxt_controller_process);
ret = nxt_process_init_start(task, nxt_controller_process);
if (ret == NXT_OK) {
ret = nxt_main_process_create(task, nxt_router_process);
ret = nxt_process_init_start(task, nxt_router_process);
}
if (nxt_slow_path(ret == NXT_ERROR)) {