Processes refactoring.

The cycle has been renamed to the runtime.
This commit is contained in:
Igor Sysoev
2017-03-09 18:03:27 +03:00
parent 5745e48264
commit 6f2c9acd18
47 changed files with 2607 additions and 2688 deletions

View File

@@ -237,7 +237,7 @@ fi
cat << END > Makefile
all: libnxt $NXT_BIN
all: $NXT_BIN
libnxt:
make -f $NXT_MAKEFILE libnxt

View File

@@ -18,7 +18,7 @@ NXT_MODULES_SRC=$NXT_BUILD_DIR/nxt_modules.c
cat << END > $NXT_MODULES_SRC
#include <nxt_main.h>
#include <nxt_cycle.h>
#include <nxt_runtime.h>
END
@@ -26,7 +26,7 @@ END
for nxt_init in $NXT_MODULES_INIT
do
$echo "extern nxt_int_t $nxt_init(nxt_thread_t *thr, nxt_cycle_t *cycle);" \
$echo "extern nxt_int_t $nxt_init(nxt_thread_t *thr, nxt_runtime_t *rt);" \
>> $NXT_MODULES_SRC
done

View File

@@ -82,6 +82,7 @@ NXT_LIB_SRCS=" \
src/nxt_process_title.c \
src/nxt_signal.c \
src/nxt_port_socket.c \
src/nxt_port.c \
src/nxt_dyld.c \
src/nxt_random.c \
src/nxt_queue.c \
@@ -320,7 +321,7 @@ if [ $NXT_LIB_UNIT_TEST = YES ]; then
fi
NXT_DEPS=" \
src/nxt_cycle.h \
src/nxt_runtime.h \
src/nxt_application.h \
src/nxt_master_process.h \
"
@@ -328,10 +329,11 @@ NXT_DEPS=" \
NXT_SRCS=" \
src/nxt_main.c \
src/nxt_app_log.c \
src/nxt_cycle.c \
src/nxt_port.c \
src/nxt_application.c \
src/nxt_runtime.c \
src/nxt_stream_module.c \
src/nxt_master_process.c \
src/nxt_worker_process.c \
src/nxt_controller.c \
src/nxt_router.c \
src/nxt_application.c \
"

View File

@@ -5,7 +5,7 @@
*/
#include <nxt_main.h>
#include <nxt_cycle.h>
#include <nxt_runtime.h>
static nxt_time_string_t nxt_log_error_time_cache;

View File

@@ -6,13 +6,14 @@
*/
#include <nxt_main.h>
#include <nxt_cycle.h>
#include <nxt_runtime.h>
#include <nxt_application.h>
#define NXT_PARSE_AGAIN (u_char *) -1
static nxt_int_t nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt);
static void nxt_app_thread(void *ctx);
static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s,
nxt_log_t *log);
@@ -77,11 +78,15 @@ static nxt_uint_t nxt_app_buf_max_number = 16;
nxt_int_t
nxt_app_start(nxt_cycle_t *cycle)
nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_thread_link_t *link;
nxt_thread_handle_t handle;
if (nxt_app_listen_socket(task, rt) != NXT_OK) {
return NXT_ERROR;
}
if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
return NXT_ERROR;
}
@@ -94,7 +99,7 @@ nxt_app_start(nxt_cycle_t *cycle)
if (nxt_fast_path(link != NULL)) {
link->start = nxt_app_thread;
link->data = cycle;
link->data = rt;
return nxt_thread_create(&handle, link);
}
@@ -103,6 +108,39 @@ nxt_app_start(nxt_cycle_t *cycle)
}
static nxt_int_t
nxt_app_listen_socket(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_sockaddr_t *sa;
nxt_listen_socket_t *ls;
sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in),
NXT_INET_ADDR_STR_LEN);
if (sa == NULL) {
return NXT_ERROR;
}
sa->type = SOCK_STREAM;
sa->u.sockaddr_in.sin_family = AF_INET;
sa->u.sockaddr_in.sin_port = htons(8080);
nxt_sockaddr_text(sa);
ls = nxt_runtime_listen_socket_add(rt, sa);
if (ls == NULL) {
return NXT_ERROR;
}
ls->read_after_accept = 1;
if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
return NXT_ERROR;
}
return NXT_OK;
}
#define SIZE 4096
static void
@@ -110,9 +148,9 @@ nxt_app_thread(void *ctx)
{
ssize_t n;
nxt_err_t err;
nxt_cycle_t *cycle;
nxt_socket_t s;
nxt_thread_t *thr;
nxt_runtime_t *rt;
nxt_app_request_t *r;
nxt_event_engine_t **engines;
nxt_listen_socket_t *ls;
@@ -124,8 +162,8 @@ nxt_app_thread(void *ctx)
nxt_log_debug(thr->log, "app thread");
cycle = ctx;
engines = cycle->engines->elts;
rt = ctx;
engines = rt->engines->elts;
nxt_app_engine = engines[0];
@@ -138,7 +176,7 @@ nxt_app_thread(void *ctx)
nxt_log_debug(thr->log, "application init failed");
}
ls = cycle->listen_sockets->elts;
ls = rt->listen_sockets->elts;
for ( ;; ) {
nxt_log_debug(thr->log, "wait on accept");

View File

@@ -53,7 +53,8 @@ typedef struct {
extern nxt_application_module_t nxt_python_module;
nxt_int_t nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len);
nxt_int_t nxt_app_http_read_body(nxt_app_request_t *r, u_char *data,
size_t len);
nxt_int_t nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len);

254
src/nxt_controller.c Normal file
View File

@@ -0,0 +1,254 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Valentin V. Bartenev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
#include <nxt_runtime.h>
#include <nxt_master_process.h>
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_controller_conn_timeout_value(nxt_event_conn_t *c,
uintptr_t data);
static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
void *data);
static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
void *data);
static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
static const nxt_event_conn_state_t nxt_controller_conn_read_state;
static const nxt_event_conn_state_t nxt_controller_conn_close_state;
nxt_int_t
nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt)
{
if (nxt_event_conn_listen(task, rt->controller_socket) != NXT_OK) {
return NXT_ERROR;
}
return NXT_OK;
}
nxt_int_t
nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_sockaddr_t *sa;
nxt_listen_socket_t *ls;
sa = rt->controller_listen;
if (rt->controller_listen == NULL) {
sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in),
NXT_INET_ADDR_STR_LEN);
if (sa == NULL) {
return NXT_ERROR;
}
sa->type = SOCK_STREAM;
sa->u.sockaddr_in.sin_family = AF_INET;
sa->u.sockaddr_in.sin_port = htons(8443);
nxt_sockaddr_text(sa);
rt->controller_listen = sa;
}
ls = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_listen_socket_t));
if (ls == NULL) {
return NXT_ERROR;
}
ls->sockaddr = nxt_sockaddr_create(rt->mem_pool, &sa->u.sockaddr,
sa->socklen, sa->length);
if (ls->sockaddr == NULL) {
return NXT_ERROR;
}
ls->sockaddr->type = sa->type;
nxt_sockaddr_text(ls->sockaddr);
ls->socket = -1;
ls->backlog = NXT_LISTEN_BACKLOG;
ls->read_after_accept = 1;
ls->flags = NXT_NONBLOCK;
#if 0
/* STUB */
wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t));
if (wq == NULL) {
return NXT_ERROR;
}
nxt_work_queue_name(wq, "listen");
/**/
ls->work_queue = wq;
#endif
ls->handler = nxt_controller_conn_init;
/*
* Connection memory pool chunk size is tunned to
* allocate the most data in one mem_pool chunk.
*/
ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls)
+ sizeof(nxt_event_conn_proxy_t)
+ sizeof(nxt_event_conn_t)
+ 4 * sizeof(nxt_buf_t);
if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
return NXT_ERROR;
}
rt->controller_socket = ls;
return NXT_OK;
}
static void
nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_event_conn_t *c;
nxt_event_engine_t *engine;
c = obj;
nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
if (nxt_slow_path(b == NULL)) {
nxt_controller_conn_free(task, c, NULL);
return;
}
c->read = b;
c->socket.read_ready = 1;
c->read_state = &nxt_controller_conn_read_state;
engine = task->thread->engine;
c->read_work_queue = &engine->read_work_queue;
nxt_event_conn_read(engine, c);
}
static const nxt_event_conn_state_t nxt_controller_conn_read_state
nxt_aligned(64) =
{
NXT_EVENT_NO_BUF_PROCESS,
NXT_EVENT_TIMER_NO_AUTORESET,
nxt_controller_conn_read,
nxt_controller_conn_close,
nxt_controller_conn_read_error,
nxt_controller_conn_read_timeout,
nxt_controller_conn_timeout_value,
60 * 1000,
};
static void
nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
c = obj;
nxt_debug(task, "controller conn read");
nxt_controller_conn_close(task, c, c->socket.data);
}
static nxt_msec_t
nxt_controller_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data)
{
return (nxt_msec_t) data;
}
static void
nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
c = obj;
nxt_debug(task, "controller conn read error");
nxt_controller_conn_close(task, c, c->socket.data);
}
static void
nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
{
nxt_timer_t *ev;
nxt_event_conn_t *c;
ev = obj;
c = nxt_event_read_timer_conn(ev);
c->socket.timedout = 1;
c->socket.closed = 1;
nxt_debug(task, "controller conn read timeout");
nxt_controller_conn_close(task, c, c->socket.data);
}
static const nxt_event_conn_state_t nxt_controller_conn_close_state
nxt_aligned(64) =
{
NXT_EVENT_NO_BUF_PROCESS,
NXT_EVENT_TIMER_NO_AUTORESET,
nxt_controller_conn_free,
NULL,
NULL,
NULL,
NULL,
0,
};
static void
nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
c = obj;
nxt_debug(task, "controller conn close");
c->write_state = &nxt_controller_conn_close_state;
nxt_event_conn_close(task->thread->engine, c);
}
static void
nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
c = obj;
nxt_debug(task, "controller conn free");
nxt_mem_pool_destroy(c->mem_pool);
//nxt_free(c);
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,159 +0,0 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Valentin V. Bartenev
* Copyright (C) NGINX, Inc.
*/
#ifndef _NXT_CYCLE_H_INCLUDED_
#define _NXT_CYCLE_H_INCLUDED_
typedef enum {
NXT_PROCESS_SINGLE = 0,
NXT_PROCESS_MASTER,
NXT_PROCESS_WORKER,
} nxt_process_type_e;
typedef void (*nxt_cycle_cont_t)(nxt_task_t *task, nxt_cycle_t *cycle);
struct nxt_cycle_s {
nxt_mem_pool_t *mem_pool;
nxt_cycle_t *previous;
nxt_array_t *inherited_sockets; /* of nxt_listen_socket_t */
nxt_array_t *listen_sockets; /* of nxt_listen_socket_t */
nxt_array_t *services; /* of nxt_service_t */
nxt_array_t *engines; /* of nxt_event_engine_t */
nxt_cycle_cont_t start;
nxt_str_t *conf_prefix;
nxt_str_t *prefix;
nxt_str_t hostname;
nxt_file_name_t *pid_file;
nxt_file_name_t *oldbin_file;
nxt_pid_t new_binary;
#if (NXT_THREADS)
nxt_array_t *thread_pools; /* of nxt_thread_pool_t */
nxt_cycle_cont_t continuation;
#endif
nxt_array_t *ports; /* of nxt_port_t */
nxt_list_t *log_files; /* of nxt_file_t */
nxt_array_t *shm_zones; /* of nxt_cycle_shm_zone_t */
uint32_t process_generation;
uint32_t current_process;
uint32_t last_engine_id;
nxt_process_type_e type;
uint8_t test_config; /* 1 bit */
uint8_t reconfiguring; /* 1 bit */
void **core_ctx;
nxt_timer_t timer;
uint8_t daemon;
uint8_t batch;
uint8_t master_process;
const char *engine;
uint32_t engine_connections;
uint32_t worker_processes;
uint32_t auxiliary_threads;
nxt_user_cred_t user_cred;
const char *group;
const char *pid;
const char *error_log;
nxt_sockaddr_t *app_listen;
nxt_sockaddr_t *stream_listen;
nxt_str_t upstream;
};
typedef struct {
void *addr;
size_t size;
nxt_uint_t page_size;
nxt_str_t name;
} nxt_cycle_shm_zone_t;
typedef nxt_int_t (*nxt_module_init_t)(nxt_thread_t *thr, nxt_cycle_t *cycle);
nxt_thread_extern_data(nxt_cycle_t *, nxt_thread_cycle_data);
nxt_inline void
nxt_thread_cycle_set(nxt_cycle_t *cycle)
{
nxt_cycle_t **p;
p = nxt_thread_get_data(nxt_thread_cycle_data);
*p = cycle;
}
nxt_inline nxt_cycle_t *
nxt_thread_cycle(void)
{
nxt_cycle_t **p;
p = nxt_thread_get_data(nxt_thread_cycle_data);
return *p;
}
nxt_int_t nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task,
nxt_cycle_t *previous, nxt_cycle_cont_t start);
void nxt_cycle_quit(nxt_task_t *task, nxt_cycle_t *cycle);
void nxt_cycle_event_engine_free(nxt_cycle_t *cycle);
#if (NXT_THREADS)
nxt_int_t nxt_cycle_thread_pool_create(nxt_thread_t *thr, nxt_cycle_t *cycle,
nxt_uint_t max_threads, nxt_nsec_t timeout);
#endif
/* STUB */
nxt_str_t *nxt_current_directory(nxt_mem_pool_t *mp);
nxt_int_t nxt_cycle_pid_file_create(nxt_file_name_t *pid_file, nxt_bool_t test);
nxt_listen_socket_t *nxt_cycle_listen_socket_add(nxt_cycle_t *cycle,
nxt_sockaddr_t *sa);
nxt_int_t nxt_cycle_listen_sockets_enable(nxt_task_t *task, nxt_cycle_t *cycle);
nxt_file_t *nxt_cycle_log_file_add(nxt_cycle_t *cycle, nxt_str_t *name);
nxt_int_t nxt_cycle_shm_zone_add(nxt_cycle_t *cycle, nxt_str_t *name,
size_t size, nxt_uint_t page_size);
/* STUB */
void nxt_cdecl nxt_log_time_handler(nxt_uint_t level, nxt_log_t *log,
const char *fmt, ...);
void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_app_start(nxt_cycle_t *cycle);
extern nxt_module_init_t nxt_init_modules[];
extern nxt_uint_t nxt_init_modules_n;
#endif /* _NXT_CYCLE_H_INCLIDED_ */

