New process port exchange changed. READY message type introduced.

Application process start request DATA message from router to master.
Master notifies router via NEW_PORT message after worker process become ready.
This commit is contained in:
Max Romanov
2017-07-12 20:32:16 +03:00
parent c38bcb7d70
commit b0c1e740cf
23 changed files with 1234 additions and 393 deletions

View File

@@ -9,9 +9,10 @@
#include <nxt_main.h> #include <nxt_main.h>
#include <nxt_runtime.h> #include <nxt_runtime.h>
#include <nxt_application.h> #include <nxt_application.h>
#include <nxt_master_process.h>
nxt_application_module_t *nxt_app; nxt_application_module_t *nxt_app_modules[NXT_APP_MAX];
static nxt_thread_mutex_t nxt_app_mutex; static nxt_thread_mutex_t nxt_app_mutex;
static nxt_thread_cond_t nxt_app_cond; static nxt_thread_cond_t nxt_app_cond;
@@ -19,9 +20,16 @@ static nxt_thread_cond_t nxt_app_cond;
static nxt_http_fields_hash_entry_t nxt_app_request_fields[]; static nxt_http_fields_hash_entry_t nxt_app_request_fields[];
static nxt_http_fields_hash_t *nxt_app_request_fields_hash; static nxt_http_fields_hash_t *nxt_app_request_fields_hash;
static nxt_application_module_t *nxt_app;
nxt_int_t nxt_int_t
nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt) nxt_app_start(nxt_task_t *task, void *data)
{ {
nxt_int_t ret;
nxt_common_app_conf_t *app_conf;
app_conf = data;
if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) { if (nxt_slow_path(nxt_thread_mutex_create(&nxt_app_mutex) != NXT_OK)) {
return NXT_ERROR; return NXT_ERROR;
} }
@@ -30,11 +38,18 @@ nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt)
return NXT_ERROR; return NXT_ERROR;
} }
if (nxt_slow_path(nxt_app->init(task) != NXT_OK)) { nxt_app = nxt_app_modules[app_conf->type_id];
ret = nxt_app->init(task, data);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_debug(task, "application init failed"); nxt_debug(task, "application init failed");
} else {
nxt_debug(task, "application init done");
} }
return NXT_OK; return ret;
} }
@@ -570,3 +585,36 @@ nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
return NXT_OK; return NXT_OK;
} }
nxt_app_type_t
nxt_app_parse_type(nxt_str_t *str)
{
if (nxt_str_eq(str, "python", 6)) {
return NXT_APP_PYTHON;
} else if (nxt_str_eq(str, "python2", 7)) {
return NXT_APP_PYTHON2;
} else if (nxt_str_eq(str, "python3", 7)) {
return NXT_APP_PYTHON3;
} else if (nxt_str_eq(str, "php", 3)) {
return NXT_APP_PHP;
} else if (nxt_str_eq(str, "php5", 4)) {
return NXT_APP_PHP5;
} else if (nxt_str_eq(str, "php7", 4)) {
return NXT_APP_PHP7;
} else if (nxt_str_eq(str, "ruby", 4)) {
return NXT_APP_RUBY;
} else if (nxt_str_eq(str, "go", 2)) {
return NXT_APP_GO;
}
return NXT_APP_UNKNOWN;
}

View File

@@ -10,13 +10,57 @@
typedef enum { typedef enum {
NXT_APP_PYTHON = 0, NXT_APP_UNKNOWN = 0,
NXT_APP_PYTHON,
NXT_APP_PYTHON2,
NXT_APP_PYTHON3,
NXT_APP_PHP, NXT_APP_PHP,
NXT_APP_PHP5,
NXT_APP_PHP7,
NXT_APP_RUBY, NXT_APP_RUBY,
NXT_APP_GO, NXT_APP_GO,
NXT_APP_MAX,
} nxt_app_type_t; } nxt_app_type_t;
typedef struct nxt_common_app_conf_s nxt_common_app_conf_t;
typedef struct {
nxt_str_t path;
nxt_str_t module;
} nxt_python_app_conf_t;
typedef struct {
nxt_str_t root;
nxt_str_t script;
nxt_str_t index;
} nxt_php_app_conf_t;
typedef struct {
nxt_str_t executable;
} nxt_go_app_conf_t;
struct nxt_common_app_conf_s {
nxt_str_t type;
nxt_app_type_t type_id;
nxt_str_t user;
nxt_str_t group;
uint32_t workers;
union {
nxt_python_app_conf_t python;
nxt_php_app_conf_t php;
nxt_go_app_conf_t go;
} u;
};
typedef struct { typedef struct {
nxt_str_t name; nxt_str_t name;
nxt_str_t value; nxt_str_t value;
@@ -138,18 +182,22 @@ nxt_int_t nxt_app_msg_read_size(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
size_t *size); size_t *size);
typedef struct { typedef struct nxt_app_module_s nxt_application_module_t;
nxt_int_t (*init)(nxt_task_t *task); typedef struct nxt_app_module_s nxt_app_module_t;
struct nxt_app_module_s {
nxt_int_t (*init)(nxt_task_t *task,
nxt_common_app_conf_t *conf);
nxt_int_t (*prepare_msg)(nxt_task_t *task, nxt_int_t (*prepare_msg)(nxt_task_t *task,
nxt_app_request_t *r, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg); nxt_app_wmsg_t *wmsg);
nxt_int_t (*run)(nxt_task_t *task, nxt_int_t (*run)(nxt_task_t *task,
nxt_app_rmsg_t *rmsg, nxt_app_rmsg_t *rmsg,
nxt_app_wmsg_t *wmsg); nxt_app_wmsg_t *wmsg);
} nxt_application_module_t; };
extern nxt_application_module_t *nxt_app; extern nxt_application_module_t *nxt_app_modules[NXT_APP_MAX];
@@ -227,4 +275,6 @@ nxt_app_msg_read_length(u_char *src, size_t *length)
} }
nxt_app_type_t nxt_app_parse_type(nxt_str_t *str);
#endif /* _NXT_APPLICATION_H_INCLIDED_ */ #endif /* _NXT_APPLICATION_H_INCLIDED_ */

View File

@@ -258,5 +258,20 @@ nxt_buf_free(mp, b) \
NXT_EXPORT void nxt_buf_chain_add(nxt_buf_t **head, nxt_buf_t *in); NXT_EXPORT void nxt_buf_chain_add(nxt_buf_t **head, nxt_buf_t *in);
NXT_EXPORT size_t nxt_buf_chain_length(nxt_buf_t *b); NXT_EXPORT size_t nxt_buf_chain_length(nxt_buf_t *b);
nxt_inline nxt_buf_t *
nxt_buf_cpy(nxt_buf_t *b, const void *src, size_t length)
{
nxt_memcpy(b->mem.free, src, length);
b->mem.free += length;
return b;
}
nxt_inline nxt_buf_t *
nxt_buf_cpystr(nxt_buf_t *b, const nxt_str_t *str)
{
return nxt_buf_cpy(b, str->start, str->length);
}
#endif /* _NXT_BUF_H_INCLIDED_ */ #endif /* _NXT_BUF_H_INCLIDED_ */

View File

@@ -187,8 +187,11 @@ typedef uint32_t nxt_req_id_t;
typedef struct { typedef struct {
nxt_req_id_t req_id; nxt_req_id_t req_id;
nxt_conn_t *conn; nxt_conn_t *conn;
nxt_port_t *app_port;
nxt_port_t *reply_port;
nxt_queue_link_t link; nxt_queue_link_t link; /* for nxt_conn_t.requests */
nxt_queue_link_t app_link; /* for nxt_app_t.requests */
} nxt_req_conn_link_t; } nxt_req_conn_link_t;

View File

