Refactor of process management.

The process abstraction has changed to:

  setup(task, process)
  start(task, process_data)
  prefork(task, process, mp)

The prefork() occurs in the main process right before fork.

The file src/nxt_main_process.c is completely free of process
specific logic.

The creation of a process now supports a PROCESS_CREATED state.  The
The setup() function of each process can set its state to either
created or ready.  If created, a MSG_PROCESS_CREATED is sent to main
process, where external setup can be done (required for rootfs under
container).

The core processes (discovery, controller and router) doesn't need
external setup, then they all proceeds to their start() function
straight away.

In the case of applications, the load of the module happens at the
process setup() time and The module's init() function has changed
to be the start() of the process.

The module API has changed to:

  setup(task, process, conf)
  start(task, data)

As a direct benefit of the PROCESS_CREATED message, the clone(2) of
processes using pid namespaces now doesn't need to create a pipe
to make the child block until parent setup uid/gid mappings nor it
needs to receive the child pid.
This commit is contained in:
Tiago Natel de Moura
2020-03-09 16:28:25 +00:00
parent aacf11152c
commit e9e5ddd5a5
28 changed files with 1553 additions and 1341 deletions

View File

@@ -78,7 +78,7 @@ NXT_LIB_SRCS=" \
src/nxt_conf.c \ src/nxt_conf.c \
src/nxt_conf_validation.c \ src/nxt_conf_validation.c \
src/nxt_main_process.c \ src/nxt_main_process.c \
src/nxt_worker_process.c \ src/nxt_signal_handlers.c \
src/nxt_controller.c \ src/nxt_controller.c \
src/nxt_router.c \ src/nxt_router.c \
src/nxt_h1proto.c \ src/nxt_h1proto.c \

View File

@@ -25,6 +25,8 @@ typedef struct {
} nxt_module_t; } nxt_module_t;
static nxt_int_t nxt_discovery_start(nxt_task_t *task,
nxt_process_data_t *data);
static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path); static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp, static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
nxt_array_t *modules, const char *name); nxt_array_t *modules, const char *name);
@@ -34,7 +36,27 @@ static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data); void *data);
static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task, static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
const char *name); const char *name);
static nxt_int_t nxt_app_prefork(nxt_task_t *task, nxt_process_t *process,
nxt_mp_t *mp);
static nxt_int_t nxt_app_setup(nxt_task_t *task, nxt_process_t *process);
static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment); static nxt_int_t nxt_app_set_environment(nxt_conf_value_t *environment);
static nxt_int_t nxt_app_isolation(nxt_task_t *task,
nxt_conf_value_t *isolation, nxt_process_t *process);
#if (NXT_HAVE_CLONE)
static nxt_int_t nxt_app_clone_flags(nxt_task_t *task,
nxt_conf_value_t *namespaces, nxt_clone_t *clone);
#endif
#if (NXT_HAVE_CLONE_NEWUSER)
static nxt_int_t nxt_app_isolation_creds(nxt_task_t *task,
nxt_conf_value_t *isolation, nxt_process_t *process);
static nxt_int_t nxt_app_isolation_credential_map(nxt_task_t *task,
nxt_mp_t *mem_pool, nxt_conf_value_t *map_array,
nxt_clone_credential_map_t *map);
#endif
nxt_str_t nxt_server = nxt_string(NXT_SERVER);
static uint32_t compat[] = { static uint32_t compat[] = {
@@ -42,14 +64,53 @@ static uint32_t compat[] = {
}; };
nxt_str_t nxt_server = nxt_string(NXT_SERVER);
static nxt_app_module_t *nxt_app; static nxt_app_module_t *nxt_app;
nxt_int_t static const nxt_port_handlers_t nxt_discovery_process_port_handlers = {
nxt_discovery_start(nxt_task_t *task, void *data) .quit = nxt_signal_quit_handler,
.new_port = nxt_port_new_port_handler,
.change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler,
.data = nxt_port_data_handler,
.remove_pid = nxt_port_remove_pid_handler,
.rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler,
};
static const nxt_port_handlers_t nxt_app_process_port_handlers = {
.quit = nxt_signal_quit_handler,
.rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler,
};
const nxt_process_init_t nxt_discovery_process = {
.name = "discovery",
.type = NXT_PROCESS_DISCOVERY,
.prefork = NULL,
.restart = 0,
.setup = nxt_process_core_setup,
.start = nxt_discovery_start,
.port_handlers = &nxt_discovery_process_port_handlers,
.signals = nxt_process_signals,
};
const nxt_process_init_t nxt_app_process = {
.type = NXT_PROCESS_APP,
.setup = nxt_app_setup,
.prefork = nxt_app_prefork,
.restart = 0,
.start = NULL, /* set to module->start */
.port_handlers = &nxt_app_process_port_handlers,
.signals = nxt_process_signals,
};
static nxt_int_t
nxt_discovery_start(nxt_task_t *task, nxt_process_data_t *data)
{ {
uint32_t stream; uint32_t stream;
nxt_buf_t *b; nxt_buf_t *b;
@@ -57,7 +118,7 @@ nxt_discovery_start(nxt_task_t *task, void *data)
nxt_port_t *main_port, *discovery_port; nxt_port_t *main_port, *discovery_port;
nxt_runtime_t *rt; nxt_runtime_t *rt;
nxt_debug(task, "DISCOVERY"); nxt_log(task, NXT_LOG_INFO, "discovery started");
rt = task->thread->runtime; rt = task->thread->runtime;
@@ -301,18 +362,85 @@ nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
static void static void
nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data) nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
{ {
nxt_worker_process_quit_handler(task, msg); nxt_signal_quit_handler(task, msg);
} }
nxt_int_t static nxt_int_t
nxt_app_start(nxt_task_t *task, void *data) nxt_app_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
{
nxt_int_t cap_setid;
nxt_int_t ret;
nxt_runtime_t *rt;
nxt_common_app_conf_t *app_conf;
rt = task->thread->runtime;
app_conf = process->data.app;
cap_setid = rt->capabilities.setid;
if (app_conf->isolation != NULL) {
ret = nxt_app_isolation(task, app_conf->isolation, process);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
}
#if (NXT_HAVE_CLONE_NEWUSER)
if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
cap_setid = 1;
}
#endif
if (cap_setid) {
ret = nxt_process_creds_set(task, process, &app_conf->user,
&app_conf->group);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
} else {
if (!nxt_str_eq(&app_conf->user, (u_char *) rt->user_cred.user,
nxt_strlen(rt->user_cred.user)))
{
nxt_alert(task, "cannot set user \"%V\" for app \"%V\": "
"missing capabilities", &app_conf->user, &app_conf->name);
return NXT_ERROR;
}
if (app_conf->group.length > 0
&& !nxt_str_eq(&app_conf->group, (u_char *) rt->group,
nxt_strlen(rt->group)))
{
nxt_alert(task, "cannot set group \"%V\" for app \"%V\": "
"missing capabilities", &app_conf->group,
&app_conf->name);
return NXT_ERROR;
}
}
#if (NXT_HAVE_CLONE_NEWUSER)
ret = nxt_process_vldt_isolation_creds(task, process);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
#endif
return NXT_OK;
}
static nxt_int_t
nxt_app_setup(nxt_task_t *task, nxt_process_t *process)
{ {
nxt_int_t ret; nxt_int_t ret;
nxt_process_init_t *init;
nxt_app_lang_module_t *lang; nxt_app_lang_module_t *lang;
nxt_common_app_conf_t *app_conf; nxt_common_app_conf_t *app_conf;
app_conf = data; app_conf = process->data.app;
lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type); lang = nxt_app_lang_module(task->thread->runtime, &app_conf->type);
if (nxt_slow_path(lang == NULL)) { if (nxt_slow_path(lang == NULL)) {
@@ -332,8 +460,8 @@ nxt_app_start(nxt_task_t *task, void *data)
} }
} }
if (nxt_app->pre_init != NULL) { if (nxt_app->setup != NULL) {
ret = nxt_app->pre_init(task, data); ret = nxt_app->setup(task, process, app_conf);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
return ret; return ret;
@@ -360,16 +488,13 @@ nxt_app_start(nxt_task_t *task, void *data)
return NXT_ERROR; return NXT_ERROR;
} }
ret = nxt_app->init(task, data); init = nxt_process_init(process);
if (nxt_slow_path(ret != NXT_OK)) { init->start = nxt_app->start;
nxt_debug(task, "application init failed");
} else { process->state = NXT_PROCESS_STATE_CREATED;
nxt_debug(task, "application init done");
}
return ret; return NXT_OK;
} }
@@ -429,6 +554,206 @@ nxt_app_set_environment(nxt_conf_value_t *environment)
} }
static nxt_int_t
nxt_app_isolation(nxt_task_t *task, nxt_conf_value_t *isolation,
nxt_process_t *process)
{
#if (NXT_HAVE_CLONE)
nxt_int_t ret;
nxt_conf_value_t *obj;
static nxt_str_t nsname = nxt_string("namespaces");
obj = nxt_conf_get_object_member(isolation, &nsname, NULL);
if (obj != NULL) {
ret = nxt_app_clone_flags(task, obj, &process->isolation.clone);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
}
#endif
#if (NXT_HAVE_CLONE_NEWUSER)
ret = nxt_app_isolation_creds(task, isolation, process);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
#endif
return NXT_OK;
}
#if (NXT_HAVE_CLONE_NEWUSER)
static nxt_int_t
nxt_app_isolation_creds(nxt_task_t *task, nxt_conf_value_t *isolation,
nxt_process_t *process)
{
nxt_int_t ret;
nxt_clone_t *clone;
nxt_conf_value_t *array;
static nxt_str_t uidname = nxt_string("uidmap");
static nxt_str_t gidname = nxt_string("gidmap");
clone = &process->isolation.clone;
array = nxt_conf_get_object_member(isolation, &uidname, NULL);
if (array != NULL) {
ret = nxt_app_isolation_credential_map(task, process->mem_pool, array,
&clone->uidmap);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
}
array = nxt_conf_get_object_member(isolation, &gidname, NULL);
if (array != NULL) {
ret = nxt_app_isolation_credential_map(task, process->mem_pool, array,
&clone->gidmap);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
}
return NXT_OK;
}
static nxt_int_t
nxt_app_isolation_credential_map(nxt_task_t *task, nxt_mp_t *mp,
nxt_conf_value_t *map_array, nxt_clone_credential_map_t *map)
{
nxt_int_t ret;
nxt_uint_t i;
nxt_conf_value_t *obj;
static nxt_conf_map_t nxt_clone_map_entry_conf[] = {
{
nxt_string("container"),
NXT_CONF_MAP_INT,
offsetof(nxt_clone_map_entry_t, container),
},
{
nxt_string("host"),
NXT_CONF_MAP_INT,
offsetof(nxt_clone_map_entry_t, host),
},
{
nxt_string("size"),
NXT_CONF_MAP_INT,
offsetof(nxt_clone_map_entry_t, size),
},
};
map->size = nxt_conf_array_elements_count(map_array);
if (map->size == 0) {
return NXT_OK;
}
map->map = nxt_mp_alloc(mp, map->size * sizeof(nxt_clone_map_entry_t));
if (nxt_slow_path(map->map == NULL)) {
return NXT_ERROR;
}
for (i = 0; i < map->size; i++) {
obj = nxt_conf_get_array_element(map_array, i);
ret = nxt_conf_map_object(mp, obj, nxt_clone_map_entry_conf,
nxt_nitems(nxt_clone_map_entry_conf),
map->map + i);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_alert(task, "clone map entry map error");
return NXT_ERROR;
}
}
return NXT_OK;
}
#endif
#if (NXT_HAVE_CLONE)
static nxt_int_t
nxt_app_clone_flags(nxt_task_t *task, nxt_conf_value_t *namespaces,
nxt_clone_t *clone)
{
uint32_t index;
nxt_str_t name;
nxt_int_t flag;
nxt_conf_value_t *value;
index = 0;
for ( ;; ) {
value = nxt_conf_next_object_member(namespaces, &name, &index);
if (value == NULL) {
break;
}
flag = 0;
#if (NXT_HAVE_CLONE_NEWUSER)
if (nxt_str_eq(&name, "credential", 10)) {
flag = CLONE_NEWUSER;
}
#endif
#if (NXT_HAVE_CLONE_NEWPID)
if (nxt_str_eq(&name, "pid", 3)) {
flag = CLONE_NEWPID;
}
#endif
#if (NXT_HAVE_CLONE_NEWNET)
if (nxt_str_eq(&name, "network", 7)) {
flag = CLONE_NEWNET;
}
#endif
#if (NXT_HAVE_CLONE_NEWUTS)
if (nxt_str_eq(&name, "uname", 5)) {
flag = CLONE_NEWUTS;
}
#endif
#if (NXT_HAVE_CLONE_NEWNS)
if (nxt_str_eq(&name, "mount", 5)) {
flag = CLONE_NEWNS;
}
#endif
#if (NXT_HAVE_CLONE_NEWCGROUP)
if (nxt_str_eq(&name, "cgroup", 6)) {
flag = CLONE_NEWCGROUP;
}
#endif
if (!flag) {
nxt_alert(task, "unknown namespace flag: \"%V\"", &name);
return NXT_ERROR;
}
if (nxt_conf_get_boolean(value)) {
clone->flags |= flag;
}
}
return NXT_OK;
}
#endif
nxt_app_lang_module_t * nxt_app_lang_module_t *
nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name) nxt_app_lang_module(nxt_runtime_t *rt, nxt_str_t *name)
{ {
@@ -539,7 +864,7 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
nxt_fd_blocking(task, main_port->pair[1]); nxt_fd_blocking(task, main_port->pair[1]);
init->ready_stream = my_port->process->init->stream; init->ready_stream = my_port->process->stream;
init->read_port.id.pid = my_port->pid; init->read_port.id.pid = my_port->pid;
init->read_port.id.id = my_port->id; init->read_port.id.id = my_port->id;

View File

@@ -27,6 +27,8 @@ typedef enum {
typedef struct nxt_app_module_s nxt_app_module_t; typedef struct nxt_app_module_s nxt_app_module_t;
typedef nxt_int_t (*nxt_application_setup_t)(nxt_task_t *task,
nxt_process_t *process, nxt_common_app_conf_t *conf);
typedef struct { typedef struct {
@@ -37,9 +39,6 @@ typedef struct {
} nxt_app_lang_module_t; } nxt_app_lang_module_t;
typedef struct nxt_common_app_conf_s nxt_common_app_conf_t;
typedef struct { typedef struct {
char *executable; char *executable;
nxt_conf_value_t *arguments; nxt_conf_value_t *arguments;
@@ -111,10 +110,8 @@ struct nxt_app_module_s {
nxt_str_t type; nxt_str_t type;
const char *version; const char *version;
nxt_int_t (*pre_init)(nxt_task_t *task, nxt_application_setup_t setup;
nxt_common_app_conf_t *conf); nxt_process_start_t start;
nxt_int_t (*init)(nxt_task_t *task,
nxt_common_app_conf_t *conf);
}; };

View File

@@ -797,12 +797,11 @@ nxt_cert_info_delete(nxt_str_t *name)
nxt_array_t * nxt_array_t *
nxt_cert_store_load(nxt_task_t *task) nxt_cert_store_load(nxt_task_t *task, nxt_mp_t *mp)
{ {
DIR *dir; DIR *dir;
size_t size, alloc; size_t size, alloc;
u_char *buf, *p; u_char *buf, *p;
nxt_mp_t *mp;
nxt_str_t name; nxt_str_t name;
nxt_int_t ret; nxt_int_t ret;
nxt_file_t file; nxt_file_t file;
@@ -818,14 +817,8 @@ nxt_cert_store_load(nxt_task_t *task)
return NULL; return NULL;
} }
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
return NULL;
}
certs = nxt_array_create(mp, 16, sizeof(nxt_cert_item_t)); certs = nxt_array_create(mp, 16, sizeof(nxt_cert_item_t));
if (nxt_slow_path(certs == NULL)) { if (nxt_slow_path(certs == NULL)) {
nxt_mp_destroy(mp);
return NULL; return NULL;
} }
@@ -933,7 +926,7 @@ nxt_cert_store_release(nxt_array_t *certs)
nxt_fd_close(items[i].fd); nxt_fd_close(items[i].fd);
} }
nxt_mp_destroy(certs->mem_pool); nxt_array_destroy(certs);
} }