View File

@@ -65,7 +65,7 @@ typedef struct {
/*
* The write() is an interface to write a buffer chain with a given rate
* limit. It calls write_chunk() in a cycle and handles write event timer.
* limit. It calls write_chunk() in a loop and handles write event timer.
*/
nxt_work_handler_t write;
@@ -175,7 +175,7 @@ typedef struct {
nxt_task_t task;
uint32_t ready;
uint32_t batch0;
uint32_t batch;
/* An accept() interface is cached to minimize memory accesses. */
nxt_work_handler_t accept;
@@ -299,13 +299,13 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
c->socket.task, c, c->socket.data)
#define nxt_event_conn_read(e, c) \
#define nxt_event_conn_read(engine, c) \
do { \
nxt_event_engine_t *engine = e; \
nxt_event_engine_t *e = engine; \
\
c->socket.read_work_queue = &engine->read_work_queue; \
c->socket.read_work_queue = &e->read_work_queue; \
\
nxt_work_queue_add(&engine->read_work_queue, c->io->read, \
nxt_work_queue_add(&e->read_work_queue, c->io->read, \
c->socket.task, c, c->socket.data); \
} while (0)

View File

@@ -44,7 +44,7 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
cls->socket.fd = ls->socket;
engine = task->thread->engine;
cls->batch0 = engine->batch0;
cls->batch = engine->batch;
cls->socket.read_work_queue = &engine->accept_work_queue;
cls->socket.read_handler = nxt_event_conn_listen_handler;
@@ -130,7 +130,7 @@ nxt_event_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
nxt_event_conn_listen_t *cls;
cls = obj;
cls->ready = cls->batch0;
cls->ready = cls->batch;
cls->accept(task, cls, data);
}

View File

@@ -182,7 +182,7 @@ nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
b = jbs->out;
/* The job must be destroyed before connection error handler. */
nxt_job_destroy(jbs);
nxt_job_destroy(task, jbs);
if (c->write_state->process_buffers) {
b = nxt_event_conn_job_sendfile_completion(task, c, b);

View File

@@ -25,11 +25,12 @@ static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
nxt_event_engine_t *
nxt_event_engine_create(nxt_thread_t *thr,
nxt_event_engine_create(nxt_task_t *task,
const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
nxt_uint_t flags, nxt_uint_t batch)
{
nxt_uint_t events;
nxt_thread_t *thread;
nxt_event_engine_t *engine;
engine = nxt_zalloc(sizeof(nxt_event_engine_t));
@@ -37,14 +38,16 @@ nxt_event_engine_create(nxt_thread_t *thr,
return NULL;
}
engine->task.thread = thr;
engine->task.log = thr->log;
thread = task->thread;
engine->task.thread = thread;
engine->task.log = thread->log;
engine->task.ident = nxt_task_next_ident();
thr->engine = engine;
thr->fiber = &engine->fibers->fiber;
thread->engine = engine;
thread->fiber = &engine->fibers->fiber;
engine->batch0 = batch;
engine->batch = batch;
if (flags & NXT_ENGINE_FIBERS) {
engine->fibers = nxt_fiber_main_create(engine);
@@ -111,8 +114,10 @@ nxt_event_engine_create(nxt_thread_t *thr,
goto timers_fail;
}
nxt_thread_time_update(thr);
engine->timers.now = nxt_thread_monotonic_time(thr) / 1000000;
thread = task->thread;
nxt_thread_time_update(thread);
engine->timers.now = nxt_thread_monotonic_time(thread) / 1000000;
engine->max_connections = 0xffffffff;
@@ -122,7 +127,7 @@ nxt_event_engine_create(nxt_thread_t *thr,
#if !(NXT_THREADS)
if (interface->signal_support) {
thr->time.signal = -1;
thread->time.signal = -1;
}
#endif
@@ -368,14 +373,12 @@ nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
nxt_int_t
nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
nxt_event_engine_change(nxt_event_engine_t *engine,
const nxt_event_interface_t *interface, nxt_uint_t batch)
{
nxt_uint_t events;
nxt_event_engine_t *engine;
nxt_uint_t events;
engine = thr->engine;
engine->batch0 = batch;
engine->batch = batch;
if (!engine->event.signal_support && interface->signal_support) {
/*
@@ -388,7 +391,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
* Add to engine fast work queue the signal events possibly
* received before the blocking signal processing.
*/
nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL);
nxt_event_engine_signal_pipe(&engine->task, &engine->pipe->event, NULL);
}
if (engine->pipe != NULL && interface->enable_post != NULL) {

View File

@@ -477,7 +477,7 @@ struct nxt_event_engine_s {
uint8_t shutdown; /* 1 bit */
uint32_t batch0;
uint32_t batch;
uint32_t connections;
uint32_t max_connections;
@@ -486,11 +486,11 @@ struct nxt_event_engine_s {
};
NXT_EXPORT nxt_event_engine_t *nxt_event_engine_create(nxt_thread_t *thr,
NXT_EXPORT nxt_event_engine_t *nxt_event_engine_create(nxt_task_t *task,
const nxt_event_interface_t *interface, const nxt_sig_event_t *signals,
nxt_uint_t flags, nxt_uint_t batch);
NXT_EXPORT nxt_int_t nxt_event_engine_change(nxt_thread_t *thr,
nxt_task_t *task, const nxt_event_interface_t *interface, nxt_uint_t batch);
NXT_EXPORT nxt_int_t nxt_event_engine_change(nxt_event_engine_t *engine,
const nxt_event_interface_t *interface, nxt_uint_t batch);
NXT_EXPORT void nxt_event_engine_free(nxt_event_engine_t *engine);
NXT_EXPORT void nxt_event_engine_start(nxt_event_engine_t *engine);

View File

@@ -8,11 +8,9 @@
nxt_int_t
nxt_file_open(nxt_file_t *file, nxt_uint_t mode, nxt_uint_t create,
nxt_file_access_t access)
nxt_file_open(nxt_task_t *task, nxt_file_t *file, nxt_uint_t mode,
nxt_uint_t create, nxt_file_access_t access)
{
nxt_thread_debug(thr);
#ifdef __CYGWIN__
mode |= O_BINARY;
#endif
@@ -24,18 +22,20 @@ nxt_file_open(nxt_file_t *file, nxt_uint_t mode, nxt_uint_t create,
file->error = (file->fd == -1) ? nxt_errno : 0;
nxt_thread_time_debug_update(thr);
#if (NXT_DEBUG)
nxt_thread_time_update(task->thread);
#endif
nxt_log_debug(thr->log, "open(\"%FN\", 0x%uXi, 0x%uXi): %FD err:%d",
file->name, mode, access, file->fd, file->error);
nxt_debug(task, "open(\"%FN\", 0x%uXi, 0x%uXi): %FD err:%d",
file->name, mode, access, file->fd, file->error);
if (file->fd != -1) {
return NXT_OK;
}
if (file->log_level != 0) {
nxt_thread_log_error(file->log_level, "open(\"%FN\") failed %E",
file->name, file->error);
nxt_log(task, file->log_level, "open(\"%FN\") failed %E",
file->name, file->error);
}
return NXT_ERROR;
@@ -43,13 +43,13 @@ nxt_file_open(nxt_file_t *file, nxt_uint_t mode, nxt_uint_t create,
void
nxt_file_close(nxt_file_t *file)
nxt_file_close(nxt_task_t *task, nxt_file_t *file)
{
nxt_thread_log_debug("close(%FD)", file->fd);
nxt_debug(task, "close(%FD)", file->fd);
if (close(file->fd) != 0) {
nxt_thread_log_error(NXT_LOG_CRIT, "close(%FD, \"%FN\") failed %E",
file->fd, file->name, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "close(%FD, \"%FN\") failed %E",
file->fd, file->name, nxt_errno);
}
}

View File

@@ -106,10 +106,8 @@ typedef struct {
} nxt_file_t;
NXT_EXPORT nxt_int_t nxt_file_open(nxt_file_t *file, nxt_uint_t mode,
nxt_uint_t create, nxt_file_access_t access);
#define nxt_file_open_n "open"
NXT_EXPORT nxt_int_t nxt_file_open(nxt_task_t *task, nxt_file_t *file,
nxt_uint_t mode, nxt_uint_t create, nxt_file_access_t access);
/* The file open access modes. */
@@ -128,7 +126,7 @@ NXT_EXPORT nxt_int_t nxt_file_open(nxt_file_t *file, nxt_uint_t mode,
#define NXT_FILE_OWNER_ACCESS 0600
NXT_EXPORT void nxt_file_close(nxt_file_t *file);
NXT_EXPORT void nxt_file_close(nxt_task_t *task, nxt_file_t *file);
NXT_EXPORT ssize_t nxt_file_write(nxt_file_t *file, const u_char *buf,
size_t size, nxt_off_t offset);
NXT_EXPORT ssize_t nxt_file_read(nxt_file_t *file, u_char *buf, size_t size,

View File

@@ -60,7 +60,7 @@ nxt_job_init(nxt_job_t *job, size_t size)
void
nxt_job_destroy(void *data)
nxt_job_destroy(nxt_task_t *task, void *data)
{
nxt_job_t *job;

View File

@@ -60,7 +60,7 @@ typedef struct {
NXT_EXPORT void *nxt_job_create(nxt_mem_pool_t *mp, size_t size);
NXT_EXPORT void nxt_job_init(nxt_job_t *job, size_t size);
NXT_EXPORT void nxt_job_destroy(void *data);
NXT_EXPORT void nxt_job_destroy(nxt_task_t *task, void *data);
NXT_EXPORT nxt_int_t nxt_job_cleanup_add(nxt_mem_pool_t *mp, nxt_job_t *job);
NXT_EXPORT void nxt_job_start(nxt_task_t *task, nxt_job_t *job,

View File

@@ -738,7 +738,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
ev->kq_errno = err;
ev->kq_eof = eof;
if (ev->read == NXT_EVENT_BLOCKED) {
if (ev->read <= NXT_EVENT_BLOCKED) {
nxt_debug(ev->task, "blocked read event fd:%d", ev->fd);
continue;
}
@@ -769,7 +769,7 @@ nxt_kqueue_poll(nxt_event_engine_t *engine, nxt_msec_t timeout)
ev->kq_errno = err;
ev->kq_eof = eof;
if (ev->write == NXT_EVENT_BLOCKED) {
if (ev->write <= NXT_EVENT_BLOCKED) {
nxt_debug(ev->task, "blocked write event fd:%d", ev->fd);
continue;
}
@@ -908,7 +908,7 @@ nxt_kqueue_listen_handler(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "kevent fd:%d avail:%D",
cls->socket.fd, cls->socket.kq_available);
cls->ready = nxt_min(cls->batch0, (uint32_t) cls->socket.kq_available);
cls->ready = nxt_min(cls->batch, (uint32_t) cls->socket.kq_available);
nxt_kqueue_event_conn_io_accept(task, cls, data);
}

View File

@@ -36,7 +36,7 @@ nxt_lib_start(const char *app, char **argv, char ***envp)
int n;
nxt_int_t flags;
nxt_bool_t update;
nxt_thread_t *thr;
nxt_thread_t *thread;
flags = nxt_stderr_start();
@@ -64,16 +64,15 @@ nxt_lib_start(const char *app, char **argv, char ***envp)
/* Thread log is required for nxt_malloc() in nxt_strerror_start(). */
nxt_thread_init_data(nxt_thread_context);
thr = nxt_thread();
thr->log = &nxt_main_log;
thread = nxt_thread();
thread->log = &nxt_main_log;
#if (NXT_THREADS)
thr->handle = nxt_thread_handle();
thr->time.signal = -1;
#endif
thread->handle = nxt_thread_handle();
thread->time.signal = -1;
nxt_thread_time_update(thread);
nxt_main_task.thread = thr;
nxt_main_task.log = thr->log;
nxt_main_task.thread = thread;
nxt_main_task.log = thread->log;
nxt_main_task.ident = nxt_task_next_ident();
if (nxt_strerror_start() != NXT_OK) {
@@ -81,7 +80,7 @@ nxt_lib_start(const char *app, char **argv, char ***envp)
}
if (flags != -1) {
nxt_log_debug(thr->log, "stderr flags: 0x%04Xd", flags);
nxt_debug(&nxt_main_task, "stderr flags: 0x%04Xd", flags);
}
#ifdef _SC_NPROCESSORS_ONLN
@@ -93,7 +92,7 @@ nxt_lib_start(const char *app, char **argv, char ***envp)
#endif
nxt_log_debug(thr->log, "ncpu: %ui", n);
nxt_debug(&nxt_main_task, "ncpu: %ui", n);
if (n > 1) {
nxt_ncpu = n;
@@ -105,12 +104,12 @@ nxt_lib_start(const char *app, char **argv, char ***envp)
nxt_pagesize = getpagesize();
nxt_log_debug(thr->log, "pagesize: %ui", nxt_pagesize);
nxt_debug(&nxt_main_task, "pagesize: %ui", nxt_pagesize);
if (argv != NULL) {
update = (argv[0] == app);
nxt_process_arguments(argv, envp);
nxt_process_arguments(&nxt_main_task, argv, envp);
if (update) {
nxt_log_start(nxt_process_argv[0]);
@@ -131,12 +130,12 @@ nxt_lib_stop(void)
for ( ;; ) {
nxt_thread_pool_t *tp;
nxt_thread_spin_lock(&cycle->lock);
nxt_thread_spin_lock(&rt->lock);
tp = cycle->thread_pools;
cycle->thread_pools = (tp != NULL) ? tp->next : NULL;
tp = rt->thread_pools;
rt->thread_pools = (tp != NULL) ? tp->next : NULL;
nxt_thread_spin_unlock(&cycle->lock);
nxt_thread_spin_unlock(&rt->lock);
if (tp == NULL) {
break;

View File

@@ -75,7 +75,7 @@ nxt_list_elt(nxt_list_t *list, nxt_uint_t n)
#define nxt_list_each(elt, list) \
do { \
if (nxt_fast_path((list) != NULL)) { \
if (nxt_fast_path((list) != NULL)) { \
void *_end; \
nxt_list_part_t *_part = nxt_list_part(list); \
\

View File

@@ -5,7 +5,7 @@
*/
#include <nxt_main.h>
#include <nxt_cycle.h>
#include <nxt_runtime.h>
extern char **environ;
@@ -14,8 +14,7 @@ extern char **environ;
int nxt_cdecl
main(int argc, char **argv)
{
nxt_int_t ret;
nxt_thread_t *thr;
nxt_int_t ret;
if (nxt_lib_start("nginext", argv, &environ) != NXT_OK) {
return 1;
@@ -23,20 +22,17 @@ main(int argc, char **argv)
// nxt_main_log.level = NXT_LOG_INFO;
thr = nxt_thread();
nxt_thread_time_update(thr);
nxt_main_log.handler = nxt_log_time_handler;
nxt_log_error(NXT_LOG_INFO, thr->log, "nginext started");
nxt_log(&nxt_main_task, NXT_LOG_INFO, "nginext started");
ret = nxt_cycle_create(thr, &nxt_main_task, NULL, NULL);
ret = nxt_runtime_create(&nxt_main_task);
if (ret != NXT_OK) {
return 1;
}
nxt_event_engine_start(thr->engine);
nxt_event_engine_start(nxt_main_task.thread->engine);
nxt_unreachable();
return 0;

View File

@@ -14,13 +14,22 @@
#include <nxt_clang.h>
#include <nxt_types.h>
#include <nxt_time.h>
typedef struct nxt_mem_pool_s nxt_mem_pool_t;
#include <nxt_array.h>
typedef struct nxt_port_s nxt_port_t;
typedef struct nxt_task_s nxt_task_t;
typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t;
typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
typedef struct nxt_sig_event_s nxt_sig_event_t;
typedef struct nxt_runtime_s nxt_runtime_t;
#include <nxt_process.h>
typedef struct nxt_task_s nxt_task_t;
typedef struct nxt_thread_s nxt_thread_t;
#include <nxt_thread_id.h>
typedef struct nxt_mem_pool_s nxt_mem_pool_t;
#include <nxt_mem_pool.h>
#include <nxt_errno.h>
@@ -89,7 +98,6 @@ typedef struct nxt_thread_pool_s nxt_thread_pool_t;
#include <nxt_hash.h>
#include <nxt_sort.h>
#include <nxt_array.h>
#include <nxt_vector.h>
#include <nxt_list.h>
@@ -110,8 +118,7 @@ typedef struct nxt_event_conn_s nxt_event_conn_t;
#endif
#define \
nxt_thread() \
#define nxt_thread() \
(nxt_thread_t *) nxt_thread_get_data(nxt_thread_context)
nxt_thread_extern_data(nxt_thread_t, nxt_thread_context);
@@ -121,7 +128,6 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context);
#include <nxt_fd_event.h>
typedef struct nxt_cycle_s nxt_cycle_t;
#include <nxt_port.h>
#if (NXT_THREADS)
#include <nxt_thread_pool.h>
@@ -154,7 +160,7 @@ typedef struct nxt_upstream_source_s nxt_upstream_source_t;
#include <nxt_upstream_source.h>
#include <nxt_http_source.h>
#include <nxt_fastcgi_source.h>
#include <nxt_cycle.h>
#include <nxt_runtime.h>
#if (NXT_LIB_UNIT_TEST)

View File

@@ -5,46 +5,39 @@
*/
#include <nxt_main.h>
#include <nxt_cycle.h>
#include <nxt_runtime.h>
#include <nxt_port.h>
#include <nxt_master_process.h>
static nxt_int_t nxt_master_process_port_create(nxt_task_t *task,
nxt_cycle_t *cycle);
static void nxt_master_process_title(void);
nxt_runtime_t *rt);
static void nxt_master_process_title(nxt_task_t *task);
static nxt_int_t nxt_master_start_controller_process(nxt_task_t *task,
nxt_runtime_t *rt);
static nxt_int_t nxt_master_start_router_process(nxt_task_t *task,
nxt_runtime_t *rt);
static nxt_int_t nxt_master_start_worker_processes(nxt_task_t *task,
nxt_cycle_t *cycle);
nxt_runtime_t *rt);
static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task,
nxt_cycle_t *cycle);
static void nxt_master_stop_previous_worker_processes(nxt_task_t *task,
void *obj, void *data);
static void nxt_master_process_sighup_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle);
nxt_runtime_t *rt, nxt_process_init_t *init);
static void nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_master_process_sigquit_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_master_process_sigusr2_handler(nxt_task_t *task, void *obj,
void *data);
static char **nxt_master_process_upgrade_environment(nxt_cycle_t *cycle);
static char **nxt_master_process_upgrade_environment_create(nxt_cycle_t *cycle);
static void nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid);
const nxt_sig_event_t nxt_master_process_signals[] = {
nxt_event_signal(SIGHUP, nxt_master_process_sighup_handler),
nxt_event_signal(SIGINT, nxt_master_process_sigterm_handler),
nxt_event_signal(SIGQUIT, nxt_master_process_sigquit_handler),
nxt_event_signal(SIGTERM, nxt_master_process_sigterm_handler),
nxt_event_signal(SIGCHLD, nxt_master_process_sigchld_handler),
nxt_event_signal(SIGUSR1, nxt_master_process_sigusr1_handler),
nxt_event_signal(SIGUSR2, nxt_master_process_sigusr2_handler),
nxt_event_signal_end,
};
@@ -54,27 +47,47 @@ static nxt_bool_t nxt_exiting;
nxt_int_t
nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
nxt_cycle_t *cycle)
nxt_runtime_t *rt)
{
cycle->type = NXT_PROCESS_MASTER;
nxt_int_t ret;
if (nxt_master_process_port_create(task, cycle) != NXT_OK) {
rt->type = NXT_PROCESS_MASTER;
if (nxt_master_process_port_create(task, rt) != NXT_OK) {
return NXT_ERROR;
}
nxt_master_process_title();
nxt_master_process_title(task);
return nxt_master_start_worker_processes(task, cycle);
ret = nxt_master_start_controller_process(task, rt);
if (ret != NXT_OK) {
return ret;
}
ret = nxt_master_start_router_process(task, rt);
if (ret != NXT_OK) {
return ret;
}
return nxt_master_start_worker_processes(task, rt);
}
static nxt_int_t
nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle)
nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_int_t ret;
nxt_port_t *port;
nxt_int_t ret;
nxt_port_t *port;
nxt_process_t *process;
port = nxt_array_zero_add(cycle->ports);
process = nxt_runtime_new_process(rt);
if (nxt_slow_path(process == NULL)) {
return NXT_ERROR;
}
process->pid = nxt_pid;
port = nxt_array_zero_add(process->ports);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
@@ -98,7 +111,7 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle)
static void
nxt_master_process_title(void)
nxt_master_process_title(nxt_task_t *task)
{
u_char *p, *end;
nxt_uint_t i;
@@ -115,22 +128,75 @@ nxt_master_process_title(void)
*p = '\0';
nxt_process_title((char *) title);
nxt_process_title(task, (char *) title);
}
static nxt_int_t
nxt_master_start_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle)
nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_int_t ret;
nxt_uint_t n;
nxt_process_init_t *init;
cycle->process_generation++;
init = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
n = cycle->worker_processes;
init->start = nxt_controller_start;
init->name = "controller process";
init->user_cred = &rt->user_cred;
init->port_handlers = nxt_worker_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_CONTROLLER;
return nxt_master_create_worker_process(task, rt, init);
}
static nxt_int_t
nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_process_init_t *init;
init = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
init->start = nxt_router_start;
init->name = "router process";
init->user_cred = &rt->user_cred;
init->port_handlers = nxt_worker_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_ROUTER;
return nxt_master_create_worker_process(task, rt, init);
}
static nxt_int_t
nxt_master_start_worker_processes(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_int_t ret;
nxt_uint_t n;
nxt_process_init_t *init;
init = nxt_mem_alloc(rt->mem_pool, sizeof(nxt_process_init_t));
if (nxt_slow_path(init == NULL)) {
return NXT_ERROR;
}
init->start = nxt_app_start;
init->name = "worker process";
init->user_cred = &rt->user_cred;
init->port_handlers = nxt_worker_process_port_handlers;
init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_WORKER;
n = rt->worker_processes;
while (n-- != 0) {
ret = nxt_master_create_worker_process(task, cycle);
ret = nxt_master_create_worker_process(task, rt, init);
if (ret != NXT_OK) {
return ret;
@@ -142,18 +208,34 @@ nxt_master_start_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle)
static nxt_int_t
nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
nxt_process_init_t *init)
{
nxt_int_t ret;
nxt_pid_t pid;
nxt_port_t *port;
nxt_int_t ret;
nxt_pid_t pid;
nxt_port_t *port;
nxt_process_t *process, *master_process;
port = nxt_array_zero_add(cycle->ports);
/*
* TODO: remove process, init, ports from array on memory and fork failures.
*/
process = nxt_runtime_new_process(rt);
if (nxt_slow_path(process == NULL)) {
return NXT_ERROR;
}
process->init = init;
master_process = rt->processes->elts;
init->master_port = master_process->ports->elts;
port = nxt_array_zero_add(process->ports);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
cycle->current_process = cycle->ports->nelts - 1;
init->port = port;
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -161,10 +243,8 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
}
port->engine = 0;
port->generation = cycle->process_generation;
pid = nxt_process_create(nxt_worker_process_start, cycle,
"start worker process");
pid = nxt_process_create(task, init);
switch (pid) {
@@ -177,110 +257,42 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
default:
/* The master process created a new process. */
process->pid = pid;
port->pid = pid;
nxt_port_read_close(port);
nxt_port_write_enable(task, port);
nxt_port_send_new_port(task, cycle, port);
nxt_port_send_new_port(task, rt, port);
return NXT_OK;
}
}
static void
nxt_master_process_sighup_handler(nxt_task_t *task, void *obj, void *data)
void
nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_cycle_t *cycle;
nxt_uint_t i, n, nprocesses, nports;
nxt_port_t *port;
nxt_process_t *process;
cycle = nxt_thread_cycle();
process = rt->processes->elts;
nprocesses = rt->processes->nelts;
nxt_log(task, NXT_LOG_NOTICE, "signal %d (%s) recevied, %s",
(int) (uintptr_t) obj, data,
cycle->reconfiguring ? "ignored" : "reconfiguring");
for (i = 0; i < nprocesses; i++) {
if (!cycle->reconfiguring) {
(void) nxt_cycle_create(task->thread, task, cycle,
nxt_master_process_new_cycle);
}
}
if (nxt_pid != process[i].pid) {
process[i].init = NULL;
port = process[i].ports->elts;
nports = process[i].ports->nelts;
static void
nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_thread_t *thr;
thr = task->thread;
nxt_debug(task, "new cycle");
/* A safe place to free the previous cycle. */
nxt_mem_pool_destroy(cycle->previous->mem_pool);
switch (nxt_master_start_worker_processes(task, cycle)) {
case NXT_OK:
/*
* The master process, allow old worker processes to accept new
* connections yet 500ms in parallel with new worker processes.
*/
cycle->timer.handler = nxt_master_stop_previous_worker_processes;
cycle->timer.log = &nxt_main_log;
cycle->timer.work_queue = &thr->engine->fast_work_queue;
nxt_timer_add(thr->engine, &cycle->timer, 500);
return;
case NXT_ERROR:
/*
* The master process, one or more new worker processes
* could not be created, there is no fallback.
*/
return;
default: /* NXT_AGAIN */
/* A worker process, return to the event engine work queue loop. */
return;
}
}
static void
nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj,
void *data)
{
uint32_t generation;
nxt_uint_t i, n;
nxt_port_t *port;
nxt_cycle_t *cycle;
cycle = nxt_thread_cycle();
port = cycle->ports->elts;
n = cycle->ports->nelts;
generation = cycle->process_generation - 1;
/* The port[0] is the master process port. */
for (i = 1; i < n; i++) {
if (port[i].generation == generation) {
(void) nxt_port_socket_write(task, &port[i],
NXT_PORT_MSG_QUIT, -1, 0, NULL);
for (n = 0; n < nports; n++) {
(void) nxt_port_socket_write(task, &port[n], NXT_PORT_MSG_QUIT,
-1, 0, NULL);
}
}
}
cycle->reconfiguring = 0;
}
void
nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL);
}
@@ -295,7 +307,7 @@ nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
nxt_exiting = 1;
nxt_cycle_quit(task, NULL);
nxt_runtime_quit(task);
}
@@ -309,7 +321,7 @@ nxt_master_process_sigquit_handler(nxt_task_t *task, void *obj, void *data)
nxt_exiting = 1;
nxt_cycle_quit(task, NULL);
nxt_runtime_quit(task);
}
@@ -319,7 +331,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
nxt_int_t ret;
nxt_uint_t n;
nxt_file_t *file, *new_file;
nxt_cycle_t *cycle;
nxt_runtime_t *rt;
nxt_array_t *new_files;
nxt_mem_pool_t *mp;
@@ -331,9 +343,9 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
return;
}
cycle = nxt_thread_cycle();
rt = task->thread->runtime;
n = nxt_list_nelts(cycle->log_files);
n = nxt_list_nelts(rt->log_files);
new_files = nxt_array_create(mp, n, sizeof(nxt_file_t));
if (new_files == NULL) {
@@ -341,7 +353,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
return;
}
nxt_list_each(file, cycle->log_files) {
nxt_list_each(file, rt->log_files) {
/* This allocation cannot fail. */
new_file = nxt_array_add(new_files);
@@ -350,7 +362,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
new_file->fd = NXT_FILE_INVALID;
new_file->log_level = NXT_LOG_CRIT;
ret = nxt_file_open(new_file, NXT_FILE_APPEND, NXT_FILE_CREATE_OR_OPEN,
ret = nxt_file_open(task, new_file, O_WRONLY | O_APPEND, O_CREAT,
NXT_FILE_OWNER_ACCESS);
if (ret != NXT_OK) {
@@ -366,9 +378,9 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
if (ret == NXT_OK) {
n = 0;
nxt_list_each(file, cycle->log_files) {
nxt_list_each(file, rt->log_files) {
nxt_port_change_log_file(task, cycle, n, new_file[n].fd);
nxt_port_change_log_file(task, rt, n, new_file[n].fd);
/*
* The old log file descriptor must be closed at the moment
* when no other threads use it. dup2() allows to use the
@@ -392,7 +404,7 @@ fail:
while (n != 0) {
if (new_file->fd != NXT_FILE_INVALID) {
nxt_file_close(new_file);
nxt_file_close(task, new_file);
}
new_file++;
@@ -403,164 +415,6 @@ fail:
}
static void
nxt_master_process_sigusr2_handler(nxt_task_t *task, void *obj, void *data)
{
char **env;
nxt_int_t ret;
nxt_pid_t pid, ppid;
nxt_bool_t ignore;
nxt_cycle_t *cycle;
cycle = nxt_thread_cycle();
/* Is upgrade or reconfiguring in progress? */
ignore = (cycle->new_binary != 0) || cycle->reconfiguring;
ppid = getppid();
if (ppid == nxt_ppid && ppid != 1) {
/*
* Ignore the upgrade signal in a new master process if an old
* master process is still running. After the old process's exit
* getppid() will return 1 (init process pid) or pid of zsched (zone
* scheduler) if the processes run in Solaris zone. There is little
* race condition between the parent process exit and getting getppid()
* for the very start of the new master process execution, so init or
* zsched pid may be stored in nxt_ppid. For this reason pid 1 is
* tested explicitly. There is no workaround for this race condition
* in Solaris zons. To eliminate this race condition in Solaris
* zone the old master process should be quit only when both
* "nginext.pid.oldbin" (created by the old master process) and
* "nginext.pid" (created by the new master process) files exists.
*/
ignore = 1;
}
nxt_log(task, NXT_LOG_NOTICE,
"signal %d (%s) recevied, %s, parent pid: %PI",
(int) (uintptr_t) obj, data,
ignore ? "ignored" : "online binary file upgrade", ppid);
if (ignore) {
return;
}
env = nxt_master_process_upgrade_environment(cycle);
if (nxt_slow_path(env == NULL)) {
return;
}
cycle->new_binary = -1;
ret = nxt_cycle_pid_file_create(cycle->oldbin_file, 0);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
pid = nxt_process_execute(nxt_process_argv[0], nxt_process_argv, env);
if (pid == -1) {
cycle->new_binary = 0;
(void) nxt_file_delete(cycle->oldbin_file);
} else {
cycle->new_binary = pid;
}
fail:
/* Zero slot is NGINX variable slot, all other slots must not be free()d. */
nxt_free(env[0]);
nxt_free(env);
}
static char **
nxt_master_process_upgrade_environment(nxt_cycle_t *cycle)
{
size_t len;
char **env;
u_char *p, *end;
nxt_uint_t n;
nxt_listen_socket_t *ls;
env = nxt_master_process_upgrade_environment_create(cycle);
if (nxt_slow_path(env == NULL)) {
return NULL;
}
ls = cycle->listen_sockets->elts;
n = cycle->listen_sockets->nelts;
len = sizeof("NGINX=") + n * (NXT_INT_T_LEN + 1);
p = nxt_malloc(len);
if (nxt_slow_path(p == NULL)) {
nxt_free(env);
return NULL;
}
env[0] = (char *) p;
end = p + len;
p = nxt_cpymem(p, "NGINX=", sizeof("NGINX=") - 1);
do {
p = nxt_sprintf(p, end, "%ud;", ls->socket);
ls++;
n--;
} while (n != 0);
*p = '\0';
return env;
}
static char **
nxt_master_process_upgrade_environment_create(nxt_cycle_t *cycle)
{
char **env;
nxt_uint_t n;
/* 2 is for "NGINX" variable and the last NULL slot. */
n = 2;
#if (NXT_SETPROCTITLE_ARGV)
n++;
#endif
env = nxt_malloc(n * sizeof(char *));
if (nxt_slow_path(env == NULL)) {
return NULL;
}
/* Zero slot is reserved for "NGINX" variable. */
n = 1;
/* TODO: copy env values */
#if (NXT_SETPROCTITLE_ARGV)
/* 300 spare bytes for new process title. */
env[n++] = (char *)
"SPARE=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
"XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX";
#endif
env[n] = NULL;
return env;
}
static void
nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
{
@@ -619,38 +473,36 @@ nxt_master_process_sigchld_handler(nxt_task_t *task, void *obj, void *data)
static void
nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
{
nxt_uint_t i, n, generation;
nxt_port_t *port;
nxt_cycle_t *cycle;
nxt_uint_t i, n;
nxt_runtime_t *rt;
nxt_process_t *process;
nxt_process_init_t *init;
cycle = nxt_thread_cycle();
rt = task->thread->runtime;
if (cycle->new_binary == pid) {
cycle->new_binary = 0;
process = rt->processes->elts;
n = rt->processes->nelts;
(void) nxt_file_rename(cycle->oldbin_file, cycle->pid_file);
return;
}
/* A process[0] is the master process. */
port = cycle->ports->elts;
n = cycle->ports->nelts;
for (i = 1; i < n; i++) {
for (i = 0; i < n; i++) {
if (pid == process[i].pid) {
init = process[i].init;
if (pid == port[i].pid) {
generation = port[i].generation;
/* TODO: free ports fds. */
nxt_array_remove(cycle->ports, &port[i]);
nxt_array_remove(rt->processes, &process[i]);
if (nxt_exiting) {
nxt_debug(task, "processes %d", n);
if (n == 2) {
nxt_cycle_quit(task, cycle);
nxt_runtime_quit(task);
}
} else if (generation == cycle->process_generation) {
(void) nxt_master_create_worker_process(task, cycle);
} else if (init != NULL) {
(void) nxt_master_create_worker_process(task, rt, init);
}
return;

View File

@@ -4,17 +4,21 @@
* Copyright (C) NGINX, Inc.
*/
#ifndef _NXT_UNIX_MASTER_PROCESS_H_INCLUDED_
#define _NXT_UNIX_MASTER_PROCESS_H_INCLUDED_
#ifndef _NXT_MASTER_PROCESS_H_INCLUDED_
#define _NXT_MASTER_PROCESS_H_INCLUDED_
nxt_int_t nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
nxt_cycle_t *cycle);
void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle);
void nxt_worker_process_start(void *data);
nxt_runtime_t *runtime);
void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *runtime);
nxt_int_t nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt);
nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt);
extern nxt_port_handler_t nxt_worker_process_port_handlers[];
extern const nxt_sig_event_t nxt_master_process_signals[];
extern const nxt_sig_event_t nxt_worker_process_signals[];
#endif /* _NXT_UNIX_MASTER_PROCESS_H_INCLUDED_ */
#endif /* _NXT_MASTER_PROCESS_H_INCLUDED_ */

View File

@@ -126,16 +126,19 @@ nxt_mem_pool_create(size_t size)
void
nxt_mem_pool_destroy(nxt_mem_pool_t *mp)
{
nxt_task_t *task;
nxt_mem_pool_ext_t *ext;
nxt_mem_pool_chunk_t *chunk, *next;
nxt_mem_pool_cleanup_t *mpcl;
task = NULL;
nxt_mem_pool_thread_assert(mp);
for (mpcl = mp->cleanup; mpcl != NULL; mpcl = mpcl->next) {
if (mpcl->handler != NULL) {
nxt_thread_log_debug("mem pool cleanup: %p", mpcl);
mpcl->handler(mpcl->data);
mpcl->handler(task, mpcl->data);
}
}

View File

@@ -11,7 +11,7 @@
#define NXT_MEM_POOL_MIN_EXT_SIZE nxt_pagesize
typedef void (*nxt_mem_pool_cleanup_handler_t)(void *data);
typedef void (*nxt_mem_pool_cleanup_handler_t)(nxt_task_t *task, void *data);
typedef struct nxt_mem_pool_cleanup_s nxt_mem_pool_cleanup_t;
typedef struct nxt_mem_pool_cache_s nxt_mem_pool_cache_t;
typedef struct nxt_mem_pool_chunk_s nxt_mem_pool_chunk_t;

View File

@@ -7,7 +7,7 @@
#include <nxt_main.h>
static void nxt_mem_pool_file_cleanup_handler(void *data);
static void nxt_mem_pool_file_cleanup_handler(nxt_task_t *task, void *data);
nxt_mem_pool_cleanup_t *
@@ -27,13 +27,13 @@ nxt_mem_pool_file_cleanup(nxt_mem_pool_t *mp, nxt_file_t *file)
static void
nxt_mem_pool_file_cleanup_handler(void *data)
nxt_mem_pool_file_cleanup_handler(nxt_task_t *task, void *data)
{
nxt_file_t *file;
file = data;
if (file->fd != NXT_FILE_INVALID) {
nxt_file_close(file);
nxt_file_close(task, file);
}
}

View File

@@ -507,62 +507,62 @@ nxt_php_register_variables(zval *track_vars_array TSRMLS_DC)
nxt_log_debug(r->log, "php register variables");
php_register_variable_safe((char *) "PHP_SELF",
(char *) r->header.path.data,
ctx->script_name_len, track_vars_array TSRMLS_CC);
(char *) r->header.path.data,
ctx->script_name_len, track_vars_array TSRMLS_CC);
php_register_variable_safe((char *) "SERVER_PROTOCOL",
(char *) r->header.version.data,
r->header.version.len, track_vars_array TSRMLS_CC);
(char *) r->header.version.data,
r->header.version.len, track_vars_array TSRMLS_CC);
#if ABS_MODE
php_register_variable_safe((char *) "SCRIPT_NAME",
(char *) nxt_php_script.data,
nxt_php_script.len, track_vars_array TSRMLS_CC);
(char *) nxt_php_script.data,
nxt_php_script.len, track_vars_array TSRMLS_CC);
php_register_variable_safe((char *) "SCRIPT_FILENAME",
(char *) nxt_php_path.data,
nxt_php_path.len, track_vars_array TSRMLS_CC);
(char *) nxt_php_path.data,
nxt_php_path.len, track_vars_array TSRMLS_CC);
php_register_variable_safe((char *) "DOCUMENT_ROOT",
(char *) nxt_php_root.data,
nxt_php_root.len, track_vars_array TSRMLS_CC);
(char *) nxt_php_root.data,
nxt_php_root.len, track_vars_array TSRMLS_CC);
#else
php_register_variable_safe((char *) "SCRIPT_NAME",
(char *) r->header.path.data,
ctx->script_name_len, track_vars_array TSRMLS_CC);
(char *) r->header.path.data,
ctx->script_name_len, track_vars_array TSRMLS_CC);
php_register_variable_safe((char *) "SCRIPT_FILENAME",
(char *) ctx->script.data, ctx->script.len,
track_vars_array TSRMLS_CC);
(char *) ctx->script.data, ctx->script.len,
track_vars_array TSRMLS_CC);
php_register_variable_safe((char *) "DOCUMENT_ROOT", (char *) root,
sizeof(root) - 1, track_vars_array TSRMLS_CC);
sizeof(root) - 1, track_vars_array TSRMLS_CC);
#endif
php_register_variable_safe((char *) "REQUEST_METHOD",
(char *) r->header.method.data,
r->header.method.len, track_vars_array TSRMLS_CC);
(char *) r->header.method.data,
r->header.method.len, track_vars_array TSRMLS_CC);
php_register_variable_safe((char *) "REQUEST_URI",
(char *) r->header.path.data,
r->header.path.len, track_vars_array TSRMLS_CC);
(char *) r->header.path.data,
r->header.path.len, track_vars_array TSRMLS_CC);
if (ctx->query.data != NULL) {
php_register_variable_safe((char *) "QUERY_STRING",
(char *) ctx->query.data,
ctx->query.len, track_vars_array TSRMLS_CC);
(char *) ctx->query.data,
ctx->query.len, track_vars_array TSRMLS_CC);
}
if (ctx->content_type != NULL) {
php_register_variable_safe((char *) "CONTENT_TYPE",
(char *) ctx->content_type->data,
ctx->content_type->len, track_vars_array TSRMLS_CC);
(char *) ctx->content_type->data,
ctx->content_type->len, track_vars_array TSRMLS_CC);
}
if (ctx->content_length != NULL) {
php_register_variable_safe((char *) "CONTENT_LENGTH",
(char *) ctx->content_length->data,
ctx->content_length->len, track_vars_array TSRMLS_CC);
(char *) ctx->content_length->data,
ctx->content_length->len, track_vars_array TSRMLS_CC);
}
var = nxt_mem_nalloc(r->mem_pool, sizeof(prefix) + ctx->max_name + 1);

View File

@@ -5,7 +5,7 @@
*/
#include <nxt_main.h>
#include <nxt_cycle.h>
#include <nxt_runtime.h>
#include <nxt_port.h>
@@ -29,18 +29,26 @@ nxt_port_create(nxt_thread_t *thread, nxt_port_t *port,
void
nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
{
nxt_uint_t i, n;
nxt_port_t *port;
nxt_uint_t i, n, nprocesses, nports;
nxt_port_t *port;
nxt_process_t *process;
port = cycle->ports->elts;
n = cycle->ports->nelts;
process = rt->processes->elts;
nprocesses = rt->processes->nelts;
for (i = 0; i < n; i++) {
if (nxt_pid != port[i].pid) {
(void) nxt_port_socket_write(task, &port[i], type, fd, stream, b);
for (i = 0; i < nprocesses; i++) {
if (nxt_pid != process[i].pid) {
port = process[i].ports->elts;
nports = process[i].ports->nelts;
for (n = 0; n < nports; n++) {
(void) nxt_port_socket_write(task, &port[n], type,
fd, stream, b);
}
}
}
}
@@ -70,20 +78,21 @@ nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_cycle_quit(task, NULL);
nxt_runtime_quit(task);
}
void
nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_t *new_port)
{
nxt_buf_t *b;
nxt_uint_t i, n;
nxt_port_t *port;
nxt_process_t *process;
nxt_port_msg_new_port_t *msg;
n = cycle->ports->nelts;
n = rt->processes->nelts;
if (n == 0) {
return;
}
@@ -91,34 +100,33 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_debug(task, "new port %d for process %PI engine %uD",
new_port->socket.fd, new_port->pid, new_port->engine);
port = cycle->ports->elts;
process = rt->processes->elts;
for (i = 0; i < n; i++) {
if (port[i].pid == new_port->pid
|| port[i].pid == nxt_pid
|| port[i].engine != 0)
{
if (process[i].pid == new_port->pid || process[i].pid == nxt_pid) {
continue;
}
b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0);
port = process[i].ports->elts;
b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0);
if (nxt_slow_path(b == NULL)) {
continue;
}
b->data = &port[i];
b->data = port;
b->completion_handler = nxt_port_new_port_buf_completion;
b->mem.free += sizeof(nxt_port_msg_new_port_t);
msg = (nxt_port_msg_new_port_t *) b->mem.pos;
msg->pid = new_port->pid;
msg->engine = new_port->engine;
msg->max_size = port[i].max_size;
msg->max_share = port[i].max_share;
msg->max_size = port->max_size;
msg->max_share = port->max_share;
(void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_NEW_PORT,
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
new_port->socket.fd, 0, b);
}
}
@@ -143,13 +151,19 @@ void
nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_t *port;
nxt_cycle_t *cycle;
nxt_process_t *process;
nxt_runtime_t *rt;
nxt_mem_pool_t *mp;
nxt_port_msg_new_port_t *new_port_msg;
cycle = nxt_thread_cycle();
rt = task->thread->runtime;
port = nxt_array_add(cycle->ports);
process = nxt_runtime_new_process(rt);
if (nxt_slow_path(process == NULL)) {
return;
}
port = nxt_array_zero_add(process->ports);
if (nxt_slow_path(port == NULL)) {
return;
}
@@ -167,6 +181,8 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_debug(task, "new port %d received for process %PI engine %uD",
msg->fd, new_port_msg->pid, new_port_msg->engine);
process->pid = new_port_msg->pid;
port->pid = new_port_msg->pid;
port->engine = new_port_msg->engine;
port->pair[0] = -1;
@@ -183,27 +199,29 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_uint_t slot, nxt_fd_t fd)
nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
nxt_fd_t fd)
{
nxt_buf_t *b;
nxt_uint_t i, n;
nxt_port_t *port;
nxt_buf_t *b;
nxt_uint_t i, n;
nxt_port_t *port;
nxt_process_t *process;
n = cycle->ports->nelts;
n = rt->processes->nelts;
if (n == 0) {
return;
}
nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
port = cycle->ports->elts;
process = rt->processes->elts;
/* port[0] is master process port. */
/* process[0] is master process. */
for (i = 1; i < n; i++) {
b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0);
port = process[i].ports->elts;
b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0);
if (nxt_slow_path(b == NULL)) {
continue;
}
@@ -211,7 +229,7 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
*(nxt_uint_t *) b->mem.pos = slot;
b->mem.free += sizeof(nxt_uint_t);
(void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_CHANGE_FILE,
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
fd, 0, b);
}
}
@@ -220,17 +238,17 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
void
nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_buf_t *b;
nxt_uint_t slot;
nxt_file_t *log_file;
nxt_cycle_t *cycle;
nxt_buf_t *b;
nxt_uint_t slot;
nxt_file_t *log_file;
nxt_runtime_t *rt;
cycle = nxt_thread_cycle();
rt = task->thread->runtime;
b = msg->buf;
slot = *(nxt_uint_t *) b->mem.pos;
log_file = nxt_list_elt(cycle->log_files, slot);
log_file = nxt_list_elt(rt->log_files, slot);
nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd);

