Introducing application prototype processes.

This commit is contained in:
Tiago Natel de Moura
2021-11-09 15:48:44 +03:00
parent 1de660b6df
commit e207415a78
15 changed files with 1109 additions and 193 deletions

View File

@@ -10,6 +10,7 @@
#include <nxt_main_process.h>
#include <nxt_conf.h>
#include <nxt_router.h>
#include <nxt_port_queue.h>
#if (NXT_TLS)
#include <nxt_cert.h>
#endif
@@ -52,6 +53,8 @@ static nxt_int_t nxt_main_listening_socket(nxt_sockaddr_t *sa,
static void nxt_main_port_modules_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static int nxt_cdecl nxt_app_lang_compare(const void *v1, const void *v2);
static void nxt_main_process_whoami_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static void nxt_main_port_conf_store_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_main_file_store(nxt_task_t *task, const char *tmp_name,
@@ -326,7 +329,7 @@ static nxt_conf_app_map_t nxt_app_maps[] = {
static void
nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_debug(task, "main data: %*s",
nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
@@ -334,7 +337,33 @@ nxt_port_main_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static void
nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_main_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
void *mem;
nxt_port_t *port;
nxt_port_new_port_handler(task, msg);
port = msg->u.new_port;
if (port != NULL
&& port->type == NXT_PROCESS_APP
&& msg->fd[1] != -1)
{
mem = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd[1], 0);
if (nxt_fast_path(mem != MAP_FAILED)) {
port->queue = mem;
}
nxt_fd_close(msg->fd[1]);
msg->fd[1] = -1;
}
}
static void
nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
u_char *start, *p, ch;
size_t type_len;
@@ -374,16 +403,18 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN];
init = nxt_process_init(process);
*init = nxt_app_process;
*init = nxt_proto_process;
b = nxt_buf_chk_make_plain(process->mem_pool, msg->buf, msg->size);
if (b == NULL) {
goto failed;
}
nxt_debug(task, "main start process: %*s", b->mem.free - b->mem.pos,
nxt_debug(task, "main start prototype: %*s", b->mem.free - b->mem.pos,
b->mem.pos);
app_conf = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_common_app_conf_t));
@@ -399,7 +430,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
init->name = (const char *) start;
process->name = nxt_mp_alloc(process->mem_pool, app_conf->name.length
+ sizeof("\"\" application") + 1);
+ sizeof("\"\" prototype") + 1);
if (nxt_slow_path(process->name == NULL)) {
goto failed;
@@ -408,7 +439,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
p = (u_char *) process->name;
*p++ = '"';
p = nxt_cpymem(p, init->name, app_conf->name.length);
p = nxt_cpymem(p, "\" application", 13);
p = nxt_cpymem(p, "\" prototype", 11);
*p = '\0';
app_conf->shm_limit = 100 * 1024 * 1024;
@@ -504,21 +535,17 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rt = task->thread->runtime;
process = nxt_runtime_process_find(rt, msg->port_msg.pid);
if (nxt_slow_path(process == NULL)) {
return;
}
nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
port = nxt_runtime_port_find(rt, msg->port_msg.pid,
msg->port_msg.reply_port);
if (nxt_slow_path(port == NULL)) {
return;
}
process = port->process;
nxt_assert(process != NULL);
nxt_assert(process->state == NXT_PROCESS_STATE_CREATING);
#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
if (nxt_slow_path(nxt_clone_credential_map(task, process->pid,
@@ -542,10 +569,13 @@ nxt_main_process_created_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
static nxt_port_handlers_t nxt_main_process_port_handlers = {
.data = nxt_port_main_data_handler,
.data = nxt_main_data_handler,
.new_port = nxt_main_new_port_handler,
.process_created = nxt_main_process_created_handler,
.process_ready = nxt_port_process_ready_handler,
.start_process = nxt_port_main_start_process_handler,
.whoami = nxt_main_process_whoami_handler,
.remove_pid = nxt_port_remove_pid_handler,
.start_process = nxt_main_start_process_handler,
.socket = nxt_main_port_socket_handler,
.modules = nxt_main_port_modules_handler,
.conf_store = nxt_main_port_conf_store_handler,
@@ -559,6 +589,88 @@ static nxt_port_handlers_t nxt_main_process_port_handlers = {
};
static void
nxt_main_process_whoami_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_buf_t *buf;
nxt_pid_t pid, ppid;
nxt_port_t *port;
nxt_runtime_t *rt;
nxt_process_t *pprocess;
nxt_assert(msg->port_msg.reply_port == 0);
if (nxt_slow_path(msg->buf == NULL
|| nxt_buf_used_size(msg->buf) != sizeof(nxt_pid_t)))
{
nxt_alert(task, "whoami: buffer is NULL or unexpected size");
goto fail;
}
nxt_memcpy(&ppid, msg->buf->mem.pos, sizeof(nxt_pid_t));
rt = task->thread->runtime;
pprocess = nxt_runtime_process_find(rt, ppid);
if (nxt_slow_path(pprocess == NULL)) {
nxt_alert(task, "whoami: parent process %PI not found", ppid);
goto fail;
}
pid = nxt_recv_msg_cmsg_pid(msg);
nxt_debug(task, "whoami: from %PI, parent %PI, fd %d", pid, ppid,
msg->fd[0]);
if (msg->fd[0] != -1) {
port = nxt_runtime_process_port_create(task, rt, pid, 0,
NXT_PROCESS_APP);
if (nxt_slow_path(port == NULL)) {
goto fail;
}
nxt_fd_nonblocking(task, msg->fd[0]);
port->pair[0] = -1;
port->pair[1] = msg->fd[0];
msg->fd[0] = -1;
port->max_size = 16 * 1024;
port->max_share = 64 * 1024;
port->socket.task = task;
nxt_port_write_enable(task, port);
} else {
port = nxt_runtime_port_find(rt, pid, 0);
if (nxt_slow_path(port == NULL)) {
goto fail;
}
}
if (ppid != nxt_pid) {
nxt_queue_insert_tail(&pprocess->children, &port->process->link);
}
buf = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
sizeof(nxt_pid_t), 0);
if (nxt_slow_path(buf == NULL)) {
goto fail;
}
buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(nxt_pid_t));
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_READY_LAST, -1,
msg->port_msg.stream, 0, buf);
fail:
if (msg->fd[0] != -1) {
nxt_fd_close(msg->fd[0]);
}
}
static nxt_int_t
nxt_main_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
{
@@ -751,8 +863,10 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
nxt_int_t ret;
nxt_err_t err;
nxt_pid_t pid;
nxt_port_t *port;
nxt_queue_t children;
nxt_runtime_t *rt;
nxt_process_t *process;
nxt_process_t *process, *child;
nxt_process_init_t init;
nxt_debug(task, "sigchld handler signo:%d (%s)",
@@ -809,7 +923,31 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
process->stream = 0;
}
nxt_queue_init(&children);
if (!nxt_queue_is_empty(&process->children)) {
nxt_queue_add(&children, &process->children);
nxt_queue_init(&process->children);
nxt_queue_each(child, &children, nxt_process_t, link) {
port = nxt_process_port_first(child);
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
-1, 0, 0, NULL);
} nxt_queue_loop;
}
if (nxt_exiting) {
nxt_process_close_ports(task, process);
nxt_queue_each(child, &children, nxt_process_t, link) {
nxt_queue_remove(&child->link);
child->link.next = NULL;
nxt_process_close_ports(task, child);
} nxt_queue_loop;
if (rt->nprocesses <= 1) {
nxt_runtime_quit(task, 0);
}
@@ -819,6 +957,15 @@ nxt_main_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
nxt_port_remove_notify_others(task, process);
nxt_queue_each(child, &children, nxt_process_t, link) {
nxt_port_remove_notify_others(task, child);
nxt_queue_remove(&child->link);
child->link.next = NULL;
nxt_process_close_ports(task, child);
} nxt_queue_loop;
init = *(nxt_process_init_t *) nxt_process_init(process);
nxt_process_close_ports(task, process);