View File

@@ -19,7 +19,7 @@ nxt_conf_value_t *nxt_cert_info_get(nxt_str_t *name);
nxt_conf_value_t *nxt_cert_info_get_all(nxt_mp_t *mp); nxt_conf_value_t *nxt_cert_info_get_all(nxt_mp_t *mp);
nxt_int_t nxt_cert_info_delete(nxt_str_t *name); nxt_int_t nxt_cert_info_delete(nxt_str_t *name);
nxt_array_t *nxt_cert_store_load(nxt_task_t *task); nxt_array_t *nxt_cert_store_load(nxt_task_t *task, nxt_mp_t *mem_pool);
void nxt_cert_store_release(nxt_array_t *certs); void nxt_cert_store_release(nxt_array_t *certs);
void nxt_cert_store_get(nxt_task_t *task, nxt_str_t *name, nxt_mp_t *mp, void nxt_cert_store_get(nxt_task_t *task, nxt_str_t *name, nxt_mp_t *mp,

View File

@@ -3,8 +3,8 @@
* Copyright (C) NGINX, Inc. * Copyright (C) NGINX, Inc.
*/ */
#ifndef _NXT_CLONE_INCLUDED_ #ifndef _NXT_CLONE_H_INCLUDED_
#define _NXT_CLONE_INCLUDED_ #define _NXT_CLONE_H_INCLUDED_
#if (NXT_HAVE_CLONE_NEWUSER) #if (NXT_HAVE_CLONE_NEWUSER)
@@ -36,10 +36,11 @@ typedef struct {
pid_t nxt_clone(nxt_int_t flags); pid_t nxt_clone(nxt_int_t flags);
#if (NXT_HAVE_CLONE_NEWUSER) #define nxt_is_clone_flag_set(flags, test) \
((flags & CLONE_##test) == CLONE_##test)
#define NXT_CLONE_USER(flags) \
((flags & CLONE_NEWUSER) == CLONE_NEWUSER) #if (NXT_HAVE_CLONE_NEWUSER)
NXT_EXPORT nxt_int_t nxt_clone_credential_map(nxt_task_t *task, pid_t pid, NXT_EXPORT nxt_int_t nxt_clone_credential_map(nxt_task_t *task, pid_t pid,
nxt_credential_t *creds, nxt_clone_t *clone); nxt_credential_t *creds, nxt_clone_t *clone);
@@ -50,4 +51,5 @@ NXT_EXPORT nxt_int_t nxt_clone_vldt_credential_gidmap(nxt_task_t *task,
#endif #endif
#endif /* _NXT_CLONE_INCLUDED_ */
#endif /* _NXT_CLONE_H_INCLUDED_ */

View File

@@ -39,11 +39,17 @@ typedef struct {
} nxt_controller_response_t; } nxt_controller_response_t;
static nxt_int_t nxt_controller_prefork(nxt_task_t *task,
nxt_process_t *process, nxt_mp_t *mp);
static nxt_int_t nxt_controller_start(nxt_task_t *task,
nxt_process_data_t *data);
static void nxt_controller_process_new_port_handler(nxt_task_t *task, static void nxt_controller_process_new_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg); nxt_port_recv_msg_t *msg);
static void nxt_controller_send_current_conf(nxt_task_t *task); static void nxt_controller_send_current_conf(nxt_task_t *task);
static void nxt_controller_router_ready_handler(nxt_task_t *task, static void nxt_controller_router_ready_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg); nxt_port_recv_msg_t *msg);
static void nxt_controller_remove_pid_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_controller_conf_default(void); static nxt_int_t nxt_controller_conf_default(void);
static void nxt_controller_conf_init_handler(nxt_task_t *task, static void nxt_controller_conf_init_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data); nxt_port_recv_msg_t *msg, void *data);
@@ -83,6 +89,8 @@ static void nxt_controller_process_cert(nxt_task_t *task,
static void nxt_controller_process_cert_save(nxt_task_t *task, static void nxt_controller_process_cert_save(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data); nxt_port_recv_msg_t *msg, void *data);
static nxt_bool_t nxt_controller_cert_in_use(nxt_str_t *name); static nxt_bool_t nxt_controller_cert_in_use(nxt_str_t *name);
static void nxt_controller_cert_cleanup(nxt_task_t *task, void *obj,
void *data);
#endif #endif
static void nxt_controller_conf_handler(nxt_task_t *task, static void nxt_controller_conf_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data); nxt_port_recv_msg_t *msg, void *data);
@@ -114,21 +122,117 @@ static const nxt_event_conn_state_t nxt_controller_conn_write_state;
static const nxt_event_conn_state_t nxt_controller_conn_close_state; static const nxt_event_conn_state_t nxt_controller_conn_close_state;
nxt_port_handlers_t nxt_controller_process_port_handlers = { static const nxt_port_handlers_t nxt_controller_process_port_handlers = {
.quit = nxt_worker_process_quit_handler, .quit = nxt_signal_quit_handler,
.new_port = nxt_controller_process_new_port_handler, .new_port = nxt_controller_process_new_port_handler,
.change_file = nxt_port_change_log_file_handler, .change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler, .mmap = nxt_port_mmap_handler,
.process_ready = nxt_controller_router_ready_handler, .process_ready = nxt_controller_router_ready_handler,
.data = nxt_port_data_handler, .data = nxt_port_data_handler,
.remove_pid = nxt_port_remove_pid_handler, .remove_pid = nxt_controller_remove_pid_handler,
.rpc_ready = nxt_port_rpc_handler, .rpc_ready = nxt_port_rpc_handler,
.rpc_error = nxt_port_rpc_handler, .rpc_error = nxt_port_rpc_handler,
}; };
nxt_int_t const nxt_process_init_t nxt_controller_process = {
nxt_controller_start(nxt_task_t *task, void *data) .name = "controller",
.type = NXT_PROCESS_CONTROLLER,
.prefork = nxt_controller_prefork,
.restart = 1,
.setup = nxt_process_core_setup,
.start = nxt_controller_start,
.port_handlers = &nxt_controller_process_port_handlers,
.signals = nxt_process_signals,
};
static nxt_int_t
nxt_controller_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
{
ssize_t n;
nxt_int_t ret;
nxt_str_t *conf;
nxt_file_t file;
nxt_runtime_t *rt;
nxt_file_info_t fi;
nxt_controller_init_t ctrl_init;
nxt_log(task, NXT_LOG_INFO, "controller started");
rt = task->thread->runtime;
nxt_memzero(&ctrl_init, sizeof(nxt_controller_init_t));
conf = &ctrl_init.conf;
nxt_memzero(&file, sizeof(nxt_file_t));
file.name = (nxt_file_name_t *) rt->conf;
ret = nxt_file_open(task, &file, NXT_FILE_RDONLY, NXT_FILE_OPEN, 0);
if (ret == NXT_OK) {
ret = nxt_file_info(&file, &fi);
if (nxt_fast_path(ret == NXT_OK && nxt_is_file(&fi))) {
conf->length = nxt_file_size(&fi);
conf->start = nxt_mp_alloc(mp, conf->length);
if (nxt_slow_path(conf->start == NULL)) {
nxt_file_close(task, &file);
return NXT_ERROR;
}
n = nxt_file_read(&file, conf->start, conf->length, 0);
if (nxt_slow_path(n != (ssize_t) conf->length)) {
conf->start = NULL;
conf->length = 0;
nxt_alert(task, "failed to restore previous configuration: "
"cannot read the file");
}
}
nxt_file_close(task, &file);
}
#if (NXT_TLS)
ctrl_init.certs = nxt_cert_store_load(task, mp);
nxt_mp_cleanup(mp, nxt_controller_cert_cleanup, task, ctrl_init.certs, rt);
#endif
process->data.controller = ctrl_init;
return NXT_OK;
}
#if (NXT_TLS)
static void
nxt_controller_cert_cleanup(nxt_task_t *task, void *obj, void *data)
{
pid_t main_pid;
nxt_array_t *certs;
nxt_runtime_t *rt;
certs = obj;
rt = data;
main_pid = rt->port_by_type[NXT_PROCESS_MAIN]->pid;
if (nxt_pid == main_pid && certs != NULL) {
nxt_cert_store_release(certs);
}
}
#endif
static nxt_int_t
nxt_controller_start(nxt_task_t *task, nxt_process_data_t *data)
{ {
nxt_mp_t *mp; nxt_mp_t *mp;
nxt_int_t ret; nxt_int_t ret;
@@ -147,15 +251,13 @@ nxt_controller_start(nxt_task_t *task, void *data)
nxt_queue_init(&nxt_controller_waiting_requests); nxt_queue_init(&nxt_controller_waiting_requests);
init = data; init = &data->controller;
#if (NXT_TLS) #if (NXT_TLS)
if (init->certs != NULL) { if (init->certs != NULL) {
nxt_cert_info_init(task, init->certs); nxt_cert_info_init(task, init->certs);
nxt_cert_store_release(init->certs); nxt_cert_store_release(init->certs);
} }
#endif #endif
json = &init->conf; json = &init->conf;
@@ -170,8 +272,6 @@ nxt_controller_start(nxt_task_t *task, void *data)
} }
conf = nxt_conf_json_parse_str(mp, json); conf = nxt_conf_json_parse_str(mp, json);
nxt_free(json->start);
if (nxt_slow_path(conf == NULL)) { if (nxt_slow_path(conf == NULL)) {
nxt_alert(task, "failed to restore previous configuration: " nxt_alert(task, "failed to restore previous configuration: "
"file is corrupted or not enough memory"); "file is corrupted or not enough memory");
@@ -295,6 +395,28 @@ nxt_controller_router_ready_handler(nxt_task_t *task,
} }
static void
nxt_controller_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_pid_t pid;
nxt_process_t *process;
nxt_runtime_t *rt;
rt = task->thread->runtime;
nxt_assert(nxt_buf_used_size(msg->buf) == sizeof(pid));
nxt_memcpy(&pid, msg->buf->mem.pos, sizeof(pid));
process = nxt_runtime_process_find(rt, pid);
if (process != NULL && nxt_process_type(process) == NXT_PROCESS_ROUTER) {
nxt_controller_router_ready = 0;
}
nxt_port_remove_pid_handler(task, msg);
}
static nxt_int_t static nxt_int_t
nxt_controller_conf_default(void) nxt_controller_conf_default(void)
{ {

View File

@@ -9,8 +9,7 @@
#include <nxt_unit.h> #include <nxt_unit.h>
static nxt_int_t nxt_external_init(nxt_task_t *task, static nxt_int_t nxt_external_start(nxt_task_t *task, nxt_process_data_t *data);
nxt_common_app_conf_t *conf);
nxt_app_module_t nxt_external_module = { nxt_app_module_t nxt_external_module = {
@@ -19,7 +18,7 @@ nxt_app_module_t nxt_external_module = {
nxt_string("external"), nxt_string("external"),
"*", "*",
NULL, NULL,
nxt_external_init, nxt_external_start,
}; };
@@ -58,7 +57,7 @@ nxt_external_fd_no_cloexec(nxt_task_t *task, nxt_socket_t fd)
static nxt_int_t static nxt_int_t
nxt_external_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
{ {
char **argv; char **argv;
u_char buf[256]; u_char buf[256];
@@ -71,9 +70,11 @@ nxt_external_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
nxt_port_t *my_port, *main_port; nxt_port_t *my_port, *main_port;
nxt_runtime_t *rt; nxt_runtime_t *rt;
nxt_conf_value_t *value; nxt_conf_value_t *value;
nxt_common_app_conf_t *conf;
nxt_external_app_conf_t *c; nxt_external_app_conf_t *c;
rt = task->thread->runtime; rt = task->thread->runtime;
conf = data->app;
main_port = rt->port_by_type[NXT_PROCESS_MAIN]; main_port = rt->port_by_type[NXT_PROCESS_MAIN];
my_port = nxt_runtime_port_find(rt, nxt_pid, 0); my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
@@ -99,7 +100,7 @@ nxt_external_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
"%PI,%ud,%d;" "%PI,%ud,%d;"
"%PI,%ud,%d;" "%PI,%ud,%d;"
"%d,%z,%Z", "%d,%z,%Z",
NXT_VERSION, my_port->process->init->stream, NXT_VERSION, my_port->process->stream,
main_port->pid, main_port->id, main_port->pair[1], main_port->pid, main_port->id, main_port->pair[1],
my_port->pid, my_port->id, my_port->pair[0], my_port->pid, my_port->id, my_port->pair[0],
2, conf->shm_limit); 2, conf->shm_limit);

View File

@@ -27,9 +27,10 @@
#include "nxt_jars.h" #include "nxt_jars.h"
static nxt_int_t nxt_java_pre_init(nxt_task_t *task, static nxt_int_t nxt_java_setup(nxt_task_t *task, nxt_process_t *process,
nxt_common_app_conf_t *conf); nxt_common_app_conf_t *conf);
static nxt_int_t nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf); static nxt_int_t nxt_java_start(nxt_task_t *task,
nxt_process_data_t *data);
static void nxt_java_request_handler(nxt_unit_request_info_t *req); static void nxt_java_request_handler(nxt_unit_request_info_t *req);
static void nxt_java_websocket_handler(nxt_unit_websocket_frame_t *ws); static void nxt_java_websocket_handler(nxt_unit_websocket_frame_t *ws);
static void nxt_java_close_handler(nxt_unit_request_info_t *req); static void nxt_java_close_handler(nxt_unit_request_info_t *req);
@@ -49,8 +50,8 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = {
compat, compat,
nxt_string("java"), nxt_string("java"),
NXT_STRING(NXT_JAVA_VERSION), NXT_STRING(NXT_JAVA_VERSION),
nxt_java_pre_init, nxt_java_setup,
nxt_java_init, nxt_java_start,
}; };
typedef struct { typedef struct {
@@ -60,7 +61,8 @@ typedef struct {
static nxt_int_t static nxt_int_t
nxt_java_pre_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_java_setup(nxt_task_t *task, nxt_process_t *process,
nxt_common_app_conf_t *conf)
{ {
const char *unit_jars; const char *unit_jars;
@@ -115,7 +117,7 @@ nxt_java_module_jars(const char *jars[], int jar_count)
static nxt_int_t static nxt_int_t
nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_java_start(nxt_task_t *task, nxt_process_data_t *data)
{ {
jint rc; jint rc;
char *opt, *real_path; char *opt, *real_path;
@@ -125,14 +127,16 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
jobject cl, classpath; jobject cl, classpath;
nxt_str_t str; nxt_str_t str;
nxt_int_t opt_len, real_path_len; nxt_int_t opt_len, real_path_len;
nxt_uint_t i, unit_jars_count, classpath_count, system_jars_count; nxt_uint_t i, unit_jars_count, classpath_count;
nxt_uint_t system_jars_count;
JavaVMOption *jvm_opt; JavaVMOption *jvm_opt;
JavaVMInitArgs jvm_args; JavaVMInitArgs jvm_args;
nxt_unit_ctx_t *ctx; nxt_unit_ctx_t *ctx;
nxt_unit_init_t java_init; nxt_unit_init_t java_init;
nxt_java_data_t data; nxt_java_data_t java_data;
nxt_conf_value_t *value; nxt_conf_value_t *value;
nxt_java_app_conf_t *c; nxt_java_app_conf_t *c;
nxt_common_app_conf_t *app_conf;
//setenv("ASAN_OPTIONS", "handle_segv=0", 1); //setenv("ASAN_OPTIONS", "handle_segv=0", 1);
@@ -140,7 +144,8 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
jvm_args.nOptions = 0; jvm_args.nOptions = 0;
jvm_args.ignoreUnrecognized = 0; jvm_args.ignoreUnrecognized = 0;
c = &conf->u.java; app_conf = data->app;
c = &app_conf->u.java;
if (c->options != NULL) { if (c->options != NULL) {
jvm_args.nOptions += nxt_conf_array_elements_count(c->options); jvm_args.nOptions += nxt_conf_array_elements_count(c->options);
@@ -338,8 +343,8 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
goto env_failed; goto env_failed;
} }
data.env = env; java_data.env = env;
data.ctx = nxt_java_startContext(env, c->webapp, classpath); java_data.ctx = nxt_java_startContext(env, c->webapp, classpath);
if ((*env)->ExceptionCheck(env)) { if ((*env)->ExceptionCheck(env)) {
nxt_alert(task, "Unhandled exception in application start"); nxt_alert(task, "Unhandled exception in application start");
@@ -353,8 +358,8 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
java_init.callbacks.websocket_handler = nxt_java_websocket_handler; java_init.callbacks.websocket_handler = nxt_java_websocket_handler;
java_init.callbacks.close_handler = nxt_java_close_handler; java_init.callbacks.close_handler = nxt_java_close_handler;
java_init.request_data_size = sizeof(nxt_java_request_data_t); java_init.request_data_size = sizeof(nxt_java_request_data_t);
java_init.data = &data; java_init.data = &java_data;
java_init.shm_limit = conf->shm_limit; java_init.shm_limit = app_conf->shm_limit;
ctx = nxt_unit_init(&java_init); ctx = nxt_unit_init(&java_init);
if (nxt_slow_path(ctx == NULL)) { if (nxt_slow_path(ctx == NULL)) {
@@ -367,7 +372,7 @@ nxt_java_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
/* TODO report error */ /* TODO report error */
} }
nxt_java_stopContext(env, data.ctx); nxt_java_stopContext(env, java_data.ctx);
if ((*env)->ExceptionCheck(env)) { if ((*env)->ExceptionCheck(env)) {
(*env)->ExceptionDescribe(env); (*env)->ExceptionDescribe(env);

File diff suppressed because it is too large Load Diff

View File

@@ -19,26 +19,17 @@ typedef enum {
} nxt_socket_error_t; } nxt_socket_error_t;
typedef struct {
nxt_str_t conf;
#if (NXT_TLS)
nxt_array_t *certs;
#endif
} nxt_controller_init_t;
nxt_int_t nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task, nxt_int_t nxt_main_process_start(nxt_thread_t *thr, nxt_task_t *task,
nxt_runtime_t *runtime); nxt_runtime_t *runtime);
void nxt_main_stop_all_processes(nxt_task_t *task, nxt_runtime_t *runtime);
nxt_int_t nxt_controller_start(nxt_task_t *task, void *data);
nxt_int_t nxt_router_start(nxt_task_t *task, void *data);
nxt_int_t nxt_discovery_start(nxt_task_t *task, void *data);
nxt_int_t nxt_app_start(nxt_task_t *task, void *data);
NXT_EXPORT extern const nxt_process_init_t nxt_discovery_process;
NXT_EXPORT extern const nxt_process_init_t nxt_controller_process;
NXT_EXPORT extern const nxt_process_init_t nxt_router_process;
NXT_EXPORT extern const nxt_process_init_t nxt_app_process;
extern const nxt_sig_event_t nxt_main_process_signals[]; extern const nxt_sig_event_t nxt_main_process_signals[];
extern const nxt_sig_event_t nxt_worker_process_signals[]; extern const nxt_sig_event_t nxt_process_signals[];
#endif /* _NXT_MAIN_PROCESS_H_INCLUDED_ */ #endif /* _NXT_MAIN_PROCESS_H_INCLUDED_ */

View File

@@ -72,7 +72,7 @@ typedef void (*zif_handler)(INTERNAL_FUNCTION_PARAMETERS);
#endif #endif
static nxt_int_t nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf); static nxt_int_t nxt_php_start(nxt_task_t *task, nxt_process_data_t *data);
static nxt_int_t nxt_php_set_target(nxt_task_t *task, nxt_php_target_t *target, static nxt_int_t nxt_php_set_target(nxt_task_t *task, nxt_php_target_t *target,
nxt_conf_value_t *conf); nxt_conf_value_t *conf);
static void nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options, static void nxt_php_set_options(nxt_task_t *task, nxt_conf_value_t *options,
@@ -242,7 +242,7 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = {
nxt_string("php"), nxt_string("php"),
PHP_VERSION, PHP_VERSION,
NULL, NULL,
nxt_php_init, nxt_php_start,
}; };
@@ -256,7 +256,7 @@ static void ***tsrm_ls;
static nxt_int_t static nxt_int_t
nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_php_start(nxt_task_t *task, nxt_process_data_t *data)
{ {
u_char *p; u_char *p;
uint32_t next; uint32_t next;
@@ -269,6 +269,7 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
nxt_unit_init_t php_init; nxt_unit_init_t php_init;
nxt_conf_value_t *value; nxt_conf_value_t *value;
nxt_php_app_conf_t *c; nxt_php_app_conf_t *c;
nxt_common_app_conf_t *conf;
static nxt_str_t file_str = nxt_string("file"); static nxt_str_t file_str = nxt_string("file");
static nxt_str_t user_str = nxt_string("user"); static nxt_str_t user_str = nxt_string("user");
@@ -276,6 +277,7 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
nxt_php_task = task; nxt_php_task = task;
conf = data->app;
c = &conf->u.php; c = &conf->u.php;
n = (c->targets != NULL) ? nxt_conf_object_members_count(c->targets) : 1; n = (c->targets != NULL) ? nxt_conf_object_members_count(c->targets) : 1;
@@ -385,7 +387,7 @@ nxt_php_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
nxt_fd_blocking(task, main_port->pair[1]); nxt_fd_blocking(task, main_port->pair[1]);
php_init.ready_stream = my_port->process->init->stream; php_init.ready_stream = my_port->process->stream;
php_init.read_port.id.pid = my_port->pid; php_init.read_port.id.pid = my_port->pid;
php_init.read_port.id.id = my_port->id; php_init.read_port.id.id = my_port->id;

View File

@@ -296,7 +296,9 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return; return;
} }
process->ready = 1; nxt_assert(process->state != NXT_PROCESS_STATE_READY);
process->state = NXT_PROCESS_STATE_READY;
nxt_assert(!nxt_queue_is_empty(&process->ports)); nxt_assert(!nxt_queue_is_empty(&process->ports));

View File

@@ -14,7 +14,7 @@ struct nxt_port_handlers_s {
nxt_port_handler_t rpc_error; nxt_port_handler_t rpc_error;
/* Main process RPC requests. */ /* Main process RPC requests. */
nxt_port_handler_t start_worker; nxt_port_handler_t start_process;
nxt_port_handler_t socket; nxt_port_handler_t socket;
nxt_port_handler_t modules; nxt_port_handler_t modules;
nxt_port_handler_t conf_store; nxt_port_handler_t conf_store;
@@ -27,7 +27,8 @@ struct nxt_port_handlers_s {
nxt_port_handler_t new_port; nxt_port_handler_t new_port;
nxt_port_handler_t mmap; nxt_port_handler_t mmap;
/* New process ready. */ /* New process */
nxt_port_handler_t process_created;
nxt_port_handler_t process_ready; nxt_port_handler_t process_ready;
/* Process exit/crash notification. */ /* Process exit/crash notification. */
@@ -53,6 +54,8 @@ struct nxt_port_handlers_s {
#define nxt_port_handler_idx(name) \ #define nxt_port_handler_idx(name) \
( offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t) ) ( offsetof(nxt_port_handlers_t, name) / sizeof(nxt_port_handler_t) )
#define nxt_msg_last(handler) \
(handler | NXT_PORT_MSG_LAST)
typedef enum { typedef enum {
NXT_PORT_MSG_LAST = 0x100, NXT_PORT_MSG_LAST = 0x100,
@@ -64,7 +67,7 @@ typedef enum {
_NXT_PORT_MSG_RPC_READY = nxt_port_handler_idx(rpc_ready), _NXT_PORT_MSG_RPC_READY = nxt_port_handler_idx(rpc_ready),
_NXT_PORT_MSG_RPC_ERROR = nxt_port_handler_idx(rpc_error), _NXT_PORT_MSG_RPC_ERROR = nxt_port_handler_idx(rpc_error),
_NXT_PORT_MSG_START_WORKER = nxt_port_handler_idx(start_worker), _NXT_PORT_MSG_START_PROCESS = nxt_port_handler_idx(start_process),
_NXT_PORT_MSG_SOCKET = nxt_port_handler_idx(socket), _NXT_PORT_MSG_SOCKET = nxt_port_handler_idx(socket),
_NXT_PORT_MSG_MODULES = nxt_port_handler_idx(modules), _NXT_PORT_MSG_MODULES = nxt_port_handler_idx(modules),
_NXT_PORT_MSG_CONF_STORE = nxt_port_handler_idx(conf_store), _NXT_PORT_MSG_CONF_STORE = nxt_port_handler_idx(conf_store),
@@ -76,6 +79,7 @@ typedef enum {
_NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port), _NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
_NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap), _NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
_NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
_NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready), _NXT_PORT_MSG_PROCESS_READY = nxt_port_handler_idx(process_ready),
_NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid), _NXT_PORT_MSG_REMOVE_PID = nxt_port_handler_idx(remove_pid),
_NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit), _NXT_PORT_MSG_QUIT = nxt_port_handler_idx(quit),
@@ -92,37 +96,34 @@ typedef enum {
/ sizeof(nxt_port_handler_t), / sizeof(nxt_port_handler_t),
NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY, NXT_PORT_MSG_RPC_READY = _NXT_PORT_MSG_RPC_READY,
NXT_PORT_MSG_RPC_READY_LAST = _NXT_PORT_MSG_RPC_READY | NXT_PORT_MSG_LAST, NXT_PORT_MSG_RPC_READY_LAST = nxt_msg_last(_NXT_PORT_MSG_RPC_READY),
NXT_PORT_MSG_RPC_ERROR = _NXT_PORT_MSG_RPC_ERROR | NXT_PORT_MSG_LAST, NXT_PORT_MSG_RPC_ERROR = nxt_msg_last(_NXT_PORT_MSG_RPC_ERROR),
NXT_PORT_MSG_START_PROCESS = nxt_msg_last(_NXT_PORT_MSG_START_PROCESS),
NXT_PORT_MSG_START_WORKER = _NXT_PORT_MSG_START_WORKER NXT_PORT_MSG_SOCKET = nxt_msg_last(_NXT_PORT_MSG_SOCKET),
| NXT_PORT_MSG_LAST, NXT_PORT_MSG_MODULES = nxt_msg_last(_NXT_PORT_MSG_MODULES),
NXT_PORT_MSG_SOCKET = _NXT_PORT_MSG_SOCKET | NXT_PORT_MSG_LAST, NXT_PORT_MSG_CONF_STORE = nxt_msg_last(_NXT_PORT_MSG_CONF_STORE),
NXT_PORT_MSG_MODULES = _NXT_PORT_MSG_MODULES | NXT_PORT_MSG_LAST, NXT_PORT_MSG_CERT_GET = nxt_msg_last(_NXT_PORT_MSG_CERT_GET),
NXT_PORT_MSG_CONF_STORE = _NXT_PORT_MSG_CONF_STORE | NXT_PORT_MSG_LAST, NXT_PORT_MSG_CERT_DELETE = nxt_msg_last(_NXT_PORT_MSG_CERT_DELETE),
NXT_PORT_MSG_CERT_GET = _NXT_PORT_MSG_CERT_GET | NXT_PORT_MSG_LAST, NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG),
NXT_PORT_MSG_CERT_DELETE = _NXT_PORT_MSG_CERT_DELETE | NXT_PORT_MSG_LAST, NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
NXT_PORT_MSG_ACCESS_LOG = _NXT_PORT_MSG_ACCESS_LOG | NXT_PORT_MSG_LAST, NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
NXT_PORT_MSG_CHANGE_FILE = _NXT_PORT_MSG_CHANGE_FILE | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_NEW_PORT = _NXT_PORT_MSG_NEW_PORT | NXT_PORT_MSG_LAST,
NXT_PORT_MSG_MMAP = _NXT_PORT_MSG_MMAP | NXT_PORT_MSG_LAST
| NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC, | NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
NXT_PORT_MSG_PROCESS_READY = _NXT_PORT_MSG_PROCESS_READY NXT_PORT_MSG_PROCESS_CREATED = nxt_msg_last(_NXT_PORT_MSG_PROCESS_CREATED),
| NXT_PORT_MSG_LAST, NXT_PORT_MSG_PROCESS_READY = nxt_msg_last(_NXT_PORT_MSG_PROCESS_READY),
NXT_PORT_MSG_QUIT = _NXT_PORT_MSG_QUIT | NXT_PORT_MSG_LAST, NXT_PORT_MSG_QUIT = nxt_msg_last(_NXT_PORT_MSG_QUIT),
NXT_PORT_MSG_REMOVE_PID = _NXT_PORT_MSG_REMOVE_PID | NXT_PORT_MSG_LAST, NXT_PORT_MSG_REMOVE_PID = nxt_msg_last(_NXT_PORT_MSG_REMOVE_PID),
NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS, NXT_PORT_MSG_REQ_HEADERS = _NXT_PORT_MSG_REQ_HEADERS,
NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET, NXT_PORT_MSG_WEBSOCKET = _NXT_PORT_MSG_WEBSOCKET,
NXT_PORT_MSG_WEBSOCKET_LAST = _NXT_PORT_MSG_WEBSOCKET | NXT_PORT_MSG_LAST, NXT_PORT_MSG_WEBSOCKET_LAST = nxt_msg_last(_NXT_PORT_MSG_WEBSOCKET),
NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA, NXT_PORT_MSG_DATA = _NXT_PORT_MSG_DATA,
NXT_PORT_MSG_DATA_LAST = _NXT_PORT_MSG_DATA | NXT_PORT_MSG_LAST, NXT_PORT_MSG_DATA_LAST = nxt_msg_last(_NXT_PORT_MSG_DATA),
NXT_PORT_MSG_OOSM = _NXT_PORT_MSG_OOSM | NXT_PORT_MSG_LAST, NXT_PORT_MSG_OOSM = nxt_msg_last(_NXT_PORT_MSG_OOSM),
NXT_PORT_MSG_SHM_ACK = _NXT_PORT_MSG_SHM_ACK | NXT_PORT_MSG_LAST, NXT_PORT_MSG_SHM_ACK = nxt_msg_last(_NXT_PORT_MSG_SHM_ACK),
} nxt_port_msg_type_t; } nxt_port_msg_type_t;

View File

@@ -174,7 +174,7 @@ complete_buf:
if (process != NULL && !nxt_queue_is_empty(&process->ports)) { if (process != NULL && !nxt_queue_is_empty(&process->ports)) {
port = nxt_process_port_first(process); port = nxt_process_port_first(process);
if (port->type == NXT_PROCESS_WORKER) { if (port->type == NXT_PROCESS_APP) {
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK, (void) nxt_port_socket_write(task, port, NXT_PORT_MSG_SHM_ACK,
-1, 0, 0, NULL); -1, 0, 0, NULL);
} }

View File

@@ -13,9 +13,18 @@
#include <signal.h> #include <signal.h>
static void nxt_process_start(nxt_task_t *task, nxt_process_t *process); static nxt_int_t nxt_process_setup(nxt_task_t *task, nxt_process_t *process);
static nxt_int_t nxt_process_worker_setup(nxt_task_t *task, static nxt_int_t nxt_process_child_fixup(nxt_task_t *task,
nxt_process_t *process, int parentfd); nxt_process_t *process);
static nxt_int_t nxt_process_send_created(nxt_task_t *task,
nxt_process_t *process);
static nxt_int_t nxt_process_send_ready(nxt_task_t *task,
nxt_process_t *process);
static void nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data);
static void nxt_process_created_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
/* A cached process pid. */ /* A cached process pid. */
nxt_pid_t nxt_pid; nxt_pid_t nxt_pid;
@@ -47,62 +56,58 @@ nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
static nxt_int_t static nxt_int_t
nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd) nxt_process_child_fixup(nxt_task_t *task, nxt_process_t *process)
{ {
pid_t rpid, pid;
ssize_t n;
nxt_int_t parent_status;
nxt_process_t *p; nxt_process_t *p;
nxt_runtime_t *rt; nxt_runtime_t *rt;
nxt_process_init_t *init; nxt_process_init_t *init;
nxt_process_type_t ptype; nxt_process_type_t ptype;
pid = getpid(); init = nxt_process_init(process);
rpid = 0;
rt = task->thread->runtime;
init = process->init;
/* Setup the worker process. */ nxt_pid = nxt_getpid();
n = read(parentfd, &rpid, sizeof(rpid)); process->pid = nxt_pid;
if (nxt_slow_path(n == -1 || n != sizeof(rpid))) {
nxt_alert(task, "failed to read real pid");
return NXT_ERROR;
}
if (nxt_slow_path(rpid == 0)) {
nxt_alert(task, "failed to get real pid from parent");
return NXT_ERROR;
}
nxt_pid = rpid;
/* Clean inherited cached thread tid. */ /* Clean inherited cached thread tid. */
task->thread->tid = 0; task->thread->tid = 0;
process->pid = nxt_pid; #if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWPID)
if (nxt_is_clone_flag_set(process->isolation.clone.flags, NEWPID)) {
ssize_t pidsz;
char procpid[10];
if (nxt_pid != pid) { nxt_debug(task, "%s isolated pid is %d", process->name, nxt_pid);
nxt_debug(task, "app \"%s\" real pid %d", init->name, nxt_pid);
nxt_debug(task, "app \"%s\" isolated pid: %d", init->name, pid);
}
n = read(parentfd, &parent_status, sizeof(parent_status)); pidsz = readlink("/proc/self", procpid, sizeof(procpid));
if (nxt_slow_path(n == -1 || n != sizeof(parent_status))) {
nxt_alert(task, "failed to read parent status"); if (nxt_slow_path(pidsz < 0 || pidsz >= (ssize_t) sizeof(procpid))) {
nxt_alert(task, "failed to read real pid from /proc/self");
return NXT_ERROR; return NXT_ERROR;
} }
if (nxt_slow_path(parent_status != NXT_OK)) { procpid[pidsz] = '\0';
return parent_status;
nxt_pid = (nxt_pid_t) strtol(procpid, NULL, 10);
nxt_assert(nxt_pid > 0 && nxt_errno != ERANGE);
process->pid = nxt_pid;
task->thread->tid = nxt_pid;
nxt_debug(task, "%s real pid is %d", process->name, nxt_pid);
} }
#endif
ptype = init->type; ptype = init->type;
nxt_port_reset_next_id(); nxt_port_reset_next_id();
nxt_event_engine_thread_adopt(task->thread->engine); nxt_event_engine_thread_adopt(task->thread->engine);
rt = task->thread->runtime;
/* Remove not ready processes. */ /* Remove not ready processes. */
nxt_runtime_process_each(rt, p) { nxt_runtime_process_each(rt, p) {
@@ -114,7 +119,7 @@ nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd)
continue; continue;
} }
if (!p->ready) { if (p->state != NXT_PROCESS_STATE_READY) {
nxt_debug(task, "remove not ready process %PI", p->pid); nxt_debug(task, "remove not ready process %PI", p->pid);
nxt_process_close_ports(task, p); nxt_process_close_ports(task, p);
@@ -127,12 +132,6 @@ nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd)
} nxt_runtime_process_loop; } nxt_runtime_process_loop;
nxt_runtime_process_add(task, process);
nxt_process_start(task, process);
process->ready = 1;
return NXT_OK; return NXT_OK;
} }
@@ -140,48 +139,36 @@ nxt_process_worker_setup(nxt_task_t *task, nxt_process_t *process, int parentfd)
nxt_pid_t nxt_pid_t
nxt_process_create(nxt_task_t *task, nxt_process_t *process) nxt_process_create(nxt_task_t *task, nxt_process_t *process)
{ {
int pipefd[2];
nxt_int_t ret; nxt_int_t ret;
nxt_pid_t pid; nxt_pid_t pid;
nxt_process_init_t *init;
if (nxt_slow_path(pipe(pipefd) == -1)) {
nxt_alert(task, "failed to create process pipe for passing rpid");
return -1;
}
init = process->init;
#if (NXT_HAVE_CLONE) #if (NXT_HAVE_CLONE)
pid = nxt_clone(SIGCHLD | init->isolation.clone.flags); pid = nxt_clone(SIGCHLD | process->isolation.clone.flags);
if (nxt_slow_path(pid < 0)) { if (nxt_slow_path(pid < 0)) {
nxt_alert(task, "clone() failed while creating \"%s\" %E", nxt_alert(task, "clone() failed for %s %E", process->name, nxt_errno);
init->name, nxt_errno); return pid;
goto cleanup;
} }
#else #else
pid = fork(); pid = fork();
if (nxt_slow_path(pid < 0)) { if (nxt_slow_path(pid < 0)) {
nxt_alert(task, "fork() failed while creating \"%s\" %E", nxt_alert(task, "fork() failed for %s %E", process->name, nxt_errno);
init->name, nxt_errno); return pid;
goto cleanup;
} }
#endif #endif
if (pid == 0) { if (pid == 0) {
/* Child. */ /* Child. */
if (nxt_slow_path(close(pipefd[1]) == -1)) { ret = nxt_process_child_fixup(task, process);
nxt_alert(task, "failed to close writer pipe fd");
}
ret = nxt_process_worker_setup(task, process, pipefd[0]);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
exit(1); nxt_process_quit(task, 1);
return -1;
} }
if (nxt_slow_path(close(pipefd[0]) == -1)) { nxt_runtime_process_add(task, process);
nxt_alert(task, "failed to close writer pipe fd");
if (nxt_slow_path(nxt_process_setup(task, process) != NXT_OK)) {
nxt_process_quit(task, 1);
} }
/* /*
@@ -193,78 +180,24 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
/* Parent. */ /* Parent. */
/*
* At this point, the child process is blocked reading the
* pipe fd to get its real pid (rpid).
*
* If anything goes wrong now, we need to terminate the child
* process by sending a NXT_ERROR in the pipe.
*/
#if (NXT_HAVE_CLONE) #if (NXT_HAVE_CLONE)
nxt_debug(task, "clone(\"%s\"): %PI", init->name, pid); nxt_debug(task, "clone(%s): %PI", process->name, pid);
#else #else
nxt_debug(task, "fork(\"%s\"): %PI", init->name, pid); nxt_debug(task, "fork(%s): %PI", process->name, pid);
#endif #endif
if (nxt_slow_path(write(pipefd[1], &pid, sizeof(pid)) == -1)) {
nxt_alert(task, "failed to write real pid");
goto fail;
}
#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
if (NXT_CLONE_USER(init->isolation.clone.flags)) {
ret = nxt_clone_credential_map(task, pid, init->user_cred,
&init->isolation.clone);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
}
#endif
ret = NXT_OK;
if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) {
nxt_alert(task, "failed to write status");
goto fail;
}
process->pid = pid; process->pid = pid;
nxt_runtime_process_add(task, process); nxt_runtime_process_add(task, process);
goto cleanup;
fail:
ret = NXT_ERROR;
if (nxt_slow_path(write(pipefd[1], &ret, sizeof(ret)) == -1)) {
nxt_alert(task, "failed to write status");
}
waitpid(pid, NULL, 0);
pid = -1;
cleanup:
if (nxt_slow_path(close(pipefd[0]) != 0)) {
nxt_alert(task, "failed to close pipe: %E", nxt_errno);
}
if (nxt_slow_path(close(pipefd[1]) != 0)) {
nxt_alert(task, "failed to close pipe: %E", nxt_errno);
}
return pid; return pid;
} }
static void static nxt_int_t
nxt_process_start(nxt_task_t *task, nxt_process_t *process) nxt_process_setup(nxt_task_t *task, nxt_process_t *process)
{ {
nxt_int_t ret, cap_setid; nxt_int_t ret;
nxt_port_t *port, *main_port; nxt_port_t *port, *main_port;
nxt_thread_t *thread; nxt_thread_t *thread;
nxt_runtime_t *rt; nxt_runtime_t *rt;
@@ -272,37 +205,17 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process)
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
const nxt_event_interface_t *interface; const nxt_event_interface_t *interface;
init = process->init; init = nxt_process_init(process);
nxt_log(task, NXT_LOG_INFO, "%s started", init->name); nxt_debug(task, "%s setup", process->name);
nxt_process_title(task, "unit: %s", init->name); nxt_process_title(task, "unit: %s", process->name);
thread = task->thread; thread = task->thread;
rt = thread->runtime; rt = thread->runtime;
nxt_random_init(&thread->random); nxt_random_init(&thread->random);
cap_setid = rt->capabilities.setid;
#if (NXT_HAVE_CLONE_NEWUSER)
if (!cap_setid && NXT_CLONE_USER(init->isolation.clone.flags)) {
cap_setid = 1;
}
#endif
if (cap_setid) {
ret = nxt_credential_setgids(task, init->user_cred);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
ret = nxt_credential_setuid(task, init->user_cred);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
}
rt->type = init->type; rt->type = init->type;
engine = thread->engine; engine = thread->engine;
@@ -312,17 +225,17 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process)
interface = nxt_service_get(rt->services, "engine", rt->engine); interface = nxt_service_get(rt->services, "engine", rt->engine);
if (nxt_slow_path(interface == NULL)) { if (nxt_slow_path(interface == NULL)) {
goto fail; return NXT_ERROR;
} }
if (nxt_event_engine_change(engine, interface, rt->batch) != NXT_OK) { if (nxt_event_engine_change(engine, interface, rt->batch) != NXT_OK) {
goto fail; return NXT_ERROR;
} }
ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads, ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads,
60000 * 1000000LL); 60000 * 1000000LL);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
goto fail; return NXT_ERROR;
} }
main_port = rt->port_by_type[NXT_PROCESS_MAIN]; main_port = rt->port_by_type[NXT_PROCESS_MAIN];
@@ -334,28 +247,282 @@ nxt_process_start(nxt_task_t *task, nxt_process_t *process)
nxt_port_write_close(port); nxt_port_write_close(port);
ret = init->start(task, init->data);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
nxt_port_enable(task, port, init->port_handlers); nxt_port_enable(task, port, init->port_handlers);
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY, ret = init->setup(task, process);
-1, init->stream, 0, NULL);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
nxt_log(task, NXT_LOG_ERR, "failed to send READY message to main"); return NXT_ERROR;
}
switch (process->state) {
case NXT_PROCESS_STATE_CREATED:
ret = nxt_process_send_created(task, process);
break;
case NXT_PROCESS_STATE_READY:
ret = nxt_process_send_ready(task, process);
if (nxt_slow_path(ret != NXT_OK)) {
break;
}
ret = init->start(task, &process->data);
break;
default:
nxt_assert(0);
}
if (nxt_slow_path(ret != NXT_OK)) {
nxt_alert(task, "%s failed to start", process->name);
}
return ret;
}
static nxt_int_t
nxt_process_send_created(nxt_task_t *task, nxt_process_t *process)
{
uint32_t stream;
nxt_int_t ret;
nxt_port_t *my_port, *main_port;
nxt_runtime_t *rt;
nxt_assert(process->state == NXT_PROCESS_STATE_CREATED);
rt = task->thread->runtime;
my_port = nxt_process_port_first(process);
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
nxt_assert(my_port != NULL && main_port != NULL);
stream = nxt_port_rpc_register_handler(task, my_port,
nxt_process_created_ok,
nxt_process_created_error,
main_port->pid, process);
if (nxt_slow_path(stream == 0)) {
return NXT_ERROR;
}
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_CREATED,
-1, stream, my_port->id, NULL);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_alert(task, "%s failed to send CREATED message", process->name);
nxt_port_rpc_cancel(task, my_port, stream);
return NXT_ERROR;
}
nxt_debug(task, "%s created", process->name);
return NXT_OK;
}
static void
nxt_process_created_ok(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
{
nxt_int_t ret;
nxt_process_t *process;
nxt_process_init_t *init;
process = data;
init = nxt_process_init(process);
ret = nxt_process_apply_creds(task, process);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail; goto fail;
} }
return; nxt_log(task, NXT_LOG_INFO, "%s started", process->name);
ret = init->start(task, &process->data);
fail: fail:
exit(1); nxt_process_quit(task, ret == NXT_OK ? 0 : 1);
}
static void
nxt_process_created_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_process_t *process;
nxt_process_init_t *init;
process = data;
init = nxt_process_init(process);
nxt_alert(task, "%s failed to start", init->name);
nxt_process_quit(task, 1);
}
nxt_int_t
nxt_process_core_setup(nxt_task_t *task, nxt_process_t *process)
{
nxt_int_t ret;
ret = nxt_process_apply_creds(task, process);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
process->state = NXT_PROCESS_STATE_READY;
return NXT_OK;
}
#if (NXT_HAVE_CLONE_NEWUSER)
nxt_int_t
nxt_process_vldt_isolation_creds(nxt_task_t *task, nxt_process_t *process)
{
nxt_int_t ret;
nxt_clone_t *clone;
nxt_credential_t *creds;
clone = &process->isolation.clone;
creds = process->user_cred;
if (clone->uidmap.size == 0 && clone->gidmap.size == 0) {
return NXT_OK;
}
if (!nxt_is_clone_flag_set(clone->flags, NEWUSER)) {
if (nxt_slow_path(clone->uidmap.size > 0)) {
nxt_log(task, NXT_LOG_ERR, "\"uidmap\" is set but "
"\"isolation.namespaces.credential\" is false or unset");
return NXT_ERROR;
}
if (nxt_slow_path(clone->gidmap.size > 0)) {
nxt_log(task, NXT_LOG_ERR, "\"gidmap\" is set but "
"\"isolation.namespaces.credential\" is false or unset");
return NXT_ERROR;
}
return NXT_OK;
}
ret = nxt_clone_vldt_credential_uidmap(task, &clone->uidmap, creds);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
return nxt_clone_vldt_credential_gidmap(task, &clone->gidmap, creds);
}
#endif
nxt_int_t
nxt_process_creds_set(nxt_task_t *task, nxt_process_t *process, nxt_str_t *user,
nxt_str_t *group)
{
char *str;
process->user_cred = nxt_mp_zalloc(process->mem_pool,
sizeof(nxt_credential_t));
if (nxt_slow_path(process->user_cred == NULL)) {
return NXT_ERROR;
}
str = nxt_mp_zalloc(process->mem_pool, user->length + 1);
if (nxt_slow_path(str == NULL)) {
return NXT_ERROR;
}
nxt_memcpy(str, user->start, user->length);
str[user->length] = '\0';
process->user_cred->user = str;
if (group->start != NULL) {
str = nxt_mp_zalloc(process->mem_pool, group->length + 1);
if (nxt_slow_path(str == NULL)) {
return NXT_ERROR;
}
nxt_memcpy(str, group->start, group->length);
str[group->length] = '\0';
} else {
str = NULL;
}
return nxt_credential_get(task, process->mem_pool, process->user_cred, str);
}
nxt_int_t
nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process)
{
nxt_int_t ret, cap_setid;
nxt_runtime_t *rt;
rt = task->thread->runtime;
cap_setid = rt->capabilities.setid;
#if (NXT_HAVE_CLONE && NXT_HAVE_CLONE_NEWUSER)
if (!cap_setid
&& nxt_is_clone_flag_set(process->isolation.clone.flags, NEWUSER)) {
cap_setid = 1;
}
#endif
if (cap_setid) {
ret = nxt_credential_setgids(task, process->user_cred);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
ret = nxt_credential_setuid(task, process->user_cred);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
}
return NXT_OK;
}
static nxt_int_t
nxt_process_send_ready(nxt_task_t *task, nxt_process_t *process)
{
nxt_int_t ret;
nxt_port_t *main_port;
nxt_runtime_t *rt;
rt = task->thread->runtime;
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
nxt_assert(main_port != NULL);
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_PROCESS_READY,
-1, process->stream, 0, NULL);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_alert(task, "%s failed to send READY message", process->name);
return NXT_ERROR;
}
nxt_debug(task, "%s sent ready", process->name);
return NXT_OK;
} }
@@ -625,3 +792,50 @@ nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port)
return res; return res;
} }
void
nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status)
{
nxt_uint_t n;
nxt_queue_t *listen;
nxt_runtime_t *rt;
nxt_queue_link_t *link, *next;
nxt_listen_event_t *lev;
nxt_listen_socket_t *ls;
rt = task->thread->runtime;
nxt_debug(task, "close listen connections");
listen = &task->thread->engine->listen_connections;
for (link = nxt_queue_first(listen);
link != nxt_queue_tail(listen);
link = next)
{
next = nxt_queue_next(link);
lev = nxt_queue_link_data(link, nxt_listen_event_t, link);
nxt_queue_remove(link);
nxt_fd_event_close(task->thread->engine, &lev->socket);
}
if (rt->listen_sockets != NULL) {
ls = rt->listen_sockets->elts;
n = rt->listen_sockets->nelts;
while (n != 0) {
nxt_socket_close(task, ls->socket);
ls->socket = -1;
ls++;
n--;
}
rt->listen_sockets->nelts = 0;
}
nxt_runtime_quit(task, exit_status);
}