View File

@@ -8,9 +8,6 @@
#define _NXT_PORT_H_INCLUDED_
typedef struct nxt_port_s nxt_port_t;
typedef struct {
uint32_t stream;
@@ -28,17 +25,14 @@ typedef struct {
} nxt_port_send_msg_t;
typedef struct nxt_port_recv_msg_s {
struct nxt_port_recv_msg_s {
uint32_t stream;
uint16_t type;
nxt_fd_t fd;
nxt_buf_t *buf;
nxt_port_t *port;
} nxt_port_recv_msg_t;
typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
};
struct nxt_port_s {
@@ -61,7 +55,6 @@ struct nxt_port_s {
nxt_pid_t pid;
uint32_t engine;
uint32_t generation;
};
@@ -105,11 +98,11 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
void nxt_port_create(nxt_thread_t *thread, nxt_port_t *port,
nxt_port_handler_t *handlers);
void nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
void nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
void nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_t *port);
void nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
nxt_uint_t slot, nxt_fd_t fd);
void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);

View File

@@ -5,9 +5,11 @@
*/
#include <nxt_main.h>
#include <nxt_master_process.h>
static nxt_int_t nxt_user_groups_get(nxt_user_cred_t *uc);
static void nxt_process_start(nxt_task_t *task, nxt_process_init_t *process);
static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc);
/* A cached process pid. */
@@ -18,20 +20,17 @@ nxt_pid_t nxt_ppid;
nxt_pid_t
nxt_process_create(nxt_process_start_t start, void *data, const char *name)
nxt_process_create(nxt_task_t *task, nxt_process_init_t *process)
{
nxt_pid_t pid;
nxt_thread_t *thr;
thr = nxt_thread();
nxt_pid_t pid;
pid = fork();
switch (pid) {
case -1:
nxt_log_alert(thr->log, "fork() failed while creating \"%s\" %E",
name, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "fork() failed while creating \"%s\" %E",
process->name, nxt_errno);
break;
case 0:
@@ -39,14 +38,14 @@ nxt_process_create(nxt_process_start_t start, void *data, const char *name)
nxt_pid = getpid();
/* Clean inherited cached thread tid. */
thr->tid = 0;
task->thread->tid = 0;
start(data);
nxt_process_start(task, process);
break;
default:
/* A parent. */
nxt_log_debug(thr->log, "fork(): %PI", pid);
nxt_debug(task, "fork(\"%s\"): %PI", process->name, pid);
break;
}
@@ -54,6 +53,73 @@ nxt_process_create(nxt_process_start_t start, void *data, const char *name)
}
static void
nxt_process_start(nxt_task_t *task, nxt_process_init_t *process)
{
nxt_int_t ret;
nxt_thread_t *thread;
nxt_runtime_t *rt;
nxt_event_engine_t *engine;
const nxt_event_interface_t *interface;
nxt_log(task, NXT_LOG_INFO, "%s started", process->name);
nxt_process_title(task, "nginext: %s", process->name);
nxt_random_init(&nxt_random_data);
if (process->user_cred != NULL && getuid() == 0) {
/* Super-user. */
ret = nxt_user_cred_set(task, process->user_cred);
if (ret != NXT_OK) {
goto fail;
}
}
thread = task->thread;
rt = thread->runtime;
rt->type = process->type;
engine = thread->engine;
/* Update inherited master process event engine and signals processing. */
engine->signals->sigev = process->signals;
interface = nxt_service_get(rt->services, "engine", rt->engine);
if (interface == NULL) {
goto fail;
}
if (nxt_event_engine_change(engine, interface, rt->batch) != NXT_OK) {
goto fail;
}
nxt_port_read_close(process->master_port);
nxt_port_write_enable(task, process->master_port);
/* A worker process port. */
nxt_port_create(thread, process->port, process->port_handlers);
ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads,
60000 * 1000000LL);
if (ret != NXT_OK) {
goto fail;
}
ret = process->start(task, rt);
if (ret == NXT_OK) {
return;
}
fail:
exit(1);
}
#if (NXT_HAVE_POSIX_SPAWN)
/*
@@ -76,14 +142,15 @@ nxt_process_create(nxt_process_start_t start, void *data, const char *name)
*/
nxt_pid_t
nxt_process_execute(char *name, char **argv, char **envp)
nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp)
{
nxt_pid_t pid;
nxt_thread_log_debug("posix_spawn(\"%s\")", name);
nxt_debug(task, "posix_spawn(\"%s\")", name);
if (posix_spawn(&pid, name, NULL, NULL, argv, envp) != 0) {
nxt_thread_log_alert("posix_spawn(\"%s\") failed %E", name, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "posix_spawn(\"%s\") failed %E",
name, nxt_errno);
return -1;
}
@@ -93,7 +160,7 @@ nxt_process_execute(char *name, char **argv, char **envp)
#else
nxt_pid_t
nxt_process_execute(char *name, char **argv, char **envp)
nxt_process_execute(nxt_task_t *task, char *name, char **argv, char **envp)
{
nxt_pid_t pid;
@@ -109,24 +176,26 @@ nxt_process_execute(char *name, char **argv, char **envp)
switch (pid) {
case -1:
nxt_thread_log_alert("vfork() failed while executing \"%s\" %E",
name, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "vfork() failed while executing \"%s\" %E",
name, nxt_errno);
break;
case 0:
/* A child. */
nxt_thread_log_debug("execve(\"%s\")", name);
nxt_debug(task, "execve(\"%s\")", name);
(void) execve(name, argv, envp);
nxt_thread_log_alert("execve(\"%s\") failed %E", name, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "execve(\"%s\") failed %E",
name, nxt_errno);
exit(1);
nxt_unreachable();
break;
default:
/* A parent. */
nxt_thread_log_debug("vfork(): %PI", pid);
nxt_debug(task, "vfork(): %PI", pid);
break;
}
@@ -137,14 +206,11 @@ nxt_process_execute(char *name, char **argv, char **envp)
nxt_int_t
nxt_process_daemon(void)
nxt_process_daemon(nxt_task_t *task)
{
nxt_fd_t fd;
nxt_pid_t pid;
const char *msg;
nxt_thread_t *thr;
thr = nxt_thread();
/*
* fork() followed by a parent process's exit() detaches a child process
@@ -166,7 +232,7 @@ nxt_process_daemon(void)
default:
/* A parent. */
nxt_log_debug(thr->log, "fork(): %PI", pid);
nxt_debug(task, "fork(): %PI", pid);
exit(0);
nxt_unreachable();
}
@@ -174,14 +240,14 @@ nxt_process_daemon(void)
nxt_pid = getpid();
/* Clean inherited cached thread tid. */
thr->tid = 0;
task->thread->tid = 0;
nxt_log_debug(thr->log, "daemon");
nxt_debug(task, "daemon");
/* Detach from controlling terminal. */
if (setsid() == -1) {
nxt_log_emerg(thr->log, "setsid() failed %E", nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "setsid() failed %E", nxt_errno);
return NXT_ERROR;
}
@@ -217,7 +283,7 @@ nxt_process_daemon(void)
fail:
nxt_log_emerg(thr->log, msg, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, msg, nxt_errno);
return NXT_ERROR;
}
@@ -236,7 +302,7 @@ nxt_nanosleep(nxt_nsec_t ns)
nxt_int_t
nxt_user_cred_get(nxt_user_cred_t *uc, const char *group)
nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc, const char *group)
{
struct group *grp;
struct passwd *pwd;
@@ -244,7 +310,8 @@ nxt_user_cred_get(nxt_user_cred_t *uc, const char *group)
pwd = getpwnam(uc->user);
if (nxt_slow_path(pwd == NULL)) {
nxt_thread_log_emerg("getpwnam(%s) failed %E", uc->user, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "getpwnam(%s) failed %E",
uc->user, nxt_errno);
return NXT_ERROR;
}
@@ -255,7 +322,8 @@ nxt_user_cred_get(nxt_user_cred_t *uc, const char *group)
grp = getgrnam(group);
if (nxt_slow_path(grp == NULL)) {
nxt_thread_log_emerg("getgrnam(%s) failed %E", group, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "getgrnam(%s) failed %E",
group, nxt_errno);
return NXT_ERROR;
}
@@ -263,7 +331,7 @@ nxt_user_cred_get(nxt_user_cred_t *uc, const char *group)
}
if (getuid() == 0) {
return nxt_user_groups_get(uc);
return nxt_user_groups_get(task, uc);
}
return NXT_OK;
@@ -297,7 +365,7 @@ nxt_user_cred_get(nxt_user_cred_t *uc, const char *group)
*/
static nxt_int_t
nxt_user_groups_get(nxt_user_cred_t *uc)
nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc)
{
int nsaved, ngroups;
nxt_int_t ret;
@@ -306,11 +374,11 @@ nxt_user_groups_get(nxt_user_cred_t *uc)
nsaved = getgroups(0, NULL);
if (nsaved == -1) {
nxt_thread_log_emerg("getgroups(0, NULL) failed %E", nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "getgroups(0, NULL) failed %E", nxt_errno);
return NXT_ERROR;
}
nxt_thread_log_debug("getgroups(0, NULL): %d", nsaved);
nxt_debug(task, "getgroups(0, NULL): %d", nsaved);
if (nsaved > NGROUPS_MAX) {
/* MacOSX case. */
@@ -328,14 +396,15 @@ nxt_user_groups_get(nxt_user_cred_t *uc)
nsaved = getgroups(nsaved, saved);
if (nsaved == -1) {
nxt_thread_log_emerg("getgroups(%d) failed %E", nsaved, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "getgroups(%d) failed %E",
nsaved, nxt_errno);
goto fail;
}
nxt_thread_log_debug("getgroups(): %d", nsaved);
nxt_debug(task, "getgroups(): %d", nsaved);
if (initgroups(uc->user, uc->base_gid) != 0) {
nxt_thread_log_emerg("initgroups(%s, %d) failed",
nxt_log(task, NXT_LOG_CRIT, "initgroups(%s, %d) failed",
uc->user, uc->base_gid);
goto restore;
}
@@ -343,11 +412,11 @@ nxt_user_groups_get(nxt_user_cred_t *uc)
ngroups = getgroups(0, NULL);
if (ngroups == -1) {
nxt_thread_log_emerg("getgroups(0, NULL) failed %E", nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "getgroups(0, NULL) failed %E", nxt_errno);
goto restore;
}
nxt_thread_log_debug("getgroups(0, NULL): %d", ngroups);
nxt_debug(task, "getgroups(0, NULL): %d", ngroups);
uc->gids = nxt_malloc(ngroups * sizeof(nxt_gid_t));
@@ -358,7 +427,8 @@ nxt_user_groups_get(nxt_user_cred_t *uc)
ngroups = getgroups(ngroups, uc->gids);
if (ngroups == -1) {
nxt_thread_log_emerg("getgroups(%d) failed %E", ngroups, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "getgroups(%d) failed %E",
ngroups, nxt_errno);
goto restore;
}
@@ -377,9 +447,9 @@ nxt_user_groups_get(nxt_user_cred_t *uc)
p = nxt_sprintf(p, end, "%uL:", (uint64_t) uc->gids[i]);
}
nxt_thread_log_debug("user \"%s\" cred: uid:%uL base gid:%uL, gids:%*s",
uc->user, (uint64_t) uc->uid,
(uint64_t) uc->base_gid, p - msg, msg);
nxt_debug(task, "user \"%s\" cred: uid:%uL base gid:%uL, gids:%*s",
uc->user, (uint64_t) uc->uid, (uint64_t) uc->base_gid,
p - msg, msg);
}
#endif
@@ -388,7 +458,8 @@ nxt_user_groups_get(nxt_user_cred_t *uc)
restore:
if (setgroups(nsaved, saved) != 0) {
nxt_thread_log_emerg("setgroups(%d) failed %E", nsaved, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "setgroups(%d) failed %E",
nsaved, nxt_errno);
ret = NXT_ERROR;
}
@@ -401,34 +472,35 @@ fail:
nxt_int_t
nxt_user_cred_set(nxt_user_cred_t *uc)
nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
{
nxt_thread_log_debug("user cred set: \"%s\" uid:%uL base gid:%uL",
uc->user, (uint64_t) uc->uid, uc->base_gid);
nxt_debug(task, "user cred set: \"%s\" uid:%uL base gid:%uL",
uc->user, (uint64_t) uc->uid, uc->base_gid);
if (setgid(uc->base_gid) != 0) {
nxt_thread_log_emerg("setgid(%d) failed %E", uc->base_gid, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "setgid(%d) failed %E",
uc->base_gid, nxt_errno);
return NXT_ERROR;
}
if (uc->gids != NULL) {
if (setgroups(uc->ngroups, uc->gids) != 0) {
nxt_thread_log_emerg("setgroups(%i) failed %E",
uc->ngroups, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "setgroups(%i) failed %E",
uc->ngroups, nxt_errno);
return NXT_ERROR;
}
} else {
/* MacOSX fallback. */
if (initgroups(uc->user, uc->base_gid) != 0) {
nxt_thread_log_emerg("initgroups(%s, %d) failed",
uc->user, uc->base_gid);
nxt_log(task, NXT_LOG_CRIT, "initgroups(%s, %d) failed",
uc->user, uc->base_gid);
return NXT_ERROR;
}
}
if (setuid(uc->uid) != 0) {
nxt_thread_log_emerg("setuid(%d) failed %E", uc->uid, nxt_errno);
nxt_log(task, NXT_LOG_CRIT, "setuid(%d) failed %E", uc->uid, nxt_errno);
return NXT_ERROR;
}

View File

@@ -4,62 +4,95 @@
* Copyright (C) NGINX, Inc.
*/
#ifndef _NXT_UNIX_PROCESS_H_INCLUDED_
#define _NXT_UNIX_PROCESS_H_INCLUDED_
#ifndef _NXT_PROCESS_H_INCLUDED_
#define _NXT_PROCESS_H_INCLUDED_
typedef pid_t nxt_pid_t;
typedef enum {
NXT_PROCESS_SINGLE = 0,
NXT_PROCESS_MASTER,
NXT_PROCESS_CONTROLLER,
NXT_PROCESS_ROUTER,
NXT_PROCESS_WORKER,
} nxt_process_type_t;
#define \
nxt_sched_yield() \
typedef pid_t nxt_pid_t;
typedef uid_t nxt_uid_t;
typedef gid_t nxt_gid_t;
typedef struct {
const char *user;
nxt_uid_t uid;
nxt_gid_t base_gid;
nxt_uint_t ngroups;
nxt_gid_t *gids;
} nxt_user_cred_t;
typedef struct nxt_process_init_s nxt_process_init_t;
typedef nxt_int_t (*nxt_process_star_t)(nxt_task_t *task, nxt_runtime_t *rt);
struct nxt_process_init_s {
nxt_process_star_t start;
const char *name;
nxt_user_cred_t *user_cred;
nxt_port_t *port;
nxt_port_t *master_port;
nxt_port_handler_t *port_handlers;
const nxt_sig_event_t *signals;
nxt_process_type_t type:8; /* 3 bits */
};
typedef struct {
nxt_pid_t pid;
nxt_array_t *ports; /* of nxt_port_t */
nxt_process_init_t *init;
} nxt_process_t;
NXT_EXPORT nxt_pid_t nxt_process_create(nxt_task_t *task,
nxt_process_init_t *process);
NXT_EXPORT nxt_pid_t nxt_process_execute(nxt_task_t *task, char *name,
char **argv, char **envp);
NXT_EXPORT nxt_int_t nxt_process_daemon(nxt_task_t *task);
NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns);
NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv,
char ***orig_envp);
#if (NXT_HAVE_SETPROCTITLE)
#define nxt_process_title(task, fmt, ...) \
setproctitle(fmt, __VA_ARGS__)
#elif (NXT_LINUX || NXT_SOLARIS || NXT_MACOSX)
#define NXT_SETPROCTITLE_ARGV 1
NXT_EXPORT void nxt_process_title(nxt_task_t *task, const char *fmt, ...);
#endif
#define nxt_sched_yield() \
sched_yield()
#define \
nxt_process_id() \
nxt_pid
/*
* Solaris declares abort() as __NORETURN,
* raise(SIGABRT) is mostly the same.
*/
#define \
nxt_abort() \
#define nxt_abort() \
(void) raise(SIGABRT)
typedef void (*nxt_process_start_t)(void *data);
NXT_EXPORT nxt_pid_t nxt_process_create(nxt_process_start_t start, void *data,
const char *name);
NXT_EXPORT nxt_pid_t nxt_process_execute(char *name, char **argv, char **envp);
NXT_EXPORT nxt_int_t nxt_process_daemon(void);
NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns);
NXT_EXPORT void nxt_process_arguments(char **orig_argv, char ***orig_envp);
#if (NXT_HAVE_SETPROCTITLE)
#define \
nxt_process_title(title) \
setproctitle("%s", title)
#elif (NXT_LINUX || NXT_SOLARIS || NXT_MACOSX)
#define NXT_SETPROCTITLE_ARGV 1
NXT_EXPORT void nxt_process_title(const char *title);
#else
#define \
nxt_process_title(title)
#endif
NXT_EXPORT nxt_int_t nxt_user_cred_get(nxt_task_t *task, nxt_user_cred_t *uc,
const char *group);
NXT_EXPORT nxt_int_t nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc);
NXT_EXPORT extern nxt_pid_t nxt_pid;
NXT_EXPORT extern nxt_pid_t nxt_ppid;
@@ -67,21 +100,4 @@ NXT_EXPORT extern char **nxt_process_argv;
NXT_EXPORT extern char ***nxt_process_environ;
typedef uid_t nxt_uid_t;
typedef gid_t nxt_gid_t;
typedef struct {
const char *user;
nxt_uid_t uid;
nxt_gid_t base_gid;
nxt_uint_t ngroups;
nxt_gid_t *gids;
} nxt_user_cred_t;
NXT_EXPORT nxt_int_t nxt_user_cred_get(nxt_user_cred_t *uc, const char *group);
NXT_EXPORT nxt_int_t nxt_user_cred_set(nxt_user_cred_t *uc);
#endif /* _NXT_UNIX_PROCESS_H_INCLUDED_ */
#endif /* _NXT_PROCESS_H_INCLUDED_ */