@@ -89,15 +89,18 @@ static const nxt_event_conn_state_t nxt_controller_conn_close_state;
nxt_int_t nxt_int_t
nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt) nxt_controller_start(nxt_task_t *task, void *data)
{ {
nxt_mp_t *mp; nxt_mp_t *mp;
nxt_runtime_t *rt;
nxt_conf_value_t *conf; nxt_conf_value_t *conf;
nxt_http_fields_hash_t *hash; nxt_http_fields_hash_t *hash;
static const nxt_str_t json static const nxt_str_t json
= nxt_string("{ \"listeners\": {}, \"applications\": {} }"); = nxt_string("{ \"listeners\": {}, \"applications\": {} }");
rt = task->thread->runtime;
hash = nxt_http_fields_hash_create(nxt_controller_request_fields, hash = nxt_http_fields_hash_create(nxt_controller_request_fields,
rt->mem_pool); rt->mem_pool);
if (nxt_slow_path(hash == NULL)) { if (nxt_slow_path(hash == NULL)) {
@@ -853,13 +856,7 @@ nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
rt = task->thread->runtime; rt = task->thread->runtime;
nxt_runtime_port_each(rt, port) { port = rt->port_by_type[NXT_PROCESS_ROUTER];
if (port->type == NXT_PROCESS_ROUTER) {
break;
}
} nxt_runtime_port_loop;
size = nxt_conf_json_length(conf, NULL); size = nxt_conf_json_length(conf, NULL);

View File

@@ -162,4 +162,17 @@ NXT_EXPORT extern nxt_log_t nxt_main_log;
NXT_EXPORT extern nxt_str_t nxt_log_levels[]; NXT_EXPORT extern nxt_str_t nxt_log_levels[];
#define nxt_assert(c) \
do { \
if (nxt_fast_path(c)) { \
break; \
\
} else { \
nxt_thread_log_alert("%s:%d assertion failed: %s", \
__FILE__, __LINE__, #c ); \
nxt_abort(); \
} \
} while (0)
#endif /* _NXT_LOG_H_INCLUDED_ */ #endif /* _NXT_LOG_H_INCLUDED_ */

View File

@@ -8,6 +8,8 @@
#include <nxt_runtime.h> #include <nxt_runtime.h>
#include <nxt_port.h> #include <nxt_port.h>
#include <nxt_master_process.h> #include <nxt_master_process.h>
#include <nxt_conf.h>
#include <nxt_application.h>
static nxt_int_t nxt_master_process_port_create(nxt_task_t *task, static nxt_int_t nxt_master_process_port_create(nxt_task_t *task,
@@ -17,8 +19,8 @@ static nxt_int_t nxt_master_start_controller_process(nxt_task_t *task,
nxt_runtime_t *rt); nxt_runtime_t *rt);
static nxt_int_t nxt_master_start_router_process(nxt_task_t *task, static nxt_int_t nxt_master_start_router_process(nxt_task_t *task,
nxt_runtime_t *rt); nxt_runtime_t *rt);
static nxt_int_t nxt_master_start_worker_processes(nxt_task_t *task, static nxt_int_t nxt_master_start_worker_process(nxt_task_t *task,
nxt_runtime_t *rt); nxt_runtime_t *rt, nxt_common_app_conf_t *app_conf, uint32_t stream);
static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task, static nxt_int_t nxt_master_create_worker_process(nxt_task_t *task,
nxt_runtime_t *rt, nxt_process_init_t *init); nxt_runtime_t *rt, nxt_process_init_t *init);
static void nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj, static void nxt_master_process_sigterm_handler(nxt_task_t *task, void *obj,
@@ -64,14 +66,134 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
return ret; return ret;
} }
ret = nxt_master_start_router_process(task, rt); return nxt_master_start_router_process(task, rt);
if (ret != NXT_OK) {
return ret;
} }
return nxt_master_start_worker_processes(task, rt);
static nxt_conf_map_t nxt_common_app_conf[] = {
{
nxt_string("type"),
NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, type),
},
{
nxt_string("user"),
NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, user),
},
{
nxt_string("group"),
NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, group),
},
{
nxt_string("workers"),
NXT_CONF_MAP_INT32,
offsetof(nxt_common_app_conf_t, workers),
},
{
nxt_string("path"),
NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, u.python.path),
},
{
nxt_string("module"),
NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, u.python.module),
},
{
nxt_string("root"),
NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, u.php.root),
},
{
nxt_string("script"),
NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, u.php.script),
},
{
nxt_string("index"),
NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, u.php.index),
},
{
nxt_string("executable"),
NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, u.go.executable),
},
};
static void
nxt_port_master_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
size_t dump_size;
nxt_mp_t *mp;
nxt_int_t ret;
nxt_buf_t *b;
nxt_conf_value_t *conf;
nxt_common_app_conf_t app_conf;
static nxt_str_t nobody = nxt_string("nobody");
b = msg->buf;
dump_size = b->mem.free - b->mem.pos;
if (dump_size > 300) {
dump_size = 300;
} }
nxt_debug(task, "master data: %*s", dump_size, b->mem.pos);
mp = nxt_mp_create(1024, 128, 256, 32);
conf = nxt_conf_json_parse(mp, b->mem.pos, b->mem.free);
b->mem.pos = b->mem.free;
if (conf == NULL) {
nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
return;
}
nxt_memzero(&app_conf, sizeof(nxt_common_app_conf_t));
app_conf.user = nobody;
ret = nxt_conf_map_object(conf, nxt_common_app_conf,
nxt_nitems(nxt_common_app_conf), &app_conf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "root map error");
return;
}
app_conf.type_id = nxt_app_parse_type(&app_conf.type);
ret = nxt_master_start_worker_process(task, task->thread->runtime,
&app_conf, msg->port_msg.stream);
nxt_mp_destroy(mp);
}
static nxt_port_handler_t nxt_master_process_port_handlers[] = {
NULL,
nxt_port_new_port_handler,
NULL,
nxt_port_mmap_handler,
nxt_port_master_data_handler,
NULL,
nxt_port_ready_handler,
};
static nxt_int_t static nxt_int_t
nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt) nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
@@ -85,7 +207,8 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
return NXT_ERROR; return NXT_ERROR;
} }
port = nxt_process_port_new(process); port = nxt_process_port_new(rt, process, nxt_port_get_next_id(),
NXT_PROCESS_MASTER);
if (nxt_slow_path(port == NULL)) { if (nxt_slow_path(port == NULL)) {
return NXT_ERROR; return NXT_ERROR;
} }
@@ -95,16 +218,15 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
return ret; return ret;
} }
port->engine = 0;
port->type = NXT_PROCESS_MASTER;
nxt_runtime_port_add(rt, port); nxt_runtime_port_add(rt, port);
/* /*
* A master process port. A write port is not closed * A master process port. A write port is not closed
* since it should be inherited by worker processes. * since it should be inherited by worker processes.
*/ */
nxt_port_read_enable(task, port); nxt_port_enable(task, port, nxt_master_process_port_handlers);
process->ready = 1;
return NXT_OK; return NXT_OK;
} }
@@ -148,6 +270,9 @@ nxt_master_start_controller_process(nxt_task_t *task, nxt_runtime_t *rt)
init->port_handlers = nxt_controller_process_port_handlers; init->port_handlers = nxt_controller_process_port_handlers;
init->signals = nxt_worker_process_signals; init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_CONTROLLER; init->type = NXT_PROCESS_CONTROLLER;
init->data = rt;
init->stream = 0;
init->restart = 1;
return nxt_master_create_worker_process(task, rt, init); return nxt_master_create_worker_process(task, rt, init);
} }
@@ -169,41 +294,45 @@ nxt_master_start_router_process(nxt_task_t *task, nxt_runtime_t *rt)
init->port_handlers = nxt_router_process_port_handlers; init->port_handlers = nxt_router_process_port_handlers;
init->signals = nxt_worker_process_signals; init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_ROUTER; init->type = NXT_PROCESS_ROUTER;
init->data = rt;
init->stream = 0;
init->restart = 1;
return nxt_master_create_worker_process(task, rt, init); return nxt_master_create_worker_process(task, rt, init);
} }
static nxt_int_t static nxt_int_t
nxt_master_start_worker_processes(nxt_task_t *task, nxt_runtime_t *rt) nxt_master_start_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
nxt_common_app_conf_t *app_conf, uint32_t stream)
{ {
nxt_int_t ret;
nxt_uint_t n;
nxt_process_init_t *init; nxt_process_init_t *init;
init = nxt_mp_get(rt->mem_pool, sizeof(nxt_process_init_t)); init = nxt_malloc(sizeof(nxt_process_init_t) +
sizeof(nxt_user_cred_t));
if (nxt_slow_path(init == NULL)) { if (nxt_slow_path(init == NULL)) {
return NXT_ERROR; return NXT_ERROR;
} }
init->user_cred = (nxt_user_cred_t *) (init + 1);
init->start = nxt_app_start; init->start = nxt_app_start;
init->name = "worker process"; init->name = "worker process";
init->user_cred = &rt->user_cred;
init->user_cred->user = (char *) app_conf->user.start;
if (nxt_user_cred_get(task, init->user_cred,
(char *) app_conf->group.start) != NXT_OK) {
return NXT_ERROR;
}
init->port_handlers = nxt_app_process_port_handlers; init->port_handlers = nxt_app_process_port_handlers;
init->signals = nxt_worker_process_signals; init->signals = nxt_worker_process_signals;
init->type = NXT_PROCESS_WORKER; init->type = NXT_PROCESS_WORKER;
init->data = app_conf;
init->stream = stream;
init->restart = 0;
n = rt->worker_processes; return nxt_master_create_worker_process(task, rt, init);
while (n-- != 0) {
ret = nxt_master_create_worker_process(task, rt, init);
if (ret != NXT_OK) {
return ret;
}
}
return NXT_OK;
} }
@@ -214,7 +343,7 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
nxt_int_t ret; nxt_int_t ret;
nxt_pid_t pid; nxt_pid_t pid;
nxt_port_t *port; nxt_port_t *port;
nxt_process_t *process, *master_process; nxt_process_t *process;
/* /*
* TODO: remove process, init, ports from array on memory and fork failures. * TODO: remove process, init, ports from array on memory and fork failures.
@@ -226,24 +355,20 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
} }
process->init = init; process->init = init;
master_process = rt->mprocess;
init->master_port = nxt_process_port_first(master_process);
port = nxt_process_port_new(process); port = nxt_process_port_new(rt, process, 0, init->type);
if (nxt_slow_path(port == NULL)) { if (nxt_slow_path(port == NULL)) {
nxt_runtime_process_destroy(rt, process);
return NXT_ERROR; return NXT_ERROR;
} }
init->port = port;
ret = nxt_port_socket_init(task, port, 0); ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
nxt_mp_free(rt->mem_pool, port);
nxt_runtime_process_destroy(rt, process);
return ret; return ret;
} }
port->engine = 0;
port->type = init->type;
pid = nxt_process_create(task, process); pid = nxt_process_create(task, process);
switch (pid) { switch (pid) {
@@ -261,7 +386,6 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_read_close(port); nxt_port_read_close(port);
nxt_port_write_enable(task, port); nxt_port_write_enable(task, port);
nxt_port_send_new_port(task, rt, port);
return NXT_OK; return NXT_OK;
} }
} }
@@ -503,7 +627,12 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
} }
} else if (init != NULL) { } else if (init != NULL) {
if (init->restart != 0) {
(void) nxt_master_create_worker_process(task, rt, init); (void) nxt_master_create_worker_process(task, rt, init);
} else {
nxt_free(init);
}
} }
} }
} }

View File

@@ -12,9 +12,9 @@ nxt_int_t nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
nxt_runtime_t *runtime); nxt_runtime_t *runtime);
void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *runtime); void nxt_master_stop_worker_processes(nxt_task_t *task, nxt_runtime_t *runtime);
nxt_int_t nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt); nxt_int_t nxt_controller_start(nxt_task_t *task, void *data);
nxt_int_t nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt); nxt_int_t nxt_router_start(nxt_task_t *task, void *data);
nxt_int_t nxt_app_start(nxt_task_t *task, void *data);
extern nxt_port_handler_t nxt_controller_process_port_handlers[]; extern nxt_port_handler_t nxt_controller_process_port_handlers[];
extern nxt_port_handler_t nxt_worker_process_port_handlers[]; extern nxt_port_handler_t nxt_worker_process_port_handlers[];

View File

@@ -917,7 +917,7 @@ nxt_mp_retain(nxt_mp_t *mp, size_t size)
} }
void uint32_t
nxt_mp_release(nxt_mp_t *mp, void *p) nxt_mp_release(nxt_mp_t *mp, void *p)
{ {
nxt_mp_free(mp, p); nxt_mp_free(mp, p);
@@ -928,7 +928,11 @@ nxt_mp_release(nxt_mp_t *mp, void *p)
if (mp->retain == 0) { if (mp->retain == 0) {
nxt_mp_destroy(mp); nxt_mp_destroy(mp);
return 0;
} }
return mp->retain;
} }

View File

@@ -87,7 +87,7 @@ NXT_EXPORT void *nxt_mp_retain(nxt_mp_t *mp, size_t size)
* nxt_mp_release() returns freeable memory and decreases memory pool * nxt_mp_release() returns freeable memory and decreases memory pool
* retention counter. If the counter becomes zero the pool is destroyed. * retention counter. If the counter becomes zero the pool is destroyed.
*/ */
NXT_EXPORT void nxt_mp_release(nxt_mp_t *mp, void *p); NXT_EXPORT uint32_t nxt_mp_release(nxt_mp_t *mp, void *p);
/* nxt_mp_nget() returns non-aligned non-freeable memory. */ /* nxt_mp_nget() returns non-aligned non-freeable memory. */