View File

@@ -8,68 +8,123 @@
#define _NXT_PROCESS_H_INCLUDED_ #define _NXT_PROCESS_H_INCLUDED_
#if (NXT_HAVE_CLONE) #if (NXT_HAVE_CLONE)
#include <unistd.h>
#include <nxt_clone.h> #include <nxt_clone.h>
#endif #endif
#if (NXT_HAVE_CLONE)
/*
* Old glibc wrapper for getpid(2) returns a cached pid invalidated only by
* fork(2) calls. As we use clone(2) for container, it returns the wrong pid.
*/
#define nxt_getpid() \
syscall(__NR_getpid)
#else
#define nxt_getpid() \
getpid()
#endif
typedef pid_t nxt_pid_t; typedef pid_t nxt_pid_t;
typedef struct nxt_process_init_s nxt_process_init_t; typedef struct nxt_common_app_conf_s nxt_common_app_conf_t;
typedef nxt_int_t (*nxt_process_start_t)(nxt_task_t *task, void *data);
typedef nxt_int_t (*nxt_process_restart_t)(nxt_task_t *task, nxt_runtime_t *rt,
nxt_process_init_t *init);
struct nxt_process_init_s {
nxt_mp_t *mem_pool;
nxt_process_start_t start;
const char *name;
nxt_credential_t *user_cred;
const nxt_port_handlers_t *port_handlers; typedef struct {
const nxt_sig_event_t *signals; nxt_runtime_t *rt;
} nxt_discovery_init_t;
nxt_process_type_t type;
void *data; typedef struct {
uint32_t stream; nxt_str_t conf;
#if (NXT_TLS)
union { nxt_array_t *certs;
#if (NXT_HAVE_CLONE)
nxt_clone_t clone;
#endif #endif
} isolation; } nxt_controller_init_t;
};
typedef union {
void *discovery;
nxt_controller_init_t controller;
void *router;
nxt_common_app_conf_t *app;
} nxt_process_data_t;
typedef enum {
NXT_PROCESS_STATE_CREATING = 0,
NXT_PROCESS_STATE_CREATED,
NXT_PROCESS_STATE_READY,
} nxt_process_state_t;
typedef struct nxt_port_mmap_s nxt_port_mmap_t; typedef struct nxt_port_mmap_s nxt_port_mmap_t;
typedef struct nxt_port_mmaps_s nxt_port_mmaps_t;
struct nxt_port_mmaps_s {
typedef struct {
nxt_thread_mutex_t mutex; nxt_thread_mutex_t mutex;
uint32_t size; uint32_t size;
uint32_t cap; uint32_t cap;
nxt_port_mmap_t *elts; nxt_port_mmap_t *elts;
}; } nxt_port_mmaps_t;
typedef struct { typedef struct {
nxt_pid_t pid; nxt_pid_t pid;
const char *name;
nxt_queue_t ports; /* of nxt_port_t */ nxt_queue_t ports; /* of nxt_port_t */
nxt_bool_t ready; nxt_process_state_t state;
nxt_bool_t registered; nxt_bool_t registered;
nxt_int_t use_count; nxt_int_t use_count;
nxt_process_init_t *init;
nxt_port_mmaps_t incoming; nxt_port_mmaps_t incoming;
nxt_port_mmaps_t outgoing; nxt_port_mmaps_t outgoing;
nxt_thread_mutex_t cp_mutex; nxt_thread_mutex_t cp_mutex;
nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ nxt_lvlhsh_t connected_ports; /* of nxt_port_t */
uint32_t stream;
nxt_mp_t *mem_pool;
nxt_credential_t *user_cred;
nxt_process_data_t data;
union {
#if (NXT_HAVE_CLONE)
nxt_clone_t clone;
#endif
} isolation;
} nxt_process_t; } nxt_process_t;
typedef nxt_int_t (*nxt_process_prefork_t)(nxt_task_t *task,
nxt_process_t *process, nxt_mp_t *mp);
typedef nxt_int_t (*nxt_process_postfork_t)(nxt_task_t *task,
nxt_process_t *process, nxt_mp_t *mp);
typedef nxt_int_t (*nxt_process_setup_t)(nxt_task_t *task,
nxt_process_t *process);
typedef nxt_int_t (*nxt_process_start_t)(nxt_task_t *task,
nxt_process_data_t *data);
typedef struct {
const char *name;
nxt_process_type_t type;
nxt_process_prefork_t prefork;
nxt_process_setup_t setup;
nxt_process_start_t start;
uint8_t restart; /* 1-bit */
const nxt_port_handlers_t *port_handlers;
const nxt_sig_event_t *signals;
} nxt_process_init_t;
extern nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX]; extern nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX];
extern nxt_bool_t extern nxt_bool_t
nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX]; nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX];
@@ -84,6 +139,9 @@ NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns);
NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv, NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv,
char ***orig_envp); char ***orig_envp);
#define nxt_process_init(process) \
(nxt_pointer_to(process, sizeof(nxt_process_t)))
#define nxt_process_port_remove(port) \ #define nxt_process_port_remove(port) \
nxt_queue_remove(&port->link) nxt_queue_remove(&port->link)
@@ -113,11 +171,18 @@ void nxt_process_connected_port_remove(nxt_process_t *process,
nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process,
nxt_port_t *port); nxt_port_t *port);
void nxt_worker_process_quit_handler(nxt_task_t *task, void nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status);
nxt_port_recv_msg_t *msg); void nxt_signal_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_init_destroy(nxt_runtime_t *rt, nxt_process_init_t *init); nxt_int_t nxt_process_core_setup(nxt_task_t *task, nxt_process_t *process);
nxt_int_t nxt_process_creds_set(nxt_task_t *task, nxt_process_t *process,
nxt_str_t *user, nxt_str_t *group);
nxt_int_t nxt_process_apply_creds(nxt_task_t *task, nxt_process_t *process);
#if (NXT_HAVE_CLONE_NEWUSER)
nxt_int_t nxt_process_vldt_isolation_creds(nxt_task_t *task,
nxt_process_t *process);
#endif
#if (NXT_HAVE_SETPROCTITLE) #if (NXT_HAVE_SETPROCTITLE)