View File

@@ -43,7 +43,7 @@ static u_char *nxt_process_title_end;
void
nxt_process_arguments(char **orig_argv, char ***orig_envp)
nxt_process_arguments(nxt_task_t *task, char **orig_argv, char ***orig_envp)
{
u_char *p, *end, *argv_end, **argv, **env;
size_t size, argv_size, environ_size, strings_size;
@@ -126,7 +126,7 @@ nxt_process_arguments(char **orig_argv, char ***orig_envp)
* There is no reason to modify environ if arguments
* and environment are not contiguous.
*/
nxt_thread_log_debug("arguments and environment are not contiguous");
nxt_debug(task, "arguments and environment are not contiguous");
goto done;
}
@@ -187,9 +187,10 @@ done:
void
nxt_process_title(const char *title)
nxt_process_title(nxt_task_t *task, const char *fmt, ...)
{
u_char *p, *start, *end;
u_char *p, *start, *end;
va_list args;
start = nxt_process_title_start;
@@ -199,7 +200,9 @@ nxt_process_title(const char *title)
end = nxt_process_title_end;
p = nxt_sprintf(start, end, "%s", title);
va_start(args, fmt);
p = nxt_vsprintf(start, end, fmt, args);
va_end(args);
#if (NXT_SOLARIS)
/*
@@ -238,7 +241,7 @@ nxt_process_title(const char *title)
*/
nxt_memset(p, '\0', end - p);
nxt_thread_log_debug("setproctitle: \"%s\"", start);
nxt_debug(task, "setproctitle: \"%s\"", start);
}
#else /* !(NXT_SETPROCTITLE_ARGV) */