View File

@@ -11,13 +11,28 @@
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static nxt_atomic_uint_t nxt_port_last_id;
nxt_port_id_t
nxt_port_get_next_id()
{
return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
}
void void
nxt_port_create(nxt_task_t *task, nxt_port_t *port, nxt_port_reset_next_id()
{
nxt_port_last_id = 1;
}
void
nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
nxt_port_handler_t *handlers) nxt_port_handler_t *handlers)
{ {
port->pid = nxt_pid; port->pid = nxt_pid;
port->engine = task->thread->engine->id; port->engine = task->thread->engine;
port->handler = nxt_port_handler; port->handler = nxt_port_handler;
port->data = handlers; port->data = handlers;
@@ -77,12 +92,13 @@ nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void void
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_t *new_port) nxt_port_t *new_port, uint32_t stream)
{ {
nxt_port_t *port;
nxt_process_t *process; nxt_process_t *process;
nxt_debug(task, "new port %d for process %PI engine %uD", nxt_debug(task, "new port %d for process %PI",
new_port->pair[1], new_port->pid, new_port->engine); new_port->pair[1], new_port->pid);
nxt_runtime_process_each(rt, process) nxt_runtime_process_each(rt, process)
{ {
@@ -90,15 +106,22 @@ nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
continue; continue;
} }
(void) nxt_port_send_port(task, nxt_process_port_first(process), port = nxt_process_port_first(process);
new_port);
if (port->type == NXT_PROCESS_MASTER ||
port->type == NXT_PROCESS_CONTROLLER ||
port->type == NXT_PROCESS_ROUTER) {
(void) nxt_port_send_port(task, port, new_port, stream);
}
} }
nxt_runtime_process_loop; nxt_runtime_process_loop;
} }
nxt_int_t nxt_int_t
nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port) nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
uint32_t stream)
{ {
nxt_buf_t *b; nxt_buf_t *b;
nxt_port_msg_new_port_t *msg; nxt_port_msg_new_port_t *msg;
@@ -116,13 +139,12 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port)
msg->id = new_port->id; msg->id = new_port->id;
msg->pid = new_port->pid; msg->pid = new_port->pid;
msg->engine = new_port->engine;
msg->max_size = port->max_size; msg->max_size = port->max_size;
msg->max_share = port->max_share; msg->max_share = port->max_share;
msg->type = new_port->type; msg->type = new_port->type;
return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT, return nxt_port_socket_write(task, port, NXT_PORT_MSG_NEW_PORT,
new_port->pair[1], 0, 0, b); new_port->pair[1], stream, 0, b);
} }
@@ -140,12 +162,26 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos; new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
msg->buf->mem.pos = msg->buf->mem.free; msg->buf->mem.pos = msg->buf->mem.free;
nxt_debug(task, "new port %d received for process %PI:%d",
msg->fd, new_port_msg->pid, new_port_msg->id);
port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
if (port != NULL) {
nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
new_port_msg->id);
nxt_fd_close(msg->fd);
msg->fd = -1;
return;
}
process = nxt_runtime_process_get(rt, new_port_msg->pid); process = nxt_runtime_process_get(rt, new_port_msg->pid);
if (nxt_slow_path(process == NULL)) { if (nxt_slow_path(process == NULL)) {
return; return;
} }
port = nxt_process_port_new(process); port = nxt_process_port_new(rt, process, new_port_msg->id,
new_port_msg->type);
if (nxt_slow_path(port == NULL)) { if (nxt_slow_path(port == NULL)) {
return; return;
} }
@@ -157,16 +193,10 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
port->mem_pool = mp; port->mem_pool = mp;
nxt_debug(task, "new port %d received for process %PI engine %uD",
msg->fd, new_port_msg->pid, new_port_msg->engine);
port->id = new_port_msg->id;
port->engine = new_port_msg->engine;
port->pair[0] = -1; port->pair[0] = -1;
port->pair[1] = msg->fd; port->pair[1] = msg->fd;
port->max_size = new_port_msg->max_size; port->max_size = new_port_msg->max_size;
port->max_share = new_port_msg->max_share; port->max_share = new_port_msg->max_share;
port->type = new_port_msg->type;
nxt_queue_init(&port->messages); nxt_queue_init(&port->messages);
@@ -175,6 +205,37 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_runtime_port_add(rt, port); nxt_runtime_port_add(rt, port);
nxt_port_write_enable(task, port); nxt_port_write_enable(task, port);
msg->new_port = port;
}
void
nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_t *port;
nxt_process_t *process;
nxt_runtime_t *rt;
rt = task->thread->runtime;
process = nxt_runtime_process_get(rt, msg->port_msg.pid);
if (nxt_slow_path(process == NULL)) {
return;
}
process->ready = 1;
port = nxt_process_port_first(process);
if (nxt_slow_path(port == NULL)) {
return;
}
nxt_debug(task, "process %PI ready", msg->port_msg.pid);
if (nxt_runtime_is_master(rt)) {
nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
}
} }

View File

@@ -15,6 +15,7 @@ typedef enum {
NXT_PORT_MSG_MMAP, NXT_PORT_MSG_MMAP,
NXT_PORT_MSG_DATA, NXT_PORT_MSG_DATA,
NXT_PORT_MSG_REMOVE_PID, NXT_PORT_MSG_REMOVE_PID,
NXT_PORT_MSG_READY,
NXT_PORT_MSG_MAX, NXT_PORT_MSG_MAX,
} nxt_port_msg_type_t; } nxt_port_msg_type_t;
@@ -53,13 +54,19 @@ struct nxt_port_recv_msg_s {
nxt_port_t *port; nxt_port_t *port;
nxt_port_msg_t port_msg; nxt_port_msg_t port_msg;
size_t size; size_t size;
nxt_port_t *new_port;
}; };
typedef struct nxt_app_s nxt_app_t;
struct nxt_port_s { struct nxt_port_s {
nxt_fd_event_t socket; nxt_fd_event_t socket;
nxt_queue_link_t link; /* for nxt_process_t.ports */ nxt_queue_link_t link; /* for nxt_process_t.ports */
nxt_process_t *process;
nxt_queue_link_t app_link; /* for nxt_app_t.ports */
nxt_app_t *app;
nxt_queue_t messages; /* of nxt_port_send_msg_t */ nxt_queue_t messages; /* of nxt_port_send_msg_t */
@@ -69,25 +76,24 @@ struct nxt_port_s {
uint32_t max_share; uint32_t max_share;
nxt_port_handler_t handler; nxt_port_handler_t handler;
void *data; nxt_port_handler_t *data;
nxt_mp_t *mem_pool; nxt_mp_t *mem_pool;
nxt_event_engine_t *engine;
nxt_buf_t *free_bufs; nxt_buf_t *free_bufs;
nxt_socket_t pair[2]; nxt_socket_t pair[2];
nxt_port_id_t id; nxt_port_id_t id;
nxt_pid_t pid; nxt_pid_t pid;
uint32_t engine;
nxt_process_type_t type:8; nxt_process_type_t type;
nxt_process_t *process;
}; };
typedef struct { typedef struct {
nxt_port_id_t id; nxt_port_id_t id;
nxt_pid_t pid; nxt_pid_t pid;
uint32_t engine;
size_t max_size; size_t max_size;
size_t max_share; size_t max_share;
nxt_process_type_t type:8; nxt_process_type_t type:8;
@@ -104,6 +110,9 @@ typedef union {
} nxt_port_data_t; } nxt_port_data_t;
nxt_port_id_t nxt_port_get_next_id(void);
void nxt_port_reset_next_id(void);
nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
size_t max_size); size_t max_size);
void nxt_port_destroy(nxt_port_t *port); void nxt_port_destroy(nxt_port_t *port);
@@ -115,19 +124,20 @@ nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port,
nxt_buf_t *b); nxt_buf_t *b);
void nxt_port_create(nxt_task_t *task, nxt_port_t *port, void nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
nxt_port_handler_t *handlers); nxt_port_handler_t *handlers);
void nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type, void nxt_port_write(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b); nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt, void nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
nxt_port_t *port); nxt_port_t *port, uint32_t stream);
nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_int_t nxt_port_send_port(nxt_task_t *task, nxt_port_t *port,
nxt_port_t *new_port); nxt_port_t *new_port, uint32_t stream);
void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, void nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt,
nxt_uint_t slot, nxt_fd_t fd); nxt_uint_t slot, nxt_fd_t fd);
void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_change_log_file_handler(nxt_task_t *task, void nxt_port_change_log_file_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg); nxt_port_recv_msg_t *msg);
void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);

View File