View File

@@ -13,7 +13,7 @@ typedef enum {
NXT_PROCESS_DISCOVERY, NXT_PROCESS_DISCOVERY,
NXT_PROCESS_CONTROLLER, NXT_PROCESS_CONTROLLER,
NXT_PROCESS_ROUTER, NXT_PROCESS_ROUTER,
NXT_PROCESS_WORKER, NXT_PROCESS_APP,
NXT_PROCESS_MAX, NXT_PROCESS_MAX,
} nxt_process_type_t; } nxt_process_type_t;

View File

@@ -65,7 +65,8 @@ typedef struct {
PyObject_HEAD PyObject_HEAD
} nxt_py_error_t; } nxt_py_error_t;
static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf); static nxt_int_t nxt_python_start(nxt_task_t *task,
nxt_process_data_t *data);
static nxt_int_t nxt_python_init_strings(void); static nxt_int_t nxt_python_init_strings(void);
static void nxt_python_request_handler(nxt_unit_request_info_t *req); static void nxt_python_request_handler(nxt_unit_request_info_t *req);
static void nxt_python_atexit(void); static void nxt_python_atexit(void);
@@ -116,7 +117,7 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = {
nxt_string("python"), nxt_string("python"),
PY_VERSION, PY_VERSION,
NULL, NULL,
nxt_python_init, nxt_python_start,
}; };
@@ -211,7 +212,7 @@ static nxt_python_string_t nxt_python_strings[] = {
static nxt_int_t static nxt_int_t
nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
{ {
int rc; int rc;
char *nxt_py_module; char *nxt_py_module;
@@ -219,6 +220,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
PyObject *obj, *pypath, *module; PyObject *obj, *pypath, *module;
nxt_unit_ctx_t *unit_ctx; nxt_unit_ctx_t *unit_ctx;
nxt_unit_init_t python_init; nxt_unit_init_t python_init;
nxt_common_app_conf_t *app_conf;
nxt_python_app_conf_t *c; nxt_python_app_conf_t *c;
#if PY_MAJOR_VERSION == 3 #if PY_MAJOR_VERSION == 3
char *path; char *path;
@@ -229,7 +231,8 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
static const char bin_python[] = "/bin/python"; static const char bin_python[] = "/bin/python";
#endif #endif
c = &conf->u.python; app_conf = data->app;
c = &app_conf->u.python;
if (c->module.length == 0) { if (c->module.length == 0) {
nxt_alert(task, "python module is empty"); nxt_alert(task, "python module is empty");
@@ -410,7 +413,7 @@ nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
nxt_unit_default_init(task, &python_init); nxt_unit_default_init(task, &python_init);
python_init.callbacks.request_handler = nxt_python_request_handler; python_init.callbacks.request_handler = nxt_python_request_handler;
python_init.shm_limit = conf->shm_limit; python_init.shm_limit = data->app->shm_limit;
unit_ctx = nxt_unit_init(&python_init); unit_ctx = nxt_unit_init(&python_init);
if (nxt_slow_path(unit_ctx == NULL)) { if (nxt_slow_path(unit_ctx == NULL)) {

View File

@@ -75,6 +75,9 @@ struct nxt_port_select_state_s {
typedef struct nxt_port_select_state_s nxt_port_select_state_t; typedef struct nxt_port_select_state_s nxt_port_select_state_t;
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
nxt_mp_t *mp);
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
static void nxt_router_greet_controller(nxt_task_t *task, static void nxt_router_greet_controller(nxt_task_t *task,
nxt_port_t *controller_port); nxt_port_t *controller_port);
@@ -268,8 +271,8 @@ static const nxt_str_t *nxt_app_msg_prefix[] = {
}; };
nxt_port_handlers_t nxt_router_process_port_handlers = { static const nxt_port_handlers_t nxt_router_process_port_handlers = {
.quit = nxt_worker_process_quit_handler, .quit = nxt_signal_quit_handler,
.new_port = nxt_router_new_port_handler, .new_port = nxt_router_new_port_handler,
.change_file = nxt_port_change_log_file_handler, .change_file = nxt_port_change_log_file_handler,
.mmap = nxt_port_mmap_handler, .mmap = nxt_port_mmap_handler,
@@ -282,8 +285,29 @@ nxt_port_handlers_t nxt_router_process_port_handlers = {
}; };
nxt_int_t const nxt_process_init_t nxt_router_process = {
nxt_router_start(nxt_task_t *task, void *data) .name = "router",
.type = NXT_PROCESS_ROUTER,
.prefork = nxt_router_prefork,
.restart = 1,
.setup = nxt_process_core_setup,
.start = nxt_router_start,
.port_handlers = &nxt_router_process_port_handlers,
.signals = nxt_process_signals,
};
static nxt_int_t
nxt_router_prefork(nxt_task_t *task, nxt_process_t *process, nxt_mp_t *mp)
{
nxt_runtime_stop_app_processes(task, task->thread->runtime);
return NXT_OK;
}
static nxt_int_t
nxt_router_start(nxt_task_t *task, nxt_process_data_t *data)
{ {
nxt_int_t ret; nxt_int_t ret;
nxt_port_t *controller_port; nxt_port_t *controller_port;
@@ -292,6 +316,8 @@ nxt_router_start(nxt_task_t *task, void *data)
rt = task->thread->runtime; rt = task->thread->runtime;
nxt_log(task, NXT_LOG_INFO, "router started");
#if (NXT_TLS) #if (NXT_TLS)
rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL"); rt->tls = nxt_service_get(rt->services, "SSL/TLS", "OpenSSL");
if (nxt_slow_path(rt->tls == NULL)) { if (nxt_slow_path(rt->tls == NULL)) {
@@ -382,8 +408,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
goto failed; goto failed;
} }
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
stream, port->id, b); -1, stream, port->id, b);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, port, stream); nxt_port_rpc_cancel(task, port, stream);
@@ -862,7 +888,7 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
} }
if (msg->u.new_port == NULL if (msg->u.new_port == NULL
|| msg->u.new_port->type != NXT_PROCESS_WORKER) || msg->u.new_port->type != NXT_PROCESS_APP)
{ {
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
} }
@@ -2400,8 +2426,8 @@ nxt_router_app_rpc_create(nxt_task_t *task,
goto fail; goto fail;
} }
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_WORKER, -1, ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
stream, router_port->id, b); -1, stream, router_port->id, b);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, router_port, stream); nxt_port_rpc_cancel(task, router_port, stream);