View File

@@ -15,7 +15,7 @@
#endif
#include <nxt_main.h>
#include <nxt_cycle.h>
#include <nxt_runtime.h>
#include <nxt_application.h>
@@ -45,7 +45,7 @@ static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args);
extern nxt_int_t nxt_python_wsgi_init(nxt_thread_t *thr, nxt_cycle_t *cycle);
extern nxt_int_t nxt_python_wsgi_init(nxt_thread_t *thr, nxt_runtime_t *rt);
nxt_application_module_t nxt_python_module = {
@@ -130,7 +130,7 @@ static nxt_app_request_t *nxt_app_request;
nxt_int_t
nxt_python_wsgi_init(nxt_thread_t *thr, nxt_cycle_t *cycle)
nxt_python_wsgi_init(nxt_thread_t *thr, nxt_runtime_t *rt)
{
char **argv;
u_char *p, *dir;
@@ -295,7 +295,8 @@ nxt_python_init(nxt_thread_t *thr)
return NXT_ERROR;
}
pModule = PyImport_ExecCodeModuleEx((char *) "_wsgi_nginext", co, (char *) script);
pModule = PyImport_ExecCodeModuleEx((char *) "_wsgi_nginext", co,
(char *) script);
Py_XDECREF(co);
#endif

View File

@@ -68,7 +68,7 @@ nxt_queue_is_empty(queue) \
*
* for (lnk = nxt_queue_last(queue);
* lnk != nxt_queue_head(queue);
* lnk = nxt_queue_next(lnk))
* lnk = nxt_queue_prev(lnk))
* {
* tp = nxt_queue_link_data(lnk, nxt_type_t, link);
*/

85
src/nxt_router.c Normal file
View File

@@ -0,0 +1,85 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Valentin V. Bartenev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
#include <nxt_runtime.h>
#include <nxt_master_process.h>
static nxt_int_t nxt_router_listen_socket(nxt_task_t *task, nxt_runtime_t *rt);
nxt_int_t
nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
{
if (nxt_router_listen_socket(task, rt) != NXT_OK) {
return NXT_ERROR;
}
return NXT_OK;
}
static nxt_int_t
nxt_router_listen_socket(nxt_task_t *task, nxt_runtime_t *rt)
{
nxt_sockaddr_t *sa;
nxt_listen_socket_t *ls;
sa = nxt_sockaddr_alloc(rt->mem_pool, sizeof(struct sockaddr_in),
NXT_INET_ADDR_STR_LEN);
if (sa == NULL) {
return NXT_ERROR;
}
sa->type = SOCK_STREAM;
sa->u.sockaddr_in.sin_family = AF_INET;
sa->u.sockaddr_in.sin_port = htons(8000);
nxt_sockaddr_text(sa);
ls = nxt_runtime_listen_socket_add(rt, sa);
if (ls == NULL) {
return NXT_ERROR;
}
ls->read_after_accept = 1;
ls->flags = NXT_NONBLOCK;
#if 0
/* STUB */
wq = nxt_mem_zalloc(cf->mem_pool, sizeof(nxt_work_queue_t));
if (wq == NULL) {
return NXT_ERROR;
}
nxt_work_queue_name(wq, "listen");
/**/
ls->work_queue = wq;
#endif
ls->handler = nxt_stream_connection_init;
/*
* Connection memory pool chunk size is tunned to
* allocate the most data in one mem_pool chunk.
*/
ls->mem_pool_size = nxt_listen_socket_pool_min_size(ls)
+ sizeof(nxt_event_conn_proxy_t)
+ sizeof(nxt_event_conn_t)
+ 4 * sizeof(nxt_buf_t);
if (nxt_listen_socket_create(task, ls, 0) != NXT_OK) {
return NXT_ERROR;
}
if (nxt_event_conn_listen(task, ls) != NXT_OK) {
return NXT_ERROR;
}
return NXT_OK;
}

1499
src/nxt_runtime.c Normal file

File diff suppressed because it is too large Load Diff

108
src/nxt_runtime.h Normal file
View File

@@ -0,0 +1,108 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) Valentin V. Bartenev
* Copyright (C) NGINX, Inc.
*/
#ifndef _NXT_RUNTIME_H_INCLUDED_
#define _NXT_RUNTIME_H_INCLUDED_
typedef void (*nxt_runtime_cont_t)(nxt_task_t *task);
struct nxt_runtime_s {
nxt_mem_pool_t *mem_pool;
nxt_array_t *inherited_sockets; /* of nxt_listen_socket_t */
nxt_array_t *listen_sockets; /* of nxt_listen_socket_t */
nxt_array_t *services; /* of nxt_service_t */
nxt_array_t *engines; /* of nxt_event_engine_t */
nxt_runtime_cont_t start;
nxt_str_t *conf_prefix;
nxt_str_t *prefix;
nxt_str_t hostname;
nxt_file_name_t *pid_file;
#if (NXT_THREADS)
nxt_array_t *thread_pools; /* of nxt_thread_pool_t */
nxt_runtime_cont_t continuation;
#endif
nxt_array_t *processes; /* of nxt_process_t */
nxt_list_t *log_files; /* of nxt_file_t */
uint32_t last_engine_id;
nxt_process_type_t type;
nxt_timer_t timer;
uint8_t daemon;
uint8_t batch;
uint8_t master_process;
const char *engine;
uint32_t engine_connections;
uint32_t worker_processes;
uint32_t auxiliary_threads;
nxt_user_cred_t user_cred;
const char *group;
const char *pid;
const char *error_log;
nxt_sockaddr_t *controller_listen;
nxt_listen_socket_t *controller_socket;
nxt_str_t upstream;
};
typedef nxt_int_t (*nxt_module_init_t)(nxt_thread_t *thr, nxt_runtime_t *rt);
nxt_int_t nxt_runtime_create(nxt_task_t *task);
void nxt_runtime_quit(nxt_task_t *task);
void nxt_runtime_event_engine_free(nxt_runtime_t *rt);
#if (NXT_THREADS)
nxt_int_t nxt_runtime_thread_pool_create(nxt_thread_t *thr, nxt_runtime_t *rt,
nxt_uint_t max_threads, nxt_nsec_t timeout);
#endif
nxt_process_t *nxt_runtime_new_process(nxt_runtime_t *rt);
/* STUB */
nxt_int_t nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt);
nxt_str_t *nxt_current_directory(nxt_mem_pool_t *mp);
nxt_listen_socket_t *nxt_runtime_listen_socket_add(nxt_runtime_t *rt,
nxt_sockaddr_t *sa);
nxt_int_t nxt_runtime_listen_sockets_create(nxt_task_t *task,
nxt_runtime_t *rt);
nxt_int_t nxt_runtime_listen_sockets_enable(nxt_task_t *task,
nxt_runtime_t *rt);
nxt_file_t *nxt_runtime_log_file_add(nxt_runtime_t *rt, nxt_str_t *name);
/* STUB */
void nxt_cdecl nxt_log_time_handler(nxt_uint_t level, nxt_log_t *log,
const char *fmt, ...);
void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt);
extern nxt_module_init_t nxt_init_modules[];
extern nxt_uint_t nxt_init_modules_n;
#endif /* _NXT_RUNTIME_H_INCLIDED_ */