@@ -26,6 +26,56 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
} }
static nxt_array_t *
nxt_port_mmaps_create()
{
nxt_mp_t *mp;
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
return NULL;
}
return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t));
}
static nxt_port_mmap_t *
nxt_port_mmap_add(nxt_array_t *port_mmaps)
{
nxt_mp_thread_adopt(port_mmaps->mem_pool);
return nxt_array_zero_add(port_mmaps);
}
void
nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool)
{
uint32_t i;
nxt_port_mmap_t *port_mmap;
if (port_mmaps == NULL) {
return;
}
nxt_mp_thread_adopt(port_mmaps->mem_pool);
port_mmap = port_mmaps->elts;
for (i = 0; i < port_mmaps->nelts; i++) {
nxt_port_mmap_destroy(port_mmap);
}
port_mmaps->nelts = 0;
if (destroy_pool != 0) {
nxt_mp_destroy(port_mmaps->mem_pool);
}
}
static void static void
nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data) nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
{ {
@@ -63,6 +113,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
p = b->mem.pos - 1; p = b->mem.pos - 1;
c = nxt_port_mmap_chunk_id(hdr, p) + 1; c = nxt_port_mmap_chunk_id(hdr, p) + 1;
p = nxt_port_mmap_chunk_start(hdr, c); p = nxt_port_mmap_chunk_start(hdr, c);
} else { } else {
p = b->mem.start; p = b->mem.start;
c = nxt_port_mmap_chunk_id(hdr, p); c = nxt_port_mmap_chunk_id(hdr, p);
@@ -103,9 +154,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_thread_mutex_lock(&process->incoming_mutex); nxt_thread_mutex_lock(&process->incoming_mutex);
if (process->incoming == NULL) { if (process->incoming == NULL) {
process->incoming_mp = nxt_mp_create(1024, 128, 256, 32); process->incoming = nxt_port_mmaps_create();
process->incoming = nxt_array_create(process->incoming_mp, 1,
sizeof(nxt_port_mmap_t));
} }
if (nxt_slow_path(process->incoming == NULL)) { if (nxt_slow_path(process->incoming == NULL)) {
@@ -114,9 +163,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
goto fail; goto fail;
} }
nxt_mp_thread_adopt(process->incoming_mp); port_mmap = nxt_port_mmap_add(process->incoming);
port_mmap = nxt_array_zero_add(process->incoming);
if (nxt_slow_path(port_mmap == NULL)) { if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array"); nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
@@ -140,6 +187,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) { if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) {
nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)", nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)",
port_mmap->hdr->id, process->incoming->nelts - 1); port_mmap->hdr->id, process->incoming->nelts - 1);
nxt_abort();
} }
fail: fail:
@@ -195,9 +243,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
port_mmap = NULL; port_mmap = NULL;
if (process->outgoing == NULL) { if (process->outgoing == NULL) {
process->outgoing_mp = nxt_mp_create(1024, 128, 256, 32); process->outgoing = nxt_port_mmaps_create();
process->outgoing = nxt_array_create(process->outgoing_mp, 1,
sizeof(nxt_port_mmap_t));
} }
if (nxt_slow_path(process->outgoing == NULL)) { if (nxt_slow_path(process->outgoing == NULL)) {
@@ -206,9 +252,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
return NULL; return NULL;
} }
nxt_mp_thread_adopt(process->outgoing_mp); port_mmap = nxt_port_mmap_add(process->outgoing);
port_mmap = nxt_array_zero_add(process->outgoing);
if (nxt_slow_path(port_mmap == NULL)) { if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN, nxt_log(task, NXT_LOG_WARN,
"failed to add port mmap to outgoing array"); "failed to add port mmap to outgoing array");

View File

@@ -15,6 +15,8 @@ typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t;
void void
nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap); nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap);
void nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool);
/* /*
* Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem' * Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem'
* pointers to first available shared mem bucket(s). 'size' used as a hint to * pointers to first available shared mem bucket(s). 'size' used as a hint to

View File

@@ -121,7 +121,9 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
port->socket.log = &nxt_main_log; port->socket.log = &nxt_main_log;
port->socket.write_ready = 1; port->socket.write_ready = 1;
port->socket.write_work_queue = &task->thread->engine->fast_work_queue; port->engine = task->thread->engine;
port->socket.write_work_queue = &port->engine->fast_work_queue;
port->socket.write_handler = nxt_port_write_handler; port->socket.write_handler = nxt_port_write_handler;
port->socket.error_handler = nxt_port_error_handler; port->socket.error_handler = nxt_port_error_handler;
} }
@@ -344,6 +346,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
} while (port->socket.write_ready); } while (port->socket.write_ready);
if (nxt_fd_event_is_disabled(port->socket.write)) { if (nxt_fd_event_is_disabled(port->socket.write)) {
/* TODO task->thread->engine or port->engine ? */
nxt_fd_event_enable_write(task->thread->engine, &port->socket); nxt_fd_event_enable_write(task->thread->engine, &port->socket);
} }
@@ -362,11 +365,13 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
port->socket.fd = port->pair[0]; port->socket.fd = port->pair[0];
port->socket.log = &nxt_main_log; port->socket.log = &nxt_main_log;
port->socket.read_work_queue = &task->thread->engine->fast_work_queue; port->engine = task->thread->engine;
port->socket.read_work_queue = &port->engine->fast_work_queue;
port->socket.read_handler = nxt_port_read_handler; port->socket.read_handler = nxt_port_read_handler;
port->socket.error_handler = nxt_port_error_handler; port->socket.error_handler = nxt_port_error_handler;
nxt_fd_event_enable_read(task->thread->engine, &port->socket); nxt_fd_event_enable_read(port->engine, &port->socket);
} }
@@ -389,6 +394,8 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
port = msg.port = nxt_container_of(obj, nxt_port_t, socket); port = msg.port = nxt_container_of(obj, nxt_port_t, socket);
nxt_assert(port->engine == task->thread->engine);
for ( ;; ) { for ( ;; ) {
b = nxt_port_buf_alloc(port); b = nxt_port_buf_alloc(port);

View File

@@ -8,7 +8,7 @@
#include <nxt_master_process.h> #include <nxt_master_process.h>
static void nxt_process_start(nxt_task_t *task, nxt_process_init_t *process); static void nxt_process_start(nxt_task_t *task, nxt_process_t *process);
static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc); static nxt_int_t nxt_user_groups_get(nxt_task_t *task, nxt_user_cred_t *uc);
@@ -23,6 +23,7 @@ nxt_pid_t
nxt_process_create(nxt_task_t *task, nxt_process_t *process) nxt_process_create(nxt_task_t *task, nxt_process_t *process)
{ {
nxt_pid_t pid; nxt_pid_t pid;
nxt_process_t *p;
nxt_runtime_t *rt; nxt_runtime_t *rt;
rt = task->thread->runtime; rt = task->thread->runtime;
@@ -44,13 +45,31 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
task->thread->tid = 0; task->thread->tid = 0;
process->pid = nxt_pid; process->pid = nxt_pid;
process->init->port->pid = nxt_pid;
rt->types = 0; rt->types = 0;
nxt_port_reset_next_id();
/* Remove not ready processes */
nxt_runtime_process_each(rt, p) {
if (!p->ready) {
nxt_debug(task, "remove not ready process %PI", p->pid);
nxt_runtime_process_remove(rt, p);
} else {
nxt_port_mmaps_destroy(p->incoming, 0);
nxt_port_mmaps_destroy(p->outgoing, 0);
}
} nxt_runtime_process_loop;
nxt_runtime_process_add(rt, process); nxt_runtime_process_add(rt, process);
nxt_process_start(task, process->init); nxt_process_start(task, process);
process->ready = 1;
break; break;
default: default:
@@ -58,7 +77,6 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
nxt_debug(task, "fork(\"%s\"): %PI", process->init->name, pid); nxt_debug(task, "fork(\"%s\"): %PI", process->init->name, pid);
process->pid = pid; process->pid = pid;
process->init->port->pid = pid;
nxt_runtime_process_add(rt, process); nxt_runtime_process_add(rt, process);
@@ -70,26 +88,30 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
static void static void
nxt_process_start(nxt_task_t *task, nxt_process_init_t *process) nxt_process_start(nxt_task_t *task, nxt_process_t *process)
{ {
nxt_int_t ret; nxt_int_t ret;
nxt_port_t *port, *master_port;
nxt_thread_t *thread; nxt_thread_t *thread;
nxt_runtime_t *rt; nxt_runtime_t *rt;
nxt_process_init_t *init;
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
const nxt_event_interface_t *interface; const nxt_event_interface_t *interface;
nxt_log(task, NXT_LOG_INFO, "%s started", process->name); init = process->init;
nxt_process_title(task, "nginext: %s", process->name); nxt_log(task, NXT_LOG_INFO, "%s started", init->name);
nxt_process_title(task, "nginext: %s", init->name);
thread = task->thread; thread = task->thread;
nxt_random_init(&thread->random); nxt_random_init(&thread->random);
if (process->user_cred != NULL && getuid() == 0) { if (init->user_cred != NULL && getuid() == 0) {
/* Super-user. */ /* Super-user. */
ret = nxt_user_cred_set(task, process->user_cred); ret = nxt_user_cred_set(task, init->user_cred);
if (ret != NXT_OK) { if (ret != NXT_OK) {
goto fail; goto fail;
} }
@@ -97,15 +119,15 @@ nxt_process_start(nxt_task_t *task, nxt_process_init_t *process)
rt = thread->runtime; rt = thread->runtime;
rt->types |= (1U << process->type); rt->types |= (1U << init->type);
engine = thread->engine; engine = thread->engine;
/* Update inherited master process event engine and signals processing. */ /* Update inherited master process event engine and signals processing. */
engine->signals->sigev = process->signals; engine->signals->sigev = init->signals;
interface = nxt_service_get(rt->services, "engine", rt->engine); interface = nxt_service_get(rt->services, "engine", rt->engine);
if (interface == NULL) { if (nxt_slow_path(interface == NULL)) {
goto fail; goto fail;
} }
@@ -113,25 +135,40 @@ nxt_process_start(nxt_task_t *task, nxt_process_init_t *process)
goto fail; goto fail;
} }
nxt_port_read_close(process->master_port);
nxt_port_write_enable(task, process->master_port);
/* A worker process port. */
nxt_port_write_close(process->port);
nxt_port_create(task, process->port, process->port_handlers);
ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads, ret = nxt_runtime_thread_pool_create(thread, rt, rt->auxiliary_threads,
60000 * 1000000LL); 60000 * 1000000LL);
if (ret != NXT_OK) { if (nxt_slow_path(ret != NXT_OK)) {
goto fail; goto fail;
} }
ret = process->start(task, rt); master_port = rt->port_by_type[NXT_PROCESS_MASTER];
if (ret == NXT_OK) { nxt_port_read_close(master_port);
return; nxt_port_write_enable(task, master_port);
port = nxt_process_port_first(process);
nxt_port_write_close(port);
ret = init->start(task, init->data);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
} }
nxt_port_enable(task, port, init->port_handlers);
ret = nxt_port_socket_write(task, master_port, NXT_PORT_MSG_READY,
-1, init->stream, 0, NULL);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_log(task, NXT_LOG_ERR, "failed to send READY message to master");
goto fail;
}
return;
fail: fail:
exit(1); exit(1);
@@ -527,15 +564,24 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
nxt_port_t * nxt_port_t *
nxt_process_port_new(nxt_process_t *process) nxt_process_port_new(nxt_runtime_t *rt, nxt_process_t *process,
nxt_port_id_t id, nxt_process_type_t type)
{ {
size_t size;
nxt_port_t *port; nxt_port_t *port;
port = nxt_mp_zalloc(process->mem_pool, sizeof(nxt_port_t)); size = sizeof(nxt_port_t);
if (size == NXT_PROCESS_WORKER) {
size += sizeof(nxt_work_t);
}
port = nxt_mp_zalloc(rt->mem_pool, size);
if (nxt_fast_path(port != NULL)) { if (nxt_fast_path(port != NULL)) {
port->id = process->last_port_id++; port->id = id;
port->pid = process->pid; port->pid = process->pid;
port->process = process; port->process = process;
port->type = type;
nxt_process_port_add(process, port); nxt_process_port_add(process, port);
} }
@@ -547,25 +593,45 @@ nxt_process_port_new(nxt_process_t *process)
void void
nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port) nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
{ {
/* TODO lock ports */ nxt_thread_mutex_lock(&process->cp_mutex);
nxt_port_hash_add(&process->connected_ports, process->mem_pool, port); if (process->cp_mem_pool == NULL) {
process->cp_mem_pool = nxt_mp_create(1024, 128, 256, 32);
}
nxt_mp_thread_adopt(process->cp_mem_pool);
nxt_port_hash_add(&process->connected_ports, process->cp_mem_pool, port);
nxt_thread_mutex_unlock(&process->cp_mutex);
} }
void void
nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port) nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
{ {
/* TODO lock ports */ nxt_thread_mutex_lock(&process->cp_mutex);
nxt_port_hash_remove(&process->connected_ports, process->mem_pool, port); if (process->cp_mem_pool != NULL) {
nxt_mp_thread_adopt(process->cp_mem_pool);
nxt_port_hash_remove(&process->connected_ports, process->cp_mem_pool,
port);
}
nxt_thread_mutex_unlock(&process->cp_mutex);
} }
nxt_port_t * nxt_port_t *
nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid, nxt_process_connected_port_find(nxt_process_t *process, nxt_pid_t pid,
nxt_port_id_t port_id) nxt_port_id_t port_id)
{ {
/* TODO lock ports */ nxt_port_t *res;
return nxt_port_hash_find(&process->connected_ports, pid, port_id); nxt_thread_mutex_lock(&process->cp_mutex);
res = nxt_port_hash_find(&process->connected_ports, pid, port_id);
nxt_thread_mutex_unlock(&process->cp_mutex);
return res;
} }

View File

@@ -14,6 +14,8 @@ typedef enum {
NXT_PROCESS_CONTROLLER, NXT_PROCESS_CONTROLLER,
NXT_PROCESS_ROUTER, NXT_PROCESS_ROUTER,
NXT_PROCESS_WORKER, NXT_PROCESS_WORKER,
NXT_PROCESS_MAX,
} nxt_process_type_t; } nxt_process_type_t;
@@ -31,38 +33,41 @@ typedef struct {
} nxt_user_cred_t; } nxt_user_cred_t;
typedef struct nxt_process_init_s nxt_process_init_t; typedef struct nxt_process_init_s nxt_process_init_t;
typedef nxt_int_t (*nxt_process_star_t)(nxt_task_t *task, nxt_runtime_t *rt); typedef nxt_int_t (*nxt_process_start_t)(nxt_task_t *task, void *data);
struct nxt_process_init_s { struct nxt_process_init_s {
nxt_process_star_t start; nxt_process_start_t start;
const char *name; const char *name;
nxt_user_cred_t *user_cred; nxt_user_cred_t *user_cred;
nxt_port_t *port;
nxt_port_t *master_port;
nxt_port_handler_t *port_handlers; nxt_port_handler_t *port_handlers;
const nxt_sig_event_t *signals; const nxt_sig_event_t *signals;
nxt_process_type_t type:8; /* 3 bits */ nxt_process_type_t type;
void *data;
uint32_t stream;
nxt_bool_t restart;
}; };
typedef struct { typedef struct {
nxt_mp_t *mem_pool;
nxt_pid_t pid; nxt_pid_t pid;
nxt_queue_t ports; /* of nxt_port_t */ nxt_queue_t ports; /* of nxt_port_t */
nxt_port_id_t last_port_id; nxt_bool_t ready;
nxt_process_init_t *init; nxt_process_init_t *init;
nxt_thread_mutex_t incoming_mutex; nxt_thread_mutex_t incoming_mutex;
nxt_mp_t *incoming_mp;
nxt_array_t *incoming; /* of nxt_port_mmap_t */ nxt_array_t *incoming; /* of nxt_port_mmap_t */
nxt_thread_mutex_t outgoing_mutex; nxt_thread_mutex_t outgoing_mutex;
nxt_mp_t *outgoing_mp;
nxt_array_t *outgoing; /* of nxt_port_mmap_t */ nxt_array_t *outgoing; /* of nxt_port_mmap_t */
nxt_thread_mutex_t cp_mutex;
nxt_mp_t *cp_mem_pool;
nxt_lvlhsh_t connected_ports; /* of nxt_port_t */ nxt_lvlhsh_t connected_ports; /* of nxt_port_t */
} nxt_process_t; } nxt_process_t;
@@ -77,7 +82,8 @@ NXT_EXPORT void nxt_nanosleep(nxt_nsec_t ns);
NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv, NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv,
char ***orig_envp); char ***orig_envp);
NXT_EXPORT nxt_port_t * nxt_process_port_new(nxt_process_t *process); NXT_EXPORT nxt_port_t * nxt_process_port_new(nxt_runtime_t *rt,
nxt_process_t *process, nxt_port_id_t id, nxt_process_type_t type);
#define nxt_process_port_remove(port) \ #define nxt_process_port_remove(port) \
nxt_queue_remove(&port->link) nxt_queue_remove(&port->link)