View File

@@ -21,6 +21,7 @@ static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data); static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status); static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine); static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data); static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task, static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
nxt_runtime_t *rt); nxt_runtime_t *rt);
@@ -438,7 +439,7 @@ nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
} }
if (rt->type == NXT_PROCESS_MAIN) { if (rt->type == NXT_PROCESS_MAIN) {
nxt_main_stop_all_processes(task, rt); nxt_runtime_stop_all_processes(task, rt);
done = 0; done = 0;
} }
} }
@@ -478,6 +479,50 @@ nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
} }
void
nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_port_t *port;
nxt_process_t *process;
nxt_process_init_t *init;
nxt_runtime_process_each(rt, process) {
init = nxt_process_init(process);
if (init->type == NXT_PROCESS_APP) {
nxt_process_port_each(process, port) {
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
0, 0, NULL);
} nxt_process_port_loop;
}
} nxt_runtime_process_loop;
}
static void
nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_port_t *port;
nxt_process_t *process;
nxt_runtime_process_each(rt, process) {
nxt_process_port_each(process, port) {
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
0, NULL);
} nxt_process_port_loop;
} nxt_runtime_process_loop;
}
static void static void
nxt_runtime_exit(nxt_task_t *task, void *obj, void *data) nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
{ {
@@ -525,6 +570,10 @@ nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
} nxt_runtime_process_loop; } nxt_runtime_process_loop;
if (rt->port_by_type[rt->type] != NULL) {
nxt_port_use(task, rt->port_by_type[rt->type], -1);
}
nxt_thread_mutex_destroy(&rt->processes_mutex); nxt_thread_mutex_destroy(&rt->processes_mutex);
status = rt->status; status = rt->status;
@@ -1306,7 +1355,9 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
/* TODO: memory failures. */ /* TODO: memory failures. */
process = nxt_mp_zalloc(rt->mem_pool, sizeof(nxt_process_t)); process = nxt_mp_zalloc(rt->mem_pool,
sizeof(nxt_process_t) + sizeof(nxt_process_init_t));
if (nxt_slow_path(process == NULL)) { if (nxt_slow_path(process == NULL)) {
return NULL; return NULL;
} }
@@ -1347,8 +1398,9 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
nxt_thread_mutex_destroy(&process->outgoing.mutex); nxt_thread_mutex_destroy(&process->outgoing.mutex);
nxt_thread_mutex_destroy(&process->cp_mutex); nxt_thread_mutex_destroy(&process->cp_mutex);
if (process->init != NULL) { /* processes from nxt_runtime_process_get() have no memory pool */
nxt_mp_destroy(process->init->mem_pool); if (process->mem_pool != NULL) {
nxt_mp_destroy(process->mem_pool);
} }
nxt_mp_free(rt->mem_pool, process); nxt_mp_free(rt->mem_pool, process);

