Files
nginx-unit/src/nxt_runtime.c
Andrew Clayton 57fc9201cb Socket: Created control socket & pid file directories.
@alejandro-colomar reported an issue on GitHub whereby Unit would fail
to start due to not being able to create the control socket (a Unix
Domain Socket)

  2022/08/05 20:12:22 [alert] 21613#21613 bind(6,
  unix:/opt/local/unit/var/run/unit/control.unit.sock.tmp)
  failed (2: No such file or directory)

This could happen if the control socket was set to a directory that
doesn't exist. A common place to put the control socket would be under
/run/unit, and while /run will exist, /run/unit may well not (/run
is/should be cleared on each boot).

The pid file would also generally go under /run/unit, though this is
created after the control socket, however it could go someplace else so
we should also ensure its directory exists.

This commit will try to create the pid file and control sockets parent
directory. In some cases the user will need to ensure that the rest of
the path already exists.

This adds a new nxt_fs_mkdir_parent() function that given a full path
to a file (or directory), strips the last component off before passing
the remaining directory path to nxt_fs_mkdir().

Cc: Konstantin Pavlov <thresh@nginx.com>
Closes: <https://github.com/nginx/unit/issues/742>
Reported-by: Alejandro Colomar <alx@nginx.com>
Reviewed-by: Alejandro Colomar <alx@nginx.com>
Tested-by: Alejandro Colomar <alx@nginx.com>
Signed-off-by: Andrew Clayton <a.clayton@nginx.com>
2022-10-03 14:26:45 +01:00

1721 lines
40 KiB
C