View File

@@ -58,7 +58,7 @@ typedef struct {
} nxt_py_error_t; } nxt_py_error_t;
static nxt_int_t nxt_python_init(nxt_task_t *task); static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf);
static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task,
nxt_app_request_t *r, nxt_app_wmsg_t *msg); nxt_app_request_t *r, nxt_app_wmsg_t *msg);
@@ -167,8 +167,6 @@ static PyTypeObject nxt_py_input_type = {
}; };
static char *nxt_py_module;
static PyObject *nxt_py_application; static PyObject *nxt_py_application;
static PyObject *nxt_py_start_resp_obj; static PyObject *nxt_py_start_resp_obj;
static PyObject *nxt_py_environ_ptyp; static PyObject *nxt_py_environ_ptyp;
@@ -181,72 +179,47 @@ static nxt_python_run_ctx_t *nxt_python_run_ctx;
nxt_int_t nxt_int_t
nxt_python_wsgi_init(nxt_thread_t *thr, nxt_runtime_t *rt) nxt_python_wsgi_init(nxt_thread_t *thr, nxt_runtime_t *rt)
{ {
char **argv; nxt_app_modules[NXT_APP_PYTHON] = &nxt_python_module;
char *p;
argv = nxt_process_argv; #if PY_MAJOR_VERSION == 2
nxt_app_modules[NXT_APP_PYTHON2] = &nxt_python_module;
#endif
while (*argv != NULL) { #if PY_MAJOR_VERSION == 3
p = *argv++; nxt_app_modules[NXT_APP_PYTHON3] = &nxt_python_module;
#endif
if (nxt_strcmp(p, "--py-module") == 0) {
if (*argv == NULL) {
nxt_log_emerg(thr->log,
"no argument for option \"--py-module\"");
return NXT_ERROR;
}
nxt_py_module = *argv++;
nxt_log_error(NXT_LOG_INFO, thr->log, "python module: \"%s\"",
nxt_py_module);
break;
}
}
if (nxt_py_module == NULL) {
return NXT_OK;
}
nxt_app = &nxt_python_module;
return NXT_OK; return NXT_OK;
} }
static nxt_int_t static nxt_int_t
nxt_python_init(nxt_task_t *task) nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
{ {
char **argv; char *nxt_py_module;
char *p, *dir;
PyObject *obj, *pypath, *module; PyObject *obj, *pypath, *module;
nxt_python_app_conf_t *c;
c = &conf->u.python;
if (c->module.length == 0) {
nxt_log_emerg(task->log, "python module is empty");
return NXT_ERROR;
}
Py_InitializeEx(0); Py_InitializeEx(0);
obj = NULL; obj = NULL;
module = NULL; module = NULL;
argv = nxt_process_argv;
while (*argv != NULL) { if (c->path.length > 0) {
p = *argv++; obj = PyString_FromStringAndSize((char *) c->path.start,
c->path.length);
if (nxt_strcmp(p, "--py-path") == 0) {
if (*argv == NULL) {
nxt_log_emerg(task->log, "no argument for option \"--py-path\"");
goto fail;
}
dir = *argv++;
nxt_log_error(NXT_LOG_INFO, task->log, "python path \"%s\"", dir);
obj = PyString_FromString((char *) dir);
if (nxt_slow_path(obj == NULL)) { if (nxt_slow_path(obj == NULL)) {
nxt_log_alert(task->log, nxt_log_alert(task->log,
"Python failed create string object \"%s\"", dir); "Python failed create string object \"%V\"",
&c->path);
goto fail; goto fail;
} }
@@ -260,15 +233,13 @@ nxt_python_init(nxt_task_t *task)
if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) { if (nxt_slow_path(PyList_Insert(pypath, 0, obj) != 0)) {
nxt_log_alert(task->log, nxt_log_alert(task->log,
"Python failed to insert \"%s\" into \"sys.path\"", dir); "Python failed to insert \"%V\" into \"sys.path\"",
&c->path);
goto fail; goto fail;
} }
Py_DECREF(obj); Py_DECREF(obj);
obj = NULL; obj = NULL;
continue;
}
} }
obj = PyCFunction_New(nxt_py_start_resp_method, NULL); obj = PyCFunction_New(nxt_py_start_resp_method, NULL);
@@ -303,7 +274,9 @@ nxt_python_init(nxt_task_t *task)
Py_DECREF(obj); Py_DECREF(obj);
// PyOS_AfterFork(); nxt_py_module = nxt_alloca(c->module.length + 1);
nxt_memcpy(nxt_py_module, c->module.start, c->module.length);
nxt_py_module[c->module.length] = '\0';
module = PyImport_ImportModule(nxt_py_module); module = PyImport_ImportModule(nxt_py_module);

View File

@@ -20,6 +20,19 @@ typedef struct {
} nxt_router_listener_conf_t; } nxt_router_listener_conf_t;
typedef struct nxt_start_worker_s nxt_start_worker_t;
struct nxt_start_worker_s {
uint32_t stream;
nxt_app_t *app;
nxt_req_conn_link_t *rc;
nxt_mp_t *mem_pool;
void *joint;
nxt_work_t work;
};
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static nxt_int_t nxt_router_conf_new(nxt_task_t *task, static nxt_int_t nxt_router_conf_new(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end); nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
@@ -87,11 +100,22 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
static void nxt_router_conf_release(nxt_task_t *task, static void nxt_router_conf_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint); nxt_socket_conf_joint_t *joint);
static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app);
static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router,
nxt_start_worker_t *sw);
static nxt_start_worker_t *nxt_router_sw_find_remove(nxt_task_t *task,
nxt_router_t *router, uint32_t id);
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
void *data); void *data);
static void nxt_router_process_http_request(nxt_task_t *task, static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap); nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
static void nxt_router_process_http_request_mp(nxt_task_t *task,
nxt_req_conn_link_t *rc, nxt_mp_t *mp);
static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
@@ -99,13 +123,19 @@ static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data); static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
const char* fmt, ...);
static nxt_router_t *nxt_router; static nxt_router_t *nxt_router;
nxt_int_t nxt_int_t
nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt) nxt_router_start(nxt_task_t *task, void *data)
{ {
nxt_int_t ret; nxt_int_t ret;
nxt_router_t *router; nxt_router_t *router;
nxt_runtime_t *rt;
rt = task->thread->runtime;
ret = nxt_app_http_init(task, rt); ret = nxt_app_http_init(task, rt);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
@@ -127,6 +157,51 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
} }
static void
nxt_router_sw_release(nxt_task_t *task, void *obj, void *data)
{
nxt_start_worker_t *sw;
nxt_socket_conf_joint_t *joint;
sw = obj;
joint = sw->joint;
nxt_debug(task, "sw #%uxD release", sw->stream);
if (nxt_mp_release(sw->mem_pool, sw) == 0) {
nxt_router_conf_release(task, joint);
}
}
void
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_start_worker_t *sw;
nxt_port_new_port_handler(task, msg);
if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) {
return;
}
sw = nxt_router_sw_find_remove(task, nxt_router, msg->port_msg.stream);
if (nxt_fast_path(sw != NULL)) {
msg->new_port->app = sw->app;
nxt_router_app_release_port(task, msg->new_port, sw->app);
sw->work.handler = nxt_router_sw_release;
nxt_debug(task, "post sw #%uxD release to %p", sw->stream,
sw->work.data);
nxt_event_engine_post(sw->work.data, &sw->work);
}
}
void void
nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{ {
@@ -412,7 +487,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_app_t *app, *prev; nxt_app_t *app, *prev;
nxt_app_type_t type; nxt_app_type_t type;
nxt_sockaddr_t *sa; nxt_sockaddr_t *sa;
nxt_queue_link_t *qlk, *nqlk;
nxt_conf_value_t *conf, *http; nxt_conf_value_t *conf, *http;
nxt_conf_value_t *applications, *application; nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener; nxt_conf_value_t *listeners, *listener;
@@ -496,19 +570,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_debug(task, "application type: %V", &apcf.type); nxt_debug(task, "application type: %V", &apcf.type);
nxt_debug(task, "application workers: %D", apcf.workers); nxt_debug(task, "application workers: %D", apcf.workers);
if (nxt_str_eq(&apcf.type, "python", 6)) { type = nxt_app_parse_type(&apcf.type);
type = NXT_APP_PYTHON;
} else if (nxt_str_eq(&apcf.type, "php", 3)) { if (type == NXT_APP_UNKNOWN) {
type = NXT_APP_PHP; nxt_log(task, NXT_LOG_CRIT, "unknown application type: \"%V\"",
&apcf.type);
goto app_fail;
}
} else if (nxt_str_eq(&apcf.type, "ruby", 4)) { if (nxt_app_modules[type] == NULL) {
type = NXT_APP_RUBY;
} else if (nxt_str_eq(&apcf.type, "go", 2)) {
type = NXT_APP_GO;
} else {
nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"", nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"",
&apcf.type); &apcf.type);
goto app_fail; goto app_fail;
@@ -519,10 +589,14 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
goto app_fail; goto app_fail;
} }
nxt_queue_init(&app->ports);
nxt_queue_init(&app->requests);
app->name = name; app->name = name;
app->type = type; app->type = type;
app->max_workers = apcf.workers; app->max_workers = apcf.workers;
app->live = 1; app->live = 1;
app->module = nxt_app_modules[type];
nxt_queue_insert_tail(&tmcf->apps, &app->link); nxt_queue_insert_tail(&tmcf->apps, &app->link);
} }
@@ -607,16 +681,13 @@ app_fail:
fail: fail:
for (qlk = nxt_queue_first(&tmcf->apps); nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
qlk != nxt_queue_tail(&tmcf->apps);
qlk = nqlk)
{
nqlk = nxt_queue_next(qlk);
app = nxt_queue_link_data(qlk, nxt_app_t, link);
nxt_queue_remove(&app->link);
nxt_thread_mutex_destroy(&app->mutex); nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app); nxt_free(app);
}
} nxt_queue_loop;
return NXT_ERROR; return NXT_ERROR;
} }
@@ -626,18 +697,14 @@ static nxt_app_t *
nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name) nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
{ {
nxt_app_t *app; nxt_app_t *app;
nxt_queue_link_t *qlk;
for (qlk = nxt_queue_first(queue); nxt_queue_each(app, queue, nxt_app_t, link) {
qlk != nxt_queue_tail(queue);
qlk = nxt_queue_next(qlk))
{
app = nxt_queue_link_data(qlk, nxt_app_t, link);
if (nxt_strstr_eq(name, &app->name)) { if (nxt_strstr_eq(name, &app->name)) {
return app; return app;
} }
}
} nxt_queue_loop;
return NULL; return NULL;
} }
@@ -1088,8 +1155,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine) nxt_event_engine_t *engine)
{ {
nxt_int_t ret; nxt_int_t ret;
nxt_port_t *port;
nxt_process_t *process;
nxt_thread_link_t *link; nxt_thread_link_t *link;
nxt_thread_handle_t handle; nxt_thread_handle_t handle;
@@ -1107,28 +1172,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_queue_insert_tail(&rt->engines, &engine->link); nxt_queue_insert_tail(&rt->engines, &engine->link);
process = nxt_runtime_process_find(rt, nxt_pid);
if (nxt_slow_path(process == NULL)) {
return NXT_ERROR;
}
port = nxt_process_port_new(process);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
port->engine = 0;
port->type = NXT_PROCESS_ROUTER;
engine->port = port;
nxt_runtime_port_add(rt, port);
ret = nxt_thread_create(&handle, link); ret = nxt_thread_create(&handle, link);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
@@ -1143,19 +1186,13 @@ static void
nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf) nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
{ {
nxt_app_t *app; nxt_app_t *app;
nxt_queue_link_t *qlk, *nqlk;
for (qlk = nxt_queue_first(&router->apps); nxt_queue_each(app, &router->apps, nxt_app_t, link) {
qlk != nxt_queue_tail(&router->apps);
qlk = nqlk)
{
nqlk = nxt_queue_next(qlk);
app = nxt_queue_link_data(qlk, nxt_app_t, link);
nxt_queue_remove(&app->link); nxt_queue_remove(&app->link);
// RELEASE APP // TODO RELEASE APP
} } nxt_queue_loop;
nxt_queue_add(&router->apps, &tmcf->previous); nxt_queue_add(&router->apps, &tmcf->previous);
nxt_queue_add(&router->apps, &tmcf->apps); nxt_queue_add(&router->apps, &tmcf->apps);
@@ -1232,6 +1269,8 @@ static nxt_port_handler_t nxt_router_app_port_handlers[] = {
static void static void
nxt_router_thread_start(void *data) nxt_router_thread_start(void *data)
{ {
nxt_int_t ret;
nxt_port_t *port;
nxt_task_t *task; nxt_task_t *task;
nxt_thread_t *thread; nxt_thread_t *thread;
nxt_thread_link_t *link; nxt_thread_link_t *link;
@@ -1252,13 +1291,27 @@ nxt_router_thread_start(void *data)
thread->task = &engine->task; thread->task = &engine->task;
thread->fiber = &engine->fibers->fiber; thread->fiber = &engine->fibers->fiber;
nxt_mp_thread_adopt(engine->port->mem_pool);
engine->port->socket.task = task;
nxt_port_create(task, engine->port, nxt_router_app_port_handlers);
engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64); engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t));
if (nxt_slow_path(port == NULL)) {
return;
}
port->id = nxt_port_get_next_id();
port->pid = nxt_pid;
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
return;
}
port->type = NXT_PROCESS_ROUTER;
engine->port = port;
nxt_port_enable(task, port, nxt_router_app_port_handlers);
nxt_event_engine_start(engine); nxt_event_engine_start(engine);
} }
@@ -1456,6 +1509,9 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_thread_spin_unlock(lock); nxt_thread_spin_unlock(lock);
/* TODO remove engine->port */
/* TODO excude from connected ports */
if (rtcf != NULL) { if (rtcf != NULL) {
nxt_debug(task, "old router conf is destroyed"); nxt_debug(task, "old router conf is destroyed");
@@ -1473,6 +1529,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
static void static void
nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data) nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
{ {
nxt_port_t *port;
nxt_thread_link_t *link; nxt_thread_link_t *link;
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
nxt_thread_handle_t handle; nxt_thread_handle_t handle;
@@ -1486,13 +1543,27 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_remove(&engine->link); nxt_queue_remove(&engine->link);
port = engine->port;
// TODO notify all apps
if (port->pair[0] != -1) {
nxt_fd_close(port->pair[0]);
}
if (port->pair[1] != -1) {
nxt_fd_close(port->pair[1]);
}
if (port->mem_pool) {
nxt_mp_destroy(port->mem_pool);
}
nxt_mp_destroy(engine->mem_pool); nxt_mp_destroy(engine->mem_pool);
nxt_event_engine_free(engine); nxt_event_engine_free(engine);
nxt_free(link); nxt_free(link);
// TODO: free port
} }
@@ -1616,28 +1687,265 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
} }
} }
nxt_inline const char *
nxt_inline nxt_port_t * nxt_router_text_by_code(int code)
nxt_router_app_port(nxt_task_t *task)
{ {
nxt_port_t *port; switch (code) {
nxt_runtime_t *rt; case 400: return "Bad request";
case 404: return "Not found";
rt = task->thread->runtime; case 403: return "Forbidden";
case 500:
nxt_runtime_port_each(rt, port) { default: return "Internal server error";
}
if (nxt_pid == port->pid) {
continue;
} }
if (port->type == NXT_PROCESS_WORKER) { static void
nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
const char* fmt, ...)
{
va_list args;
nxt_buf_t *b, *last;
const char *msg;
b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0);
if (nxt_slow_path(b == NULL)) {
/* TODO pogorevaTb */
}
b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
"HTTP/1.0 %d %s\r\n"
"Content-Type: text/plain\r\n"
"Connection: close\r\n\r\n",
code, nxt_router_text_by_code(code));
msg = (const char *) b->mem.free;
va_start(args, fmt);
b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
va_end(args);
nxt_log_alert(task->log, "error %d: %s", code, msg);
last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
if (nxt_slow_path(last == NULL)) {
/* TODO pogorevaTb */
}
nxt_buf_chain_add(&b, last);
if (c->write == NULL) {
c->write = b;
c->write_state = &nxt_router_conn_write_state;
nxt_conn_write(task->thread->engine, c);
} else {
nxt_debug(task, "router data attach out bufs to existing chain");
nxt_buf_chain_add(&c->write, b);
}
}
static void
nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_app_t *app;
nxt_port_t *port;
nxt_runtime_t *rt;
nxt_start_worker_t *sw;
sw = obj;
app = sw->app;
nxt_debug(task, "send sw #%uD", sw->stream);
nxt_router_sw_add(task, nxt_router, sw);
nxt_queue_insert_tail(&app->requests, &sw->rc->app_link);
rt = task->thread->runtime;
port = rt->port_by_type[NXT_PROCESS_MASTER];
b = nxt_buf_mem_alloc(port->mem_pool, app->conf.length, 0);
nxt_buf_cpystr(b, &app->conf);
nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, sw->stream, 0, b);
}
static nxt_port_t *
nxt_router_app_get_port(nxt_app_t *app)
{
nxt_port_t *port;
nxt_queue_link_t *lnk;
port = NULL;
nxt_thread_mutex_lock(&app->mutex);
if (!nxt_queue_is_empty(&app->ports)) {
lnk = nxt_queue_first(&app->ports);
nxt_queue_remove(lnk);
lnk->next = NULL;
port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
}
nxt_thread_mutex_unlock(&app->mutex);
return port; return port;
} }
} nxt_runtime_port_loop;
return NULL; static void
nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
{
nxt_app_t *app;
nxt_port_t *port;
nxt_work_t *work;
nxt_queue_link_t *lnk;
nxt_req_conn_link_t *rc;
port = obj;
app = data;
nxt_assert(app != NULL);
nxt_assert(app == port->app);
nxt_assert(port->app_link.next == NULL);
if (task->thread->engine != port->engine) {
work = (nxt_work_t *) (port + 1);
nxt_debug(task, "post release port to engine %p", port->engine);
work->next = NULL;
work->handler = nxt_router_app_release_port;
work->task = port->socket.task;
work->obj = port;
work->data = app;
nxt_event_engine_post(port->engine, work);
return;
}
if (!nxt_queue_is_empty(&app->requests)) {
lnk = nxt_queue_first(&app->requests);
nxt_queue_remove(lnk);
rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link);
nxt_debug(task, "process request #%uxD", rc->req_id);
rc->app_port = port;
nxt_router_process_http_request_mp(task, rc, rc->app_port->mem_pool);
return;
}
nxt_debug(task, "app requests queue is empty");
nxt_thread_mutex_lock(&app->mutex);
nxt_queue_insert_head(&app->ports, &port->app_link);
nxt_thread_mutex_unlock(&app->mutex);
}
void
nxt_router_app_remove_port(nxt_port_t *port)
{
nxt_app_t *app;
if (port->app_link.next == NULL) {
return;
}
app = port->app;
#if (NXT_DEBUG)
if (nxt_slow_path(app == NULL)) {
nxt_abort();
}
#endif
nxt_thread_mutex_lock(&app->mutex);
nxt_queue_remove(&port->app_link);
port->app_link.next = NULL;
nxt_thread_mutex_unlock(&app->mutex);
}
nxt_inline nxt_int_t
nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc)
{
nxt_app_t *app;
nxt_conn_t *c;
nxt_port_t *port, *master_port;
nxt_runtime_t *rt;
nxt_start_worker_t *sw;
nxt_socket_conf_joint_t *joint;
port = NULL;
c = rc->conn;
joint = c->listen->socket.data;
app = joint->socket_conf->application;
if (app == NULL) {
nxt_router_gen_error(task, rc->conn, 500,
"Application is NULL in socket_conf");
return NXT_ERROR;
}
port = nxt_router_app_get_port(app);
if (port != NULL) {
rc->app_port = port;
return NXT_OK;
}
sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t));
if (nxt_slow_path(sw == NULL)) {
nxt_router_gen_error(task, rc->conn, 500,
"Failed to allocate start worker struct");
return NXT_ERROR;
}
nxt_memzero(sw, sizeof(nxt_start_worker_t));
sw->stream = nxt_random(&task->thread->random);
sw->app = app;
sw->rc = rc;
sw->mem_pool = c->mem_pool;
sw->joint = c->listen->socket.data;
sw->work.handler = nxt_router_send_sw_request;
sw->work.task = task;
sw->work.obj = sw;
sw->work.data = task->thread->engine;
rt = task->thread->runtime;
master_port = rt->port_by_type[NXT_PROCESS_MASTER];
nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream,
master_port->engine);
nxt_event_engine_post(master_port->engine, &sw->work);
return NXT_AGAIN;
} }
@@ -1715,13 +2023,12 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
} }
size = c->read->mem.free - c->read->mem.pos; size = c->read->mem.free - c->read->mem.pos;
nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
b->mem.free += size; c->read = nxt_buf_cpy(b, c->read->mem.pos, size);
c->read = b;
} else { } else {
// TODO 500 Too long request headers nxt_router_gen_error(task, c, 400,
nxt_log_alert(task->log, "Too long request headers"); "Too long request headers");
return;
} }
} }
} }
@@ -1735,15 +2042,12 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0); b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0);
if (nxt_slow_path(b == NULL)) { if (nxt_slow_path(b == NULL)) {
// TODO 500 Failed to allocate buffer for request body nxt_router_gen_error(task, c, 500, "Failed to allocate "
nxt_log_alert(task->log, "Failed to allocate buffer for " "buffer for request body");
"request body"); return;
} }
b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, c->read = nxt_buf_cpy(b, c->read->mem.pos, preread);
preread);
c->read = b;
} }
nxt_debug(task, "router request body read again, rest: %uz", nxt_debug(task, "router request body read again, rest: %uz",
@@ -1761,26 +2065,11 @@ static void
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_app_parse_ctx_t *ap) nxt_app_parse_ctx_t *ap)
{ {
nxt_mp_t *port_mp;
nxt_int_t res; nxt_int_t res;
nxt_port_t *port, *c_port;
nxt_req_id_t req_id; nxt_req_id_t req_id;
nxt_app_wmsg_t wmsg;
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
nxt_req_conn_link_t *rc; nxt_req_conn_link_t *rc;
if (nxt_slow_path(nxt_app == NULL)) {
// 500 Application not found
nxt_log_alert(task->log, "application is NULL");
}
port = nxt_router_app_port(task);
if (nxt_slow_path(port == NULL)) {
// 500 Application port not found
nxt_log_alert(task->log, "application port not found");
}
engine = task->thread->engine; engine = task->thread->engine;
do { do {
@@ -1790,8 +2079,10 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
rc = nxt_conn_request_add(c, req_id); rc = nxt_conn_request_add(c, req_id);
if (nxt_slow_path(rc == NULL)) { if (nxt_slow_path(rc == NULL)) {
// 500 Failed to allocate req->conn link nxt_router_gen_error(task, c, 500, "Failed to allocate "
nxt_log_alert(task->log, "failed to allocate req->conn link"); "req->conn link");
return;
} }
nxt_event_engine_request_add(engine, rc); nxt_event_engine_request_add(engine, rc);
@@ -1799,33 +2090,66 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_debug(task, "req_id %uxD linked to conn %p at engine %p", nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
req_id, c, engine); req_id, c, engine);
port_mp = port->mem_pool; rc->reply_port = engine->port;
port->mem_pool = c->mem_pool;
c_port = nxt_process_connected_port_find(port->process, res = nxt_router_app_port(task, rc);
engine->port->pid,
engine->port->id);
if (nxt_slow_path(c_port != engine->port)) {
res = nxt_port_send_port(task, port, engine->port);
if (nxt_slow_path(res != NXT_OK)) { if (res != NXT_OK) {
// 500 Failed to send reply port return;
nxt_log_alert(task->log, "failed to send reply port to application");
} }
nxt_process_connected_port_add(port->process, engine->port); nxt_router_process_http_request_mp(task, rc, c->mem_pool);
}
static void
nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc,
nxt_mp_t *mp)
{
nxt_mp_t *port_mp;
nxt_int_t res;
nxt_port_t *port, *c_port, *reply_port;
nxt_app_wmsg_t wmsg;
nxt_app_parse_ctx_t *ap;
port = rc->app_port;
if (nxt_slow_path(port == NULL)) {
nxt_router_gen_error(task, rc->conn, 500, "Application port not found");
return;
}
reply_port = rc->reply_port;
ap = rc->conn->socket.data;
port_mp = port->mem_pool;
port->mem_pool = mp;
c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
reply_port->id);
if (nxt_slow_path(c_port != reply_port)) {
res = nxt_port_send_port(task, port, reply_port, 0);
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, rc->conn, 500,
"Failed to send reply port to application");
goto fail;
}
nxt_process_connected_port_add(port->process, reply_port);
} }
wmsg.port = port; wmsg.port = port;
wmsg.write = NULL; wmsg.write = NULL;
wmsg.buf = &wmsg.write; wmsg.buf = &wmsg.write;
wmsg.stream = req_id; wmsg.stream = rc->req_id;
res = nxt_app->prepare_msg(task, &ap->r, &wmsg); res = rc->app_port->app->module->prepare_msg(task, &ap->r, &wmsg);
if (nxt_slow_path(res != NXT_OK)) { if (nxt_slow_path(res != NXT_OK)) {
// 500 Failed to prepare message nxt_router_gen_error(task, rc->conn, 500,
nxt_log_alert(task->log, "failed to prepare message for application"); "Failed to prepare message for application");
goto fail;
} }
nxt_debug(task, "about to send %d bytes buffer to worker port %d", nxt_debug(task, "about to send %d bytes buffer to worker port %d",
@@ -1833,13 +2157,15 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
wmsg.port->socket.fd); wmsg.port->socket.fd);
res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA, res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
-1, req_id, engine->port->id, wmsg.write); -1, rc->req_id, reply_port->id, wmsg.write);
if (nxt_slow_path(res != NXT_OK)) { if (nxt_slow_path(res != NXT_OK)) {
// 500 Failed to send message nxt_router_gen_error(task, rc->conn, 500,
nxt_log_alert(task->log, "failed to send message to application"); "Failed to send message to application");
goto fail;
} }
fail:
port->mem_pool = port_mp; port->mem_pool = port_mp;
} }
@@ -1934,6 +2260,12 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id); nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
if (rc->app_port != NULL) {
nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
rc->app_port = NULL;
}
nxt_event_engine_request_remove(task->thread->engine, rc); nxt_event_engine_request_remove(task->thread->engine, rc);
} nxt_queue_loop; } nxt_queue_loop;
@@ -1944,10 +2276,10 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
task = &task->thread->engine->task; task = &task->thread->engine->task;
nxt_mp_release(c->mem_pool, c); if (nxt_mp_release(c->mem_pool, c) == 0) {
nxt_router_conf_release(task, joint); nxt_router_conf_release(task, joint);
} }
}
static void static void
@@ -1992,3 +2324,71 @@ nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
return nxt_value_at(nxt_msec_t, joint->socket_conf, data); return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
} }
static nxt_int_t
nxt_sw_test(nxt_lvlhsh_query_t *lhq, void *data)
{
return NXT_OK;
}
static const nxt_lvlhsh_proto_t lvlhsh_sw_proto nxt_aligned(64) = {
NXT_LVLHSH_DEFAULT,
nxt_sw_test,
nxt_lvlhsh_alloc,
nxt_lvlhsh_free,
};
static void
nxt_router_sw_add(nxt_task_t *task, nxt_router_t *router,
nxt_start_worker_t *sw)
{
nxt_lvlhsh_query_t lhq;
lhq.key_hash = nxt_murmur_hash2(&sw->stream, sizeof(sw->stream));
lhq.key.length = sizeof(sw->stream);
lhq.key.start = (u_char *) &sw->stream;
lhq.proto = &lvlhsh_sw_proto;
lhq.replace = 0;
lhq.value = sw;
lhq.pool = task->thread->runtime->mem_pool;
switch (nxt_lvlhsh_insert(&router->start_workers, &lhq)) {
case NXT_OK:
break;
default:
nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw add failed",
sw->stream);
break;
}
}
static nxt_start_worker_t *
nxt_router_sw_find_remove(nxt_task_t *task, nxt_router_t *router, uint32_t id)
{
nxt_lvlhsh_query_t lhq;
lhq.key_hash = nxt_murmur_hash2(&id, sizeof(id));
lhq.key.length = sizeof(id);
lhq.key.start = (u_char *) &id;
lhq.proto = &lvlhsh_sw_proto;
lhq.pool = task->thread->runtime->mem_pool;
switch (nxt_lvlhsh_delete(&router->start_workers, &lhq)) {
case NXT_OK:
return lhq.value;
default:
nxt_log_error(NXT_LOG_WARN, task->log, "stream %08uxD sw remove failed",
id);
break;
}
return NULL;
}