View File

@@ -110,6 +110,7 @@ nxt_port_t *nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type); nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type);
void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port); void nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port);
void nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt);
NXT_EXPORT nxt_port_t *nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid, NXT_EXPORT nxt_port_t *nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
nxt_port_id_t port_id); nxt_port_id_t port_id);

67
src/nxt_signal_handlers.c Normal file
View File

@@ -0,0 +1,67 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
#include <nxt_runtime.h>
#include <nxt_port.h>
#include <nxt_main_process.h>
#include <nxt_router.h>
static void nxt_signal_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_signal_sigterm_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_signal_sigquit_handler(nxt_task_t *task, void *obj, void *data);
const nxt_sig_event_t nxt_process_signals[] = {
nxt_event_signal(SIGHUP, nxt_signal_handler),
nxt_event_signal(SIGINT, nxt_signal_sigterm_handler),
nxt_event_signal(SIGQUIT, nxt_signal_sigquit_handler),
nxt_event_signal(SIGTERM, nxt_signal_sigterm_handler),
nxt_event_signal(SIGCHLD, nxt_signal_handler),
nxt_event_signal(SIGUSR1, nxt_signal_handler),
nxt_event_signal(SIGUSR2, nxt_signal_handler),
nxt_event_signal_end,
};
static void
nxt_signal_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
(int) (uintptr_t) obj, data);
}
void
nxt_signal_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_process_quit(task, 0);
}
static void
nxt_signal_sigterm_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_debug(task, "sigterm handler signo:%d (%s)",
(int) (uintptr_t) obj, data);
/* A fast exit. */
nxt_runtime_quit(task, 0);
}
static void
nxt_signal_sigquit_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_debug(task, "sigquit handler signo:%d (%s)",
(int) (uintptr_t) obj, data);
/* A graceful exit. */
nxt_process_quit(task, 0);
}