/*
* Copyright (C) Igor Sysoev
* Copyright (C) Valentin V. Bartenev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
#include <nxt_runtime.h>
#include <nxt_port.h>
#include <nxt_main_process.h>
#include <nxt_router.h>
#include <nxt_regex.h>
static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_systemd_listen_sockets(nxt_task_t *task,
nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt);
static void nxt_runtime_start(nxt_task_t *task, void *obj, void *data);
static void nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status);
static void nxt_runtime_close_idle_connections(nxt_event_engine_t *engine);
static void nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt);
static void nxt_runtime_exit(nxt_task_t *task, void *obj, void *data);
static nxt_int_t nxt_runtime_event_engine_change(nxt_task_t *task,
nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_log_files_init(nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_log_files_create(nxt_task_t *task,
nxt_runtime_t *rt);
static nxt_int_t nxt_runtime_pid_file_create(nxt_task_t *task,
nxt_file_name_t *pid_file);
static void nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
nxt_runtime_cont_t cont);
static void nxt_runtime_thread_pool_init(void);
static void nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj,
void *data);
static nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
static void nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port);
nxt_int_t
nxt_runtime_create(nxt_task_t *task)
{
nxt_mp_t *mp;
nxt_int_t ret;
nxt_array_t *listen_sockets;
nxt_runtime_t *rt;
nxt_app_lang_module_t *lang;
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
return NXT_ERROR;
}
rt = nxt_mp_zget(mp, sizeof(nxt_runtime_t));
if (nxt_slow_path(rt == NULL)) {
goto fail;
}
task->thread->runtime = rt;
rt->mem_pool = mp;
nxt_thread_mutex_create(&rt->processes_mutex);
rt->services = nxt_services_init(mp);
if (nxt_slow_path(rt->services == NULL)) {
goto fail;
}
rt->languages = nxt_array_create(mp, 1, sizeof(nxt_app_lang_module_t));
if (nxt_slow_path(rt->languages == NULL)) {
goto fail;
}
/* Should not fail. */
lang = nxt_array_add(rt->languages);
lang->type = NXT_APP_EXTERNAL;
lang->version = (u_char *) "";
lang->file = NULL;
lang->module = &nxt_external_module;
lang->mounts = nxt_array_create(mp, 1, sizeof(nxt_fs_mount_t));
if (nxt_slow_path(lang->mounts == NULL)) {
goto fail;
}
listen_sockets = nxt_array_create(mp, 1, sizeof(nxt_listen_socket_t));
if (nxt_slow_path(listen_sockets == NULL)) {
goto fail;
}
rt->listen_sockets = listen_sockets;
ret = nxt_runtime_inherited_listen_sockets(task, rt);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
if (nxt_runtime_hostname(task, rt) != NXT_OK) {
goto fail;
}
if (nxt_slow_path(nxt_runtime_log_files_init(rt) != NXT_OK)) {
goto fail;
}
if (nxt_runtime_event_engines(task, rt) != NXT_OK) {
goto fail;
}
if (nxt_slow_path(nxt_runtime_thread_pools(task->thread, rt) != NXT_OK)) {
goto fail;
}
rt->start = nxt_runtime_initial_start;
if (nxt_runtime_conf_init(task, rt) != NXT_OK) {
goto fail;
}
if (nxt_port_rpc_init() != NXT_OK) {
goto fail;
}
if (nxt_slow_path(nxt_http_register_variables() != NXT_OK)) {
goto fail;
}
if (nxt_slow_path(nxt_var_index_init() != NXT_OK)) {
goto fail;
}
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_runtime_start, task, rt, NULL);
return NXT_OK;
fail:
nxt_mp_destroy(mp);
return NXT_ERROR;
}
static nxt_int_t
nxt_runtime_inherited_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
{
u_char *v, *p;
nxt_int_t type;
nxt_array_t *inherited_sockets;
nxt_socket_t s;
nxt_listen_socket_t *ls;
v = (u_char *) getenv("NGINX");
if (v == NULL) {
return nxt_runtime_systemd_listen_sockets(task, rt);
}
nxt_alert(task, "using inherited listen sockets: %s", v);
inherited_sockets = nxt_array_create(rt->mem_pool,
1, sizeof(nxt_listen_socket_t));
if (inherited_sockets == NULL) {
return NXT_ERROR;
}
rt->inherited_sockets = inherited_sockets;
for (p = v; *p != '\0'; p++) {
if (*p == ';') {
s = nxt_int_parse(v, p - v);
if (nxt_slow_path(s < 0)) {
nxt_alert(task, "invalid socket number \"%s\" "
"in NGINX environment variable, "
"ignoring the rest of the variable", v);
return NXT_ERROR;
}
v = p + 1;
ls = nxt_array_zero_add(inherited_sockets);
if (nxt_slow_path(ls == NULL)) {
return NXT_ERROR;
}
ls->socket = s;
ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
if (nxt_slow_path(ls->sockaddr == NULL)) {
return NXT_ERROR;
}
type = nxt_socket_getsockopt(task, s, SOL_SOCKET, SO_TYPE);
if (nxt_slow_path(type == -1)) {
return NXT_ERROR;
}
ls->sockaddr->type = (uint16_t) type;
}
}
return NXT_OK;
}
static nxt_int_t
nxt_runtime_systemd_listen_sockets(nxt_task_t *task, nxt_runtime_t *rt)
{
u_char *nfd, *pid;
nxt_int_t n;
nxt_array_t *inherited_sockets;
nxt_socket_t s;
nxt_listen_socket_t *ls;
/*
* Number of listening sockets passed. The socket
* descriptors start from number 3 and are sequential.
*/
nfd = (u_char *) getenv("LISTEN_FDS");
if (nfd == NULL) {
return NXT_OK;
}
/* The pid of the service process. */
pid = (u_char *) getenv("LISTEN_PID");
if (pid == NULL) {
return NXT_OK;
}
n = nxt_int_parse(nfd, nxt_strlen(nfd));
if (n < 0) {
return NXT_OK;
}
if (nxt_pid != nxt_int_parse(pid, nxt_strlen(pid))) {
return NXT_OK;
}
nxt_log(task, NXT_LOG_INFO, "using %i systemd listen sockets", n);
inherited_sockets = nxt_array_create(rt->mem_pool,
n, sizeof(nxt_listen_socket_t));
if (inherited_sockets == NULL) {
return NXT_ERROR;
}
rt->inherited_sockets = inherited_sockets;
for (s = 3; s < n; s++) {
ls = nxt_array_zero_add(inherited_sockets);
if (nxt_slow_path(ls == NULL)) {
return NXT_ERROR;
}
ls->socket = s;
ls->sockaddr = nxt_getsockname(task, rt->mem_pool, s);
if (nxt_slow_path(ls->sockaddr == NULL)) {
return NXT_ERROR;
}
ls->sockaddr->type = SOCK_STREAM;
}
return NXT_OK;
}
static nxt_int_t
nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_thread_t *thread;
nxt_event_engine_t *engine;
const nxt_event_interface_t *interface;
interface = nxt_service_get(rt->services, "engine", NULL);
if (nxt_slow_path(interface == NULL)) {
/* TODO: log */
return NXT_ERROR;
}
engine = nxt_event_engine_create(task, interface,
nxt_main_process_signals, 0, 0);
if (nxt_slow_path(engine == NULL)) {
return NXT_ERROR;
}
thread = task->thread;
thread->engine = engine;
#if 0
thread->fiber = &engine->fibers->fiber;
#endif
engine->id = rt->last_engine_id++;
engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
nxt_queue_init(&rt->engines);
nxt_queue_insert_tail(&rt->engines, &engine->link);
return NXT_OK;
}
static nxt_int_t
nxt_runtime_thread_pools(nxt_thread_t *thr, nxt_runtime_t *rt)
{
nxt_int_t ret;
nxt_array_t *thread_pools;
thread_pools = nxt_array_create(rt->mem_pool, 1,
sizeof(nxt_thread_pool_t *));
if (nxt_slow_path(thread_pools == NULL)) {
return NXT_ERROR;
}
rt->thread_pools = thread_pools;
ret = nxt_runtime_thread_pool_create(thr, rt, 2, 60000 * 1000000LL);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
return NXT_OK;
}
static void
nxt_runtime_start(nxt_task_t *task, void *obj, void *data)
{
nxt_runtime_t *rt;
rt = obj;
nxt_debug(task, "rt conf done");
task->thread->log->ctx_handler = NULL;
task->thread->log->ctx = NULL;
if (nxt_runtime_log_files_create(task, rt) != NXT_OK) {
goto fail;
}
if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
goto fail;
}
/*
* Thread pools should be destroyed before starting worker
* processes, because thread pool semaphores will stick in
* locked state in new processes after fork().
*/
nxt_runtime_thread_pool_destroy(task, rt, rt->start);
return;
fail:
nxt_runtime_quit(task, 1);
}
static void
nxt_runtime_initial_start(nxt_task_t *task, nxt_uint_t status)
{
nxt_int_t ret;
nxt_thread_t *thr;
nxt_runtime_t *rt;
const nxt_event_interface_t *interface;
thr = task->thread;
rt = thr->runtime;
if (rt->inherited_sockets == NULL && rt->daemon) {
if (nxt_process_daemon(task) != NXT_OK) {
goto fail;
}
/*
* An event engine should be updated after fork()
* even if an event facility was not changed because:
* 1) inherited kqueue descriptor is invalid,
* 2) the signal thread is not inherited.
*/
interface = nxt_service_get(rt->services, "engine", rt->engine);
if (interface == NULL) {
goto fail;
}
ret = nxt_event_engine_change(task->thread->engine, interface,
rt->batch);
if (ret != NXT_OK) {
goto fail;
}
}
ret = nxt_runtime_pid_file_create(task, rt->pid_file);
if (ret != NXT_OK) {
goto fail;
}
if (nxt_runtime_event_engine_change(task, rt) != NXT_OK) {
goto fail;
}
thr->engine->max_connections = rt->engine_connections;
if (nxt_main_process_start(thr, task, rt) != NXT_ERROR) {
return;
}
fail:
nxt_runtime_quit(task, 1);
}
void
nxt_runtime_quit(nxt_task_t *task, nxt_uint_t status)
{
nxt_bool_t done;
nxt_runtime_t *rt;
nxt_event_engine_t *engine;
rt = task->thread->runtime;
rt->status |= status;
engine = task->thread->engine;
nxt_debug(task, "exiting");
done = 1;
if (!engine->shutdown) {
engine->shutdown = 1;
if (!nxt_array_is_empty(rt->thread_pools)) {
nxt_runtime_thread_pool_destroy(task, rt, nxt_runtime_quit);
done = 0;
}
if (rt->type == NXT_PROCESS_MAIN) {
nxt_runtime_stop_all_processes(task, rt);
done = 0;
}
}
nxt_runtime_close_idle_connections(engine);
if (done) {
nxt_work_queue_add(&engine->fast_work_queue, nxt_runtime_exit,
task, rt, engine);
}
}
static void
nxt_runtime_close_idle_connections(nxt_event_engine_t *engine)
{
nxt_conn_t *c;
nxt_queue_t *idle;
nxt_queue_link_t *link, *next;
nxt_debug(&engine->task, "close idle connections");
idle = &engine->idle_connections;
for (link = nxt_queue_first(idle);
link != nxt_queue_tail(idle);
link = next)
{
next = nxt_queue_next(link);
c = nxt_queue_link_data(link, nxt_conn_t, link);
if (!c->socket.read_ready) {
nxt_queue_remove(link);
nxt_conn_close(engine, c);
}
}
}
void
nxt_runtime_stop_app_processes(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_port_t *port;
nxt_process_t *process;
nxt_process_init_t *init;
nxt_runtime_process_each(rt, process) {
init = nxt_process_init(process);
if (init->type == NXT_PROCESS_APP
|| init->type == NXT_PROCESS_PROTOTYPE)
{
nxt_process_port_each(process, port) {
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
0, 0, NULL);
} nxt_process_port_loop;
}
} nxt_runtime_process_loop;
}
static void
nxt_runtime_stop_all_processes(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_port_t *port;
nxt_process_t *process;
nxt_runtime_process_each(rt, process) {
nxt_process_port_each(process, port) {
nxt_debug(task, "%d sending quit to %PI", rt->type, port->pid);
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0,
0, NULL);
} nxt_process_port_loop;
} nxt_runtime_process_loop;
}
static void
nxt_runtime_exit(nxt_task_t *task, void *obj, void *data)
{
int status, engine_count;
nxt_runtime_t *rt;
nxt_process_t *process;
nxt_event_engine_t *engine;
rt = obj;
engine = data;
nxt_debug(task, "thread pools: %d", rt->thread_pools->nelts);
if (!nxt_array_is_empty(rt->thread_pools)) {
return;
}
if (rt->type == NXT_PROCESS_MAIN) {
if (rt->pid_file != NULL) {
nxt_file_delete(rt->pid_file);
}
#if (NXT_HAVE_UNIX_DOMAIN)
{
nxt_sockaddr_t *sa;
nxt_file_name_t *name;
sa = rt->controller_listen;
if (sa->u.sockaddr.sa_family == AF_UNIX) {
name = (nxt_file_name_t *) sa->u.sockaddr_un.sun_path;
(void) nxt_file_delete(name);
}
}
#endif
}
if (!engine->event.signal_support) {
nxt_event_engine_signals_stop(engine);
}
nxt_runtime_process_each(rt, process) {
nxt_runtime_process_remove(rt, process);
nxt_process_close_ports(task, process);
} nxt_runtime_process_loop;
status = rt->status;
engine_count = 0;
nxt_queue_each(engine, &rt->engines, nxt_event_engine_t, link) {
engine_count++;
} nxt_queue_loop;
if (engine_count <= 1) {
if (rt->port_by_type[rt->type] != NULL) {
nxt_port_use(task, rt->port_by_type[rt->type], -1);
}
nxt_thread_mutex_destroy(&rt->processes_mutex);
nxt_mp_destroy(rt->mem_pool);
}
nxt_debug(task, "exit: %d", status);
exit(status);
nxt_unreachable();
}
static nxt_int_t
nxt_runtime_event_engine_change(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_event_engine_t *engine;
const nxt_event_interface_t *interface;
engine = task->thread->engine;
if (engine->batch == rt->batch
&& nxt_strcmp(engine->event.name, rt->engine) == 0)
{
return NXT_OK;
}
interface = nxt_service_get(rt->services, "engine", rt->engine);
if (interface != NULL) {
return nxt_event_engine_change(engine, interface, rt->batch);
}
return NXT_ERROR;
}
void
nxt_runtime_event_engine_free(nxt_runtime_t *rt)
{
nxt_queue_link_t *link;
nxt_event_engine_t *engine;
link = nxt_queue_first(&rt->engines);
nxt_queue_remove(link);
engine = nxt_queue_link_data(link, nxt_event_engine_t, link);
nxt_event_engine_free(engine);
}
nxt_int_t
nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
nxt_uint_t max_threads, nxt_nsec_t timeout)
{
nxt_thread_pool_t *thread_pool, **tp;
tp = nxt_array_add(rt->thread_pools);
if (tp == NULL) {
return NXT_ERROR;
}
thread_pool = nxt_thread_pool_create(max_threads, timeout,
nxt_runtime_thread_pool_init,
thr->engine,
nxt_runtime_thread_pool_exit);
if (nxt_fast_path(thread_pool != NULL)) {
*tp = thread_pool;
}
return NXT_OK;
}
static void
nxt_runtime_thread_pool_destroy(nxt_task_t *task, nxt_runtime_t *rt,
nxt_runtime_cont_t cont)
{
nxt_uint_t n;
nxt_thread_pool_t **tp;
rt->continuation = cont;
n = rt->thread_pools->nelts;
if (n == 0) {
cont(task, 0);
return;
}
tp = rt->thread_pools->elts;
do {
nxt_thread_pool_destroy(*tp);
tp++;
n--;
} while (n != 0);
}
static void
nxt_runtime_thread_pool_init(void)
{
}
static void
nxt_runtime_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
{
nxt_uint_t i, n;
nxt_runtime_t *rt;
nxt_thread_pool_t *tp, **thread_pools;
nxt_thread_handle_t handle;
tp = obj;
if (data != NULL) {
handle = (nxt_thread_handle_t) (uintptr_t) data;
nxt_thread_wait(handle);
}
rt = task->thread->runtime;
thread_pools = rt->thread_pools->elts;
n = rt->thread_pools->nelts;
nxt_debug(task, "thread pools: %ui", n);
for (i = 0; i < n; i++) {
if (tp == thread_pools[i]) {
nxt_array_remove(rt->thread_pools, &thread_pools[i]);
nxt_free(tp);
if (n == 1) {
/* The last thread pool. */
rt->continuation(task, 0);
}
return;
}
}
}
static nxt_int_t
nxt_runtime_conf_init(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_int_t ret;
nxt_str_t control;
nxt_uint_t n;
nxt_file_t *file;
const char *slash;
nxt_sockaddr_t *sa;
nxt_file_name_str_t file_name;
const nxt_event_interface_t *interface;
rt->daemon = 1;
rt->engine_connections = 256;
rt->auxiliary_threads = 2;
rt->user_cred.user = NXT_USER;
rt->group = NXT_GROUP;
rt->pid = NXT_PID;
rt->log = NXT_LOG;
rt->modules = NXT_MODULES;
rt->state = NXT_STATE;
rt->control = NXT_CONTROL_SOCK;
rt->tmp = NXT_TMP;
nxt_memzero(&rt->capabilities, sizeof(nxt_capabilities_t));
if (nxt_runtime_conf_read_cmd(task, rt) != NXT_OK) {
return NXT_ERROR;
}
if (nxt_capability_set(task, &rt->capabilities) != NXT_OK) {
return NXT_ERROR;
}
if (rt->capabilities.setid) {
ret = nxt_credential_get(task, rt->mem_pool, &rt->user_cred,
rt->group);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
} else {
nxt_log(task, NXT_LOG_WARN, "Unit is running unprivileged, then it "
"cannot use arbitrary user and group.");
}
/* An engine's parameters. */
interface = nxt_service_get(rt->services, "engine", rt->engine);
if (interface == NULL) {
return NXT_ERROR;
}
rt->engine = interface->name;
ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->pid);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
rt->pid_file = file_name.start;
ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%Z", rt->log);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
file = nxt_list_first(rt->log_files);
file->name = file_name.start;
slash = "";
n = nxt_strlen(rt->modules);
if (n > 1 && rt->modules[n - 1] != '/') {
slash = "/";
}
ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%s*.unit.so%Z",
rt->modules, slash);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
rt->modules = (char *) file_name.start;
slash = "";
n = nxt_strlen(rt->state);
if (n > 1 && rt->state[n - 1] != '/') {
slash = "/";
}
ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sversion%Z",
rt->state, slash);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
rt->ver = (char *) file_name.start;
ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->ver);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
rt->ver_tmp = (char *) file_name.start;
ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%sconf.json%Z",
rt->state, slash);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
rt->conf = (char *) file_name.start;
ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s.tmp%Z", rt->conf);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
rt->conf_tmp = (char *) file_name.start;
ret = nxt_file_name_create(rt->mem_pool, &file_name, "%s%scerts/%Z",
rt->state, slash);
if (nxt_slow_path(ret != NXT_OK)) {
return NXT_ERROR;
}
ret = mkdir((char *) file_name.start, S_IRWXU);
if (nxt_fast_path(ret == 0 || nxt_errno == EEXIST)) {
rt->certs.length = file_name.len;
rt->certs.start = file_name.start;
} else {
nxt_alert(task, "Unable to create certificates storage directory: "
"mkdir(%s) failed %E", file_name.start, nxt_errno);
}
control.length = nxt_strlen(rt->control);
control.start = (u_char *) rt->control;
sa = nxt_sockaddr_parse(rt->mem_pool, &control);
if (nxt_slow_path(sa == NULL)) {
return NXT_ERROR;
}
sa->type = SOCK_STREAM;
rt->controller_listen = sa;
if (nxt_runtime_controller_socket(task, rt) != NXT_OK) {
return NXT_ERROR;
}
return NXT_OK;
}
static nxt_int_t
nxt_runtime_conf_read_cmd(nxt_task_t *task, nxt_runtime_t *rt)
{
char *p, **argv;
u_char *end;
u_char buf[1024];
static const char version[] =
"unit version: " NXT_VERSION "\n"
"configured as ./configure" NXT_CONFIGURE_OPTIONS "\n";
static const char no_control[] =
"option \"--control\" requires socket address\n";
static const char no_user[] = "option \"--user\" requires username\n";
static const char no_group[] = "option \"--group\" requires group name\n";
static const char no_pid[] = "option \"--pid\" requires filename\n";
static const char no_log[] = "option \"--log\" requires filename\n";
static const char no_modules[] =
"option \"--modules\" requires directory\n";
static const char no_state[] = "option \"--state\" requires directory\n";
static const char no_tmp[] = "option \"--tmp\" requires directory\n";
static const char help[] =
"\n"
"unit options:\n"
"\n"
" --version print unit version and configure options\n"
"\n"
" --no-daemon run unit in non-daemon mode\n"
"\n"
" --control ADDRESS set address of control API socket\n"
" default: \"" NXT_CONTROL_SOCK "\"\n"
"\n"
" --pid FILE set pid filename\n"
" default: \"" NXT_PID "\"\n"
"\n"
" --log FILE set log filename\n"
" default: \"" NXT_LOG "\"\n"
"\n"
" --modules DIRECTORY set modules directory name\n"
" default: \"" NXT_MODULES "\"\n"
"\n"
" --state DIRECTORY set state directory name\n"
" default: \"" NXT_STATE "\"\n"
"\n"
" --tmp DIRECTORY set tmp directory name\n"
" default: \"" NXT_TMP "\"\n"
"\n"
" --user USER set non-privileged processes to run"
" as specified user\n"
" default: \"" NXT_USER "\"\n"
"\n"
" --group GROUP set non-privileged processes to run"
" as specified group\n"
" default: ";
static const char group[] = "\"" NXT_GROUP "\"\n\n";
static const char primary[] = "user's primary group\n\n";
argv = &nxt_process_argv[1];
while (*argv != NULL) {
p = *argv++;
if (nxt_strcmp(p, "--control") == 0) {
if (*argv == NULL) {
write(STDERR_FILENO, no_control, nxt_length(no_control));
return NXT_ERROR;
}
p = *argv++;
rt->control = p;
continue;
}
if (nxt_strcmp(p, "--user") == 0) {
if (*argv == NULL) {
write(STDERR_FILENO, no_user, nxt_length(no_user));
return NXT_ERROR;
}
p = *argv++;
rt->user_cred.user = p;
continue;
}
if (nxt_strcmp(p, "--group") == 0) {
if (*argv == NULL) {
write(STDERR_FILENO, no_group, nxt_length(no_group));
return NXT_ERROR;
}
p = *argv++;
rt->group = p;
continue;
}
if (nxt_strcmp(p, "--pid") == 0) {
if (*argv == NULL) {
write(STDERR_FILENO, no_pid, nxt_length(no_pid));
return NXT_ERROR;
}
p = *argv++;
rt->pid = p;
continue;
}
if (nxt_strcmp(p, "--log") == 0) {
if (*argv == NULL) {
write(STDERR_FILENO, no_log, nxt_length(no_log));
return NXT_ERROR;
}
p = *argv++;
rt->log = p;
continue;
}
if (nxt_strcmp(p, "--modules") == 0) {
if (*argv == NULL) {
write(STDERR_FILENO, no_modules, nxt_length(no_modules));
return NXT_ERROR;
}
p = *argv++;
rt->modules = p;
continue;
}
if (nxt_strcmp(p, "--state") == 0) {
if (*argv == NULL) {
write(STDERR_FILENO, no_state, nxt_length(no_state));
return NXT_ERROR;
}
p = *argv++;
rt->state = p;
continue;
}
if (nxt_strcmp(p, "--tmp") == 0) {
if (*argv == NULL) {
write(STDERR_FILENO, no_tmp, nxt_length(no_tmp));
return NXT_ERROR;
}
p = *argv++;
rt->tmp = p;
continue;
}
if (nxt_strcmp(p, "--no-daemon") == 0) {
rt->daemon = 0;
continue;
}
if (nxt_strcmp(p, "--version") == 0) {
write(STDERR_FILENO, version, nxt_length(version));
exit(0);
}
if (nxt_strcmp(p, "--help") == 0 || nxt_strcmp(p, "-h") == 0) {
write(STDOUT_FILENO, help, nxt_length(help));
if (sizeof(NXT_GROUP) == 1) {
write(STDOUT_FILENO, primary, nxt_length(primary));
} else {
write(STDOUT_FILENO, group, nxt_length(group));
}
exit(0);
}
end = nxt_sprintf(buf, buf + sizeof(buf), "unknown option \"%s\", "
"try \"%s -h\" for available options\n",
p, nxt_process_argv[0]);
write(STDERR_FILENO, buf, end - buf);
return NXT_ERROR;
}
return NXT_OK;
}
nxt_listen_socket_t *
nxt_runtime_listen_socket_add(nxt_runtime_t *rt, nxt_sockaddr_t *sa)
{
nxt_mp_t *mp;
nxt_listen_socket_t *ls;
ls = nxt_array_zero_add(rt->listen_sockets);
if (ls == NULL) {
return NULL;
}
mp = rt->mem_pool;
ls->sockaddr = nxt_sockaddr_create(mp, &sa->u.sockaddr, sa->socklen,
sa->length);
if (ls->sockaddr == NULL) {
return NULL;
}
ls->sockaddr->type = sa->type;
nxt_sockaddr_text(ls->sockaddr);
ls->socket = -1;
ls->backlog = NXT_LISTEN_BACKLOG;
return ls;
}
static nxt_int_t
nxt_runtime_hostname(nxt_task_t *task, nxt_runtime_t *rt)
{
size_t length;
char hostname[NXT_MAXHOSTNAMELEN + 1];
if (gethostname(hostname, NXT_MAXHOSTNAMELEN) != 0) {
nxt_alert(task, "gethostname() failed %E", nxt_errno);
return NXT_ERROR;
}
/*
* Linux gethostname(2):
*
* If the null-terminated hostname is too large to fit,
* then the name is truncated, and no error is returned.
*
* For this reason an additional byte is reserved in the buffer.
*/
hostname[NXT_MAXHOSTNAMELEN] = '\0';
length = nxt_strlen(hostname);
rt->hostname.length = length;
rt->hostname.start = nxt_mp_nget(rt->mem_pool, length);
if (rt->hostname.start != NULL) {
nxt_memcpy_lowcase(rt->hostname.start, (u_char *) hostname, length);
return NXT_OK;
}
return NXT_ERROR;
}
static nxt_int_t
nxt_runtime_log_files_init(nxt_runtime_t *rt)
{
nxt_file_t *file;
nxt_list_t *log_files;
log_files = nxt_list_create(rt->mem_pool, 1, sizeof(nxt_file_t));
if (nxt_fast_path(log_files != NULL)) {
rt->log_files = log_files;
/* Preallocate the main log. This allocation cannot fail. */
file = nxt_list_zero_add(log_files);
file->fd = NXT_FILE_INVALID;
file->log_level = NXT_LOG_ALERT;
return NXT_OK;
}
return NXT_ERROR;
}
nxt_file_t *
nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name)
{
nxt_int_t ret;
nxt_file_t *file;
nxt_file_name_str_t file_name;
ret = nxt_file_name_create(rt->mem_pool, &file_name, "V%Z", name);
if (nxt_slow_path(ret != NXT_OK)) {
return NULL;
}
nxt_list_each(file, rt->log_files) {
/* STUB: hardecoded case sensitive/insensitive. */
if (file->name != NULL
&& nxt_file_name_eq(file->name, file_name.start))
{
return file;
}
} nxt_list_loop;
file = nxt_list_zero_add(rt->log_files);
if (nxt_slow_path(file == NULL)) {
return NULL;
}
file->fd = NXT_FILE_INVALID;
file->log_level = NXT_LOG_ALERT;
file->name = file_name.start;
return file;
}
static nxt_int_t
nxt_runtime_log_files_create(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_int_t ret;
nxt_file_t *file;
nxt_list_each(file, rt->log_files) {
ret = nxt_file_open(task, file, O_WRONLY | O_APPEND, O_CREAT,
NXT_FILE_OWNER_ACCESS);
if (ret != NXT_OK) {
return NXT_ERROR;
}
} nxt_list_loop;
file = nxt_list_first(rt->log_files);
return nxt_file_stderr(file);
}
nxt_int_t
nxt_runtime_listen_sockets_create(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_int_t ret;
nxt_uint_t c, p, ncurr, nprev;
nxt_listen_socket_t *curr, *prev;
curr = rt->listen_sockets->elts;
ncurr = rt->listen_sockets->nelts;
if (rt->inherited_sockets != NULL) {
prev = rt->inherited_sockets->elts;
nprev = rt->inherited_sockets->nelts;
} else {
prev = NULL;
nprev = 0;
}
for (c = 0; c < ncurr; c++) {
for (p = 0; p < nprev; p++) {
if (nxt_sockaddr_cmp(curr[c].sockaddr, prev[p].sockaddr)) {
ret = nxt_listen_socket_update(task, &curr[c], &prev[p]);
if (ret != NXT_OK) {
return NXT_ERROR;
}
goto next;
}
}
if (nxt_listen_socket_create(task, rt->mem_pool, &curr[c]) != NXT_OK) {
return NXT_ERROR;
}
next:
continue;
}
return NXT_OK;
}
nxt_int_t
nxt_runtime_listen_sockets_enable(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_uint_t i, n;
nxt_listen_socket_t *ls;
ls = rt->listen_sockets->elts;
n = rt->listen_sockets->nelts;
for (i = 0; i < n; i++) {
if (ls[i].flags == NXT_NONBLOCK) {
if (nxt_listen_event(task, &ls[i]) == NULL) {
return NXT_ERROR;
}
}
}
return NXT_OK;
}
nxt_str_t *
nxt_current_directory(nxt_mp_t *mp)
{
size_t length;
u_char *p;
nxt_str_t *name;
char buf[NXT_MAX_PATH_LEN];
length = nxt_dir_current(buf, NXT_MAX_PATH_LEN);
if (nxt_fast_path(length != 0)) {
name = nxt_str_alloc(mp, length + 1);
if (nxt_fast_path(name != NULL)) {
p = nxt_cpymem(name->start, buf, length);
*p = '/';
return name;
}
}
return NULL;
}
static nxt_int_t
nxt_runtime_pid_file_create(nxt_task_t *task, nxt_file_name_t *pid_file)
{
ssize_t length;
nxt_int_t n;
nxt_file_t file;
u_char pid[NXT_INT64_T_LEN + nxt_length("\n")];
nxt_memzero(&file, sizeof(nxt_file_t));
file.name = pid_file;
nxt_fs_mkdir_parent(pid_file, 0755);
n = nxt_file_open(task, &file, O_WRONLY, O_CREAT | O_TRUNC,
NXT_FILE_DEFAULT_ACCESS);
if (n != NXT_OK) {
return NXT_ERROR;
}
length = nxt_sprintf(pid, pid + sizeof(pid), "%PI%n", nxt_pid) - pid;
if (nxt_file_write(&file, pid, length, 0) != length) {
return NXT_ERROR;
}
nxt_file_close(task, &file);
return NXT_OK;
}
void
nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
{
nxt_process_t *child;
if (process->registered == 1) {
nxt_runtime_process_remove(rt, process);
}
if (process->link.next != NULL) {
nxt_queue_remove(&process->link);
}
nxt_queue_each(child, &process->children, nxt_process_t, link) {
nxt_queue_remove(&child->link);
child->link.next = NULL;
} nxt_queue_loop;
nxt_assert(process->use_count == 0);
nxt_assert(process->registered == 0);
nxt_assert(nxt_queue_is_empty(&process->ports));
nxt_port_mmaps_destroy(&process->incoming, 1);
nxt_thread_mutex_destroy(&process->incoming.mutex);
/* processes from nxt_runtime_process_get() have no memory pool */
if (process->mem_pool != NULL) {
nxt_mp_destroy(process->mem_pool);
}
nxt_mp_free(rt->mem_pool, process);
}
static nxt_int_t
nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
{
nxt_process_t *process;
process = data;
if (lhq->key.length == sizeof(nxt_pid_t)
&& *(nxt_pid_t *) lhq->key.start == process->pid)
{
return NXT_OK;
}
return NXT_DECLINED;
}
static const nxt_lvlhsh_proto_t lvlhsh_processes_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_runtime_lvlhsh_pid_test,
nxt_lvlhsh_alloc,
nxt_lvlhsh_free,
};
nxt_inline void
nxt_runtime_process_lhq_pid(nxt_lvlhsh_query_t *lhq, nxt_pid_t *pid)
{
lhq->key_hash = nxt_murmur_hash2(pid, sizeof(*pid));
lhq->key.length = sizeof(*pid);
lhq->key.start = (u_char *) pid;
lhq->proto = &lvlhsh_processes_proto;
}
nxt_process_t *
nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
{
nxt_process_t *process;
nxt_lvlhsh_query_t lhq;
process = NULL;
nxt_runtime_process_lhq_pid(&lhq, &pid);
nxt_thread_mutex_lock(&rt->processes_mutex);
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
process = lhq.value;
} else {
nxt_thread_log_debug("process %PI not found", pid);
}
nxt_thread_mutex_unlock(&rt->processes_mutex);
return process;
}
static nxt_process_t *
nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
{
nxt_process_t *process;
nxt_lvlhsh_query_t lhq;
nxt_runtime_process_lhq_pid(&lhq, &pid);
nxt_thread_mutex_lock(&rt->processes_mutex);
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
nxt_thread_log_debug("process %PI found", pid);
nxt_thread_mutex_unlock(&rt->processes_mutex);
process = lhq.value;
process->use_count++;
return process;
}
process = nxt_process_new(rt);
if (nxt_slow_path(process == NULL)) {
nxt_thread_mutex_unlock(&rt->processes_mutex);
return NULL;
}
process->pid = pid;
lhq.replace = 0;
lhq.value = process;
lhq.pool = rt->mem_pool;
switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
case NXT_OK:
if (rt->nprocesses == 0) {
rt->mprocess = process;
}
rt->nprocesses++;
process->registered = 1;
nxt_thread_log_debug("process %PI insert", pid);
break;
default:
nxt_thread_log_debug("process %PI insert failed", pid);
break;
}
nxt_thread_mutex_unlock(&rt->processes_mutex);
return process;
}
void
nxt_runtime_process_add(nxt_task_t *task, nxt_process_t *process)
{
nxt_port_t *port;
nxt_runtime_t *rt;
nxt_lvlhsh_query_t lhq;
nxt_assert(process->registered == 0);
rt = task->thread->runtime;
nxt_runtime_process_lhq_pid(&lhq, &process->pid);
lhq.replace = 0;
lhq.value = process;
lhq.pool = rt->mem_pool;
nxt_thread_mutex_lock(&rt->processes_mutex);
switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
case NXT_OK:
if (rt->nprocesses == 0) {
rt->mprocess = process;
}
rt->nprocesses++;
nxt_process_port_each(process, port) {
port->pid = process->pid;
nxt_runtime_port_add(task, port);
} nxt_process_port_loop;
process->registered = 1;
nxt_debug(task, "process %PI added", process->pid);
break;
default:
nxt_alert(task, "process %PI failed to add", process->pid);
break;
}
nxt_thread_mutex_unlock(&rt->processes_mutex);
}
void
nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
{
nxt_pid_t pid;
nxt_lvlhsh_query_t lhq;
nxt_assert(process->registered != 0);
pid = process->pid;
nxt_runtime_process_lhq_pid(&lhq, &pid);
lhq.pool = rt->mem_pool;
nxt_thread_mutex_lock(&rt->processes_mutex);
switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
case NXT_OK:
nxt_assert(lhq.value == process);
rt->nprocesses--;
process->registered = 0;
nxt_thread_log_debug("process %PI removed", pid);
break;
default:
nxt_thread_log_alert("process %PI remove failed", pid);
break;
}
nxt_thread_mutex_unlock(&rt->processes_mutex);
}
nxt_process_t *
nxt_runtime_process_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
{
nxt_lvlhsh_each_init(lhe, &lvlhsh_processes_proto);
return nxt_runtime_process_next(rt, lhe);
}
nxt_port_t *
nxt_runtime_process_port_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_pid_t pid, nxt_port_id_t id, nxt_process_type_t type)
{
nxt_port_t *port;
nxt_process_t *process;
process = nxt_runtime_process_get(rt, pid);
if (nxt_slow_path(process == NULL)) {
return NULL;
}
port = nxt_port_new(task, id, pid, type);
if (nxt_slow_path(port == NULL)) {
nxt_process_use(task, process, -1);
return NULL;
}
nxt_process_port_add(task, process, port);
nxt_process_use(task, process, -1);
nxt_runtime_port_add(task, port);
nxt_port_use(task, port, -1);
return port;
}
static void
nxt_runtime_port_add(nxt_task_t *task, nxt_port_t *port)
{
nxt_int_t res;
nxt_runtime_t *rt;
rt = task->thread->runtime;
res = nxt_port_hash_add(&rt->ports, port);
if (res != NXT_OK) {
return;
}
rt->port_by_type[port->type] = port;
nxt_port_use(task, port, 1);
}
void
nxt_runtime_port_remove(nxt_task_t *task, nxt_port_t *port)
{
nxt_int_t res;
nxt_runtime_t *rt;
rt = task->thread->runtime;
res = nxt_port_hash_remove(&rt->ports, port);
if (res != NXT_OK) {
return;
}
if (rt->port_by_type[port->type] == port) {
rt->port_by_type[port->type] = NULL;
}
nxt_port_use(task, port, -1);
}
nxt_port_t *
nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
nxt_port_id_t port_id)
{
return nxt_port_hash_find(&rt->ports, pid, port_id);
}