View File

@@ -8,11 +8,11 @@
#define _NXT_SIGNAL_H_INCLUDED_
typedef struct {
struct nxt_sig_event_s {
int signo;
nxt_work_handler_t handler;
const char *name;
} nxt_sig_event_t;
};
#define nxt_event_signal(sig, handler) \
{ sig, handler, #sig }

View File

@@ -5,7 +5,7 @@
*/
#include <nxt_main.h>
#include <nxt_cycle.h>
#include <nxt_runtime.h>
static void nxt_stream_connection_peer(nxt_task_t *task,
@@ -17,7 +17,7 @@ static void nxt_stream_connection_close(nxt_task_t *task, void *obj,
void
nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data)
{
nxt_cycle_t *cycle;
nxt_runtime_t *rt;
nxt_event_conn_t *c;
nxt_upstream_peer_t *up;
@@ -32,10 +32,10 @@ nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data)
up->data = c;
cycle = nxt_thread_cycle();
rt = task->thread->runtime;
if (cycle->upstream.length != 0) {
up->addr = cycle->upstream;
if (rt->upstream.length != 0) {
up->addr = rt->upstream;
} else {
nxt_str_set(&up->addr, "127.0.0.1:8080");

View File

@@ -178,6 +178,7 @@ struct nxt_thread_s {
nxt_thread_time_t time;
nxt_runtime_t *runtime;
nxt_event_engine_t *engine;
/*

View File

@@ -110,7 +110,7 @@ nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data)
/* STUB */
up->sockaddr = peer[0].sockaddr;
nxt_job_destroy(jbs);
nxt_job_destroy(task, jbs);
up->ready_handler(task, up);
//nxt_upstream_round_robin_get_peer(up);
@@ -118,7 +118,7 @@ nxt_upstream_round_robin_create(nxt_task_t *task, void *obj, void *data)
fail:
nxt_job_destroy(jbs);
nxt_job_destroy(task, jbs);
up->ready_handler(task, up);
}

View File

@@ -28,7 +28,7 @@ struct nxt_task_s {
* A work handler with just the obj and data arguments instead
* of pointer to a possibly large a work struct allows to call
* the handler not only via a work queue but also directly.
* The only obj argument is enough for the most cases expect the
* The only obj argument is enough for the most cases except the
* source filters, so the data argument has been introduced and
* is used where appropriate.
*/

View File

@@ -5,7 +5,7 @@
*/
#include <nxt_main.h>
#include <nxt_cycle.h>
#include <nxt_runtime.h>
#include <nxt_port.h>
#include <nxt_master_process.h>
@@ -21,7 +21,7 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj,
void *data);
static nxt_port_handler_t nxt_worker_process_port_handlers[] = {
nxt_port_handler_t nxt_worker_process_port_handlers[] = {
nxt_worker_process_quit_handler,
nxt_port_new_port_handler,
nxt_port_change_log_file_handler,
@@ -29,112 +29,29 @@ static nxt_port_handler_t nxt_worker_process_port_handlers[] = {
};
static const nxt_sig_event_t nxt_worker_process_signals[] = {
const nxt_sig_event_t nxt_worker_process_signals[] = {
nxt_event_signal(SIGHUP, nxt_worker_process_signal_handler),
nxt_event_signal(SIGINT, nxt_worker_process_sigterm_handler),
nxt_event_signal(SIGQUIT, nxt_worker_process_sigterm_handler),
nxt_event_signal(SIGTERM, nxt_worker_process_sigquit_handler),
nxt_event_signal(SIGCHLD, nxt_worker_process_signal_handler),
nxt_event_signal(SIGUSR1, nxt_worker_process_signal_handler),
nxt_event_signal(SIGUSR1, nxt_worker_process_signal_handler),
nxt_event_signal(SIGUSR2, nxt_worker_process_signal_handler),
nxt_event_signal_end,
};
void
nxt_worker_process_start(void *data)
{
nxt_int_t n;
nxt_port_t *port;
nxt_cycle_t *cycle;
nxt_thread_t *thr;
const nxt_event_interface_t *interface;
cycle = data;
nxt_thread_init_data(nxt_thread_cycle_data);
nxt_thread_cycle_set(cycle);
thr = nxt_thread();
nxt_log_error(NXT_LOG_INFO, thr->log, "worker process");
nxt_process_title("nginext: worker process");
cycle->type = NXT_PROCESS_WORKER;
nxt_random_init(&nxt_random_data);
if (getuid() == 0) {
/* Super-user. */
n = nxt_user_cred_set(&cycle->user_cred);
if (n != NXT_OK) {
goto fail;
}
}
/* Update inherited master process event engine and signals processing. */
thr->engine->signals->sigev = nxt_worker_process_signals;
interface = nxt_service_get(cycle->services, "engine", cycle->engine);
if (interface == NULL) {
goto fail;
}
if (nxt_event_engine_change(thr, &nxt_main_task, interface, cycle->batch) != NXT_OK) {
goto fail;
}
if (nxt_cycle_listen_sockets_enable(&thr->engine->task, cycle) != NXT_OK) {
goto fail;
}
port = cycle->ports->elts;
/* A master process port. */
nxt_port_read_close(&port[0]);
nxt_port_write_enable(&nxt_main_task, &port[0]);
/* A worker process port. */
nxt_port_create(thr, &port[cycle->current_process],
nxt_worker_process_port_handlers);
#if (NXT_THREADS)
{
nxt_int_t ret;
ret = nxt_cycle_thread_pool_create(thr, cycle, cycle->auxiliary_threads,
60000 * 1000000LL);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
}
nxt_app_start(cycle);
#endif
return;
fail:
exit(1);
nxt_unreachable();
}
static void
nxt_worker_process_quit(nxt_task_t *task)
{
nxt_uint_t n;
nxt_cycle_t *cycle;
nxt_queue_t *listen;
nxt_runtime_t *rt;
nxt_queue_link_t *link, *next;
nxt_listen_socket_t *ls;
nxt_event_conn_listen_t *cls;
cycle = nxt_thread_cycle();
rt = task->thread->runtime;
nxt_debug(task, "close listen connections");
@@ -151,10 +68,10 @@ nxt_worker_process_quit(nxt_task_t *task)
nxt_fd_event_close(task->thread->engine, &cls->socket);
}
if (cycle->listen_sockets != NULL) {
if (rt->listen_sockets != NULL) {
ls = cycle->listen_sockets->elts;
n = cycle->listen_sockets->nelts;
ls = rt->listen_sockets->elts;
n = rt->listen_sockets->nelts;
while (n != 0) {
nxt_socket_close(task, ls->socket);
@@ -164,10 +81,10 @@ nxt_worker_process_quit(nxt_task_t *task)
n--;
}
cycle->listen_sockets->nelts = 0;
rt->listen_sockets->nelts = 0;
}
nxt_cycle_quit(task, cycle);
nxt_runtime_quit(task);
}
@@ -194,7 +111,7 @@ nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj, void *data)
/* A fast exit. */
nxt_cycle_quit(task, NULL);
nxt_runtime_quit(task);
}