View File

@@ -4248,7 +4248,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *dst,
m.new_port.id = new_port->id; m.new_port.id = new_port->id;
m.new_port.pid = new_port->pid; m.new_port.pid = new_port->pid;
m.new_port.type = NXT_PROCESS_WORKER; m.new_port.type = NXT_PROCESS_APP;
m.new_port.max_size = 16 * 1024; m.new_port.max_size = 16 * 1024;
m.new_port.max_share = 64 * 1024; m.new_port.max_share = 64 * 1024;

View File

@@ -1,118 +0,0 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
#include <nxt_runtime.h>
#include <nxt_port.h>
#include <nxt_main_process.h>
#include <nxt_router.h>
static void nxt_worker_process_quit(nxt_task_t *task);
static void nxt_worker_process_signal_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj,
void *data);
const nxt_sig_event_t nxt_worker_process_signals[] = {
nxt_event_signal(SIGHUP, nxt_worker_process_signal_handler),
nxt_event_signal(SIGINT, nxt_worker_process_sigterm_handler),
nxt_event_signal(SIGQUIT, nxt_worker_process_sigquit_handler),
nxt_event_signal(SIGTERM, nxt_worker_process_sigterm_handler),
nxt_event_signal(SIGCHLD, nxt_worker_process_signal_handler),
nxt_event_signal(SIGUSR1, nxt_worker_process_signal_handler),
nxt_event_signal(SIGUSR2, nxt_worker_process_signal_handler),
nxt_event_signal_end,
};
static void
nxt_worker_process_quit(nxt_task_t *task)
{
nxt_uint_t n;
nxt_queue_t *listen;
nxt_runtime_t *rt;
nxt_queue_link_t *link, *next;
nxt_listen_event_t *lev;
nxt_listen_socket_t *ls;
rt = task->thread->runtime;
nxt_debug(task, "close listen connections");
listen = &task->thread->engine->listen_connections;
for (link = nxt_queue_first(listen);
link != nxt_queue_tail(listen);
link = next)
{
next = nxt_queue_next(link);
lev = nxt_queue_link_data(link, nxt_listen_event_t, link);
nxt_queue_remove(link);
nxt_fd_event_close(task->thread->engine, &lev->socket);
}
if (rt->listen_sockets != NULL) {
ls = rt->listen_sockets->elts;
n = rt->listen_sockets->nelts;
while (n != 0) {
nxt_socket_close(task, ls->socket);
ls->socket = -1;
ls++;
n--;
}
rt->listen_sockets->nelts = 0;
}
nxt_runtime_quit(task, 0);
}
static void
nxt_worker_process_signal_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_trace(task, "signal signo:%d (%s) recevied, ignored",
(int) (uintptr_t) obj, data);
}
void
nxt_worker_process_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_worker_process_quit(task);
}
static void
nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_debug(task, "sigterm handler signo:%d (%s)",
(int) (uintptr_t) obj, data);
/* A fast exit. */
nxt_runtime_quit(task, 0);
}
static void
nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_debug(task, "sigquit handler signo:%d (%s)",
(int) (uintptr_t) obj, data);
/* A graceful exit. */
nxt_worker_process_quit(task);
}