View File

@@ -20,6 +20,8 @@ typedef struct {
nxt_queue_t sockets; /* of nxt_socket_conf_t */ nxt_queue_t sockets; /* of nxt_socket_conf_t */
nxt_queue_t apps; /* of nxt_app_t */ nxt_queue_t apps; /* of nxt_app_t */
nxt_lvlhsh_t start_workers; /* stream to nxt_start_worker_t */
} nxt_router_t; } nxt_router_t;
@@ -61,9 +63,13 @@ typedef struct {
} nxt_router_temp_conf_t; } nxt_router_temp_conf_t;
typedef struct { typedef struct nxt_app_module_s nxt_app_module_t;
typedef struct nxt_app_s nxt_app_t;
struct nxt_app_s {
nxt_thread_mutex_t mutex; nxt_thread_mutex_t mutex;
nxt_queue_t ports; nxt_queue_t ports;
nxt_queue_t requests; /* of nxt_req_conn_link_t */
nxt_str_t name; nxt_str_t name;
uint32_t workers; uint32_t workers;
@@ -75,7 +81,8 @@ typedef struct {
nxt_queue_link_t link; nxt_queue_link_t link;
nxt_str_t conf; nxt_str_t conf;
} nxt_app_t; nxt_app_module_t *module;
};
typedef struct { typedef struct {
@@ -111,7 +118,9 @@ typedef struct {
} nxt_socket_conf_joint_t; } nxt_socket_conf_joint_t;
void nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_router_app_remove_port(nxt_port_t *port);
#endif /* _NXT_ROUTER_H_INCLUDED_ */ #endif /* _NXT_ROUTER_H_INCLUDED_ */

View File

@@ -9,6 +9,7 @@
#include <nxt_runtime.h> #include <nxt_runtime.h>
#include <nxt_port.h> #include <nxt_port.h>
#include <nxt_master_process.h> #include <nxt_master_process.h>
#include <nxt_router.h>
static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task, static nxt_int_t nxt_runtime_inherited_listen_sockets(nxt_task_t *task,
@@ -1479,16 +1480,34 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
nxt_queue_init(&process->ports); nxt_queue_init(&process->ports);
/* TODO each process should have it's own mem_pool for ports allocation */
process->mem_pool = rt->mem_pool;
nxt_thread_mutex_create(&process->incoming_mutex); nxt_thread_mutex_create(&process->incoming_mutex);
nxt_thread_mutex_create(&process->outgoing_mutex); nxt_thread_mutex_create(&process->outgoing_mutex);
nxt_thread_mutex_create(&process->cp_mutex);
return process; return process;
} }
void
nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
{
nxt_port_mmaps_destroy(process->incoming, 1);
nxt_port_mmaps_destroy(process->outgoing, 1);
if (process->cp_mem_pool != NULL) {
nxt_mp_thread_adopt(process->cp_mem_pool);
nxt_mp_destroy(process->cp_mem_pool);
}
nxt_thread_mutex_destroy(&process->incoming_mutex);
nxt_thread_mutex_destroy(&process->outgoing_mutex);
nxt_thread_mutex_destroy(&process->cp_mutex);
nxt_mp_free(rt->mem_pool, process);
}
static nxt_int_t static nxt_int_t
nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data) nxt_runtime_lvlhsh_pid_test(nxt_lvlhsh_query_t *lhq, void *data)
{ {
@@ -1606,6 +1625,8 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
nxt_process_port_each(process, port) { nxt_process_port_each(process, port) {
port->pid = process->pid;
nxt_runtime_port_add(rt, port); nxt_runtime_port_add(rt, port);
} nxt_process_port_loop; } nxt_process_port_loop;
@@ -1621,9 +1642,7 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
void void
nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process) nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
{ {
uint32_t i;
nxt_port_t *port; nxt_port_t *port;
nxt_port_mmap_t *port_mmap;
nxt_lvlhsh_query_t lhq; nxt_lvlhsh_query_t lhq;
lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid)); lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid));
@@ -1645,35 +1664,8 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
} nxt_process_port_loop; } nxt_process_port_loop;
if (process->incoming) { nxt_runtime_process_destroy(rt, process);
nxt_mp_thread_adopt(process->incoming_mp);
port_mmap = process->incoming->elts;
for (i = 0; i < process->incoming->nelts; i++) {
nxt_port_mmap_destroy(port_mmap);
}
nxt_thread_mutex_destroy(&process->incoming_mutex);
nxt_mp_destroy(process->incoming_mp);
}
if (process->outgoing) {
nxt_mp_thread_adopt(process->outgoing_mp);
port_mmap = process->outgoing->elts;
for (i = 0; i < process->outgoing->nelts; i++) {
nxt_port_mmap_destroy(port_mmap);
}
nxt_thread_mutex_destroy(&process->outgoing_mutex);
nxt_mp_destroy(process->outgoing_mp);
}
nxt_mp_free(rt->mem_pool, process);
break; break;
default: default:
@@ -1704,6 +1696,8 @@ void
nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port) nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port)
{ {
nxt_port_hash_add(&rt->ports, rt->mem_pool, port); nxt_port_hash_add(&rt->ports, rt->mem_pool, port);
rt->port_by_type[port->type] = port;
} }
@@ -1712,6 +1706,10 @@ nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
{ {
nxt_port_hash_remove(&rt->ports, rt->mem_pool, port); nxt_port_hash_remove(&rt->ports, rt->mem_pool, port);
if (rt->port_by_type[port->type] == port) {
rt->port_by_type[port->type] = NULL;
}
if (port->pair[0] != -1) { if (port->pair[0] != -1) {
nxt_fd_close(port->pair[0]); nxt_fd_close(port->pair[0]);
} }
@@ -1720,11 +1718,15 @@ nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
nxt_fd_close(port->pair[1]); nxt_fd_close(port->pair[1]);
} }
if (port->type == NXT_PROCESS_WORKER) {
nxt_router_app_remove_port(port);
}
if (port->mem_pool) { if (port->mem_pool) {
nxt_mp_destroy(port->mem_pool); nxt_mp_destroy(port->mem_pool);
} }
nxt_mp_free(port->process->mem_pool, port); nxt_mp_free(rt->mem_pool, port);
} }