View File

@@ -95,8 +95,8 @@ static int nxt_perl_psgi_result_array(PerlInterpreter *my_perl,
static void nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result, static void nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result,
nxt_unit_request_info_t *req); nxt_unit_request_info_t *req);
static nxt_int_t nxt_perl_psgi_init(nxt_task_t *task, static nxt_int_t nxt_perl_psgi_start(nxt_task_t *task,
nxt_common_app_conf_t *conf); nxt_process_data_t *conf);
static void nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req); static void nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req);
static void nxt_perl_psgi_atexit(void); static void nxt_perl_psgi_atexit(void);
@@ -119,7 +119,7 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = {
nxt_string("perl"), nxt_string("perl"),
PERL_VERSION_STRING, PERL_VERSION_STRING,
NULL, NULL,
nxt_perl_psgi_init, nxt_perl_psgi_start,
}; };
@@ -1134,14 +1134,17 @@ nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result,
static nxt_int_t static nxt_int_t
nxt_perl_psgi_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_perl_psgi_start(nxt_task_t *task, nxt_process_data_t *data)
{ {
int rc; int rc;
nxt_unit_ctx_t *unit_ctx; nxt_unit_ctx_t *unit_ctx;
nxt_unit_init_t perl_init; nxt_unit_init_t perl_init;
PerlInterpreter *my_perl; PerlInterpreter *my_perl;
nxt_common_app_conf_t *conf;
nxt_perl_psgi_module_t module; nxt_perl_psgi_module_t module;
conf = data->app;
my_perl = nxt_perl_psgi_interpreter_init(task, conf->u.perl.script, my_perl = nxt_perl_psgi_interpreter_init(task, conf->u.perl.script,
&module.app); &module.app);

View File

@@ -28,7 +28,8 @@ typedef struct {
} nxt_ruby_rack_init_t; } nxt_ruby_rack_init_t;
static nxt_int_t nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf); static nxt_int_t nxt_ruby_start(nxt_task_t *task,
nxt_process_data_t *data);
static VALUE nxt_ruby_init_basic(VALUE arg); static VALUE nxt_ruby_init_basic(VALUE arg);
static nxt_int_t nxt_ruby_init_io(nxt_task_t *task); static nxt_int_t nxt_ruby_init_io(nxt_task_t *task);
static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init); static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init);
@@ -78,21 +79,24 @@ NXT_EXPORT nxt_app_module_t nxt_app_module = {
nxt_string("ruby"), nxt_string("ruby"),
ruby_version, ruby_version,
NULL, NULL,
nxt_ruby_init, nxt_ruby_start,
}; };
static nxt_int_t static nxt_int_t
nxt_ruby_init(nxt_task_t *task, nxt_common_app_conf_t *conf) nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data)
{ {
int state, rc; int state, rc;
VALUE res; VALUE res;
nxt_unit_ctx_t *unit_ctx; nxt_unit_ctx_t *unit_ctx;
nxt_unit_init_t ruby_unit_init; nxt_unit_init_t ruby_unit_init;
nxt_ruby_rack_init_t rack_init; nxt_ruby_rack_init_t rack_init;
nxt_common_app_conf_t *conf;
static char *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" }; static char *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" };
conf = data->app;
RUBY_INIT_STACK RUBY_INIT_STACK
ruby_init(); ruby_init();
ruby_options(2, argv); ruby_options(2, argv);

View File

@@ -13,6 +13,7 @@ class TestJavaApplication(TestApplicationJava):
[ [
r'realpath.*failed', r'realpath.*failed',
r'failed to apply new conf', r'failed to apply new conf',
r'application setup failed',
] ]
) )
self.assertIn( self.assertIn(