View File

@@ -39,6 +39,7 @@ struct nxt_runtime_s {
size_t nprocesses; size_t nprocesses;
nxt_lvlhsh_t processes; /* of nxt_process_t */ nxt_lvlhsh_t processes; /* of nxt_process_t */
nxt_port_t *port_by_type[NXT_PROCESS_MAX];
nxt_lvlhsh_t ports; /* of nxt_port_t */ nxt_lvlhsh_t ports; /* of nxt_port_t */
nxt_list_t *log_files; /* of nxt_file_t */ nxt_list_t *log_files; /* of nxt_file_t */
@@ -100,6 +101,8 @@ nxt_runtime_is_master(nxt_runtime_t *rt)
nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt); nxt_process_t *nxt_runtime_process_new(nxt_runtime_t *rt);
void nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process);
nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid); nxt_process_t *nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid);
void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process); void nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process);
@@ -150,7 +153,6 @@ void nxt_cdecl nxt_log_time_handler(nxt_uint_t level, nxt_log_t *log,
const char *fmt, ...); const char *fmt, ...);
void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data); void nxt_stream_connection_init(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_app_start(nxt_task_t *task, nxt_runtime_t *rt);
void nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); void nxt_port_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);

View File

@@ -54,7 +54,7 @@ nxt_port_handler_t nxt_app_process_port_handlers[] = {
nxt_port_handler_t nxt_router_process_port_handlers[] = { nxt_port_handler_t nxt_router_process_port_handlers[] = {
nxt_worker_process_quit_handler, nxt_worker_process_quit_handler,
nxt_port_new_port_handler, nxt_router_new_port_handler,
nxt_port_change_log_file_handler, nxt_port_change_log_file_handler,
nxt_port_mmap_handler, nxt_port_mmap_handler,
nxt_router_conf_data_handler, nxt_router_conf_data_handler,