Port allocation and destroy changed. Worker process stop introduced.

This commit is contained in:
Max Romanov
2017-07-18 00:21:14 +03:00
parent 47b359388c
commit eb675f2d78
12 changed files with 298 additions and 162 deletions

View File

@@ -99,6 +99,7 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
static void nxt_router_conf_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint);
static nxt_bool_t nxt_router_app_free(nxt_app_t *app);
static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app);
static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
void *data);
@@ -189,6 +190,12 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
if (nxt_fast_path(sw != NULL)) {
msg->new_port->app = sw->app;
sw->app->workers++;
nxt_assert(sw->app->pending_workers != 0);
sw->app->pending_workers--;
nxt_router_app_release_port(task, msg->new_port, sw->app);
sw->work.handler = nxt_router_sw_release;
@@ -754,25 +761,25 @@ nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
{
nxt_socket_conf_t *conf;
nxt_socket_conf_t *skcf;
conf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
if (nxt_slow_path(conf == NULL)) {
skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
if (nxt_slow_path(skcf == NULL)) {
return NULL;
}
conf->sockaddr = sa;
skcf->sockaddr = sa;
conf->listen.sockaddr = sa;
conf->listen.socklen = sa->socklen;
conf->listen.address_length = sa->length;
skcf->listen.sockaddr = sa;
skcf->listen.socklen = sa->socklen;
skcf->listen.address_length = sa->length;
conf->listen.socket = -1;
conf->listen.backlog = NXT_LISTEN_BACKLOG;
conf->listen.flags = NXT_NONBLOCK;
conf->listen.read_after_accept = 1;
skcf->listen.socket = -1;
skcf->listen.backlog = NXT_LISTEN_BACKLOG;
skcf->listen.flags = NXT_NONBLOCK;
skcf->listen.read_after_accept = 1;
return conf;
return skcf;
}
@@ -1179,17 +1186,33 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
static void
nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
{
nxt_app_t *app;
nxt_app_t *app;
nxt_port_t *port;
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
nxt_queue_remove(&app->link);
// TODO RELEASE APP
#if 0
nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app);
#endif
app->live = 0;
if (nxt_router_app_free(app) != 0) {
continue;
}
if (nxt_queue_is_empty(&app->requests)) {
do {
port = nxt_router_app_get_port(app);
if (port == NULL) {
break;
}
nxt_port_socket_write(&port->engine->task, port,
NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
} while (1);
}
} nxt_queue_loop;
nxt_queue_add(&router->apps, &tmcf->previous);
@@ -1266,21 +1289,17 @@ nxt_router_thread_start(void *data)
engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
port = nxt_mp_zalloc(engine->mem_pool, sizeof(nxt_port_t));
port = nxt_port_new(nxt_port_get_next_id(), nxt_pid, NXT_PROCESS_ROUTER);
if (nxt_slow_path(port == NULL)) {
return;
}
port->id = nxt_port_get_next_id();
port->pid = nxt_pid;
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_mp_release(port->mem_pool, port);
return;
}
port->type = NXT_PROCESS_ROUTER;
engine->port = port;
nxt_port_enable(task, port, nxt_router_app_port_handlers);
@@ -1391,6 +1410,9 @@ nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
nxt_fd_event_delete(engine, &listen->socket);
nxt_debug(task, "engine %p: listen socket delete: %d", engine,
listen->socket.fd);
listen->timer.handler = nxt_router_listen_socket_close;
listen->timer.work_queue = &engine->fast_work_queue;
@@ -1414,6 +1436,9 @@ nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
joint = listen->socket.data;
nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
listen->socket.fd);
nxt_queue_remove(&listen->link);
/* 'task' refers to listen->task and we cannot use after nxt_free() */
@@ -1439,6 +1464,9 @@ nxt_router_listen_socket_release(nxt_task_t *task,
nxt_thread_spin_lock(lock);
nxt_debug(task, "engine %p: listen socket release: rtsk->count %D",
task->thread->engine, rtsk->count);
if (--rtsk->count != 0) {
rtsk = NULL;
}
@@ -1463,7 +1491,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_router_conf_t *rtcf;
nxt_thread_spinlock_t *lock;
nxt_debug(task, "conf joint count: %D", joint->count);
nxt_debug(task, "conf joint %p count: %D", joint, joint->count);
if (--joint->count != 0) {
return;
@@ -1477,6 +1505,9 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
nxt_thread_spin_lock(lock);
nxt_debug(task, "conf skcf %p: %D, rtcf %p: %D", skcf, skcf->count,
rtcf, rtcf->count);
if (--skcf->count != 0) {
rtcf = NULL;
@@ -1531,18 +1562,10 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
// TODO notify all apps
if (port->pair[0] != -1) {
nxt_fd_close(port->pair[0]);
}
if (port->pair[1] != -1) {
nxt_fd_close(port->pair[1]);
}
if (port->mem_pool) {
nxt_mp_destroy(port->mem_pool);
}
nxt_mp_thread_adopt(port->mem_pool);
nxt_port_release(port);
nxt_mp_thread_adopt(engine->mem_pool);
nxt_mp_destroy(engine->mem_pool);
nxt_event_engine_free(engine);
@@ -1683,17 +1706,17 @@ nxt_router_text_by_code(int code)
}
}
static void
nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
const char* fmt, ...)
{
va_list args;
nxt_buf_t *b, *last;
const char *msg;
b = nxt_buf_mem_alloc(c->mem_pool, 16384, 0);
static nxt_buf_t *
nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
const char* fmt, va_list args)
{
nxt_buf_t *b, *last;
const char *msg;
b = nxt_buf_mem_ts_alloc(task, mp, 16384);
if (nxt_slow_path(b == NULL)) {
/* TODO pogorevaTb */
return NULL;
}
b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
@@ -1704,19 +1727,38 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
msg = (const char *) b->mem.free;
va_start(args, fmt);
b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
va_end(args);
nxt_log_alert(task->log, "error %d: %s", code, msg);
last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
last = nxt_buf_mem_ts_alloc(task, mp, 0);
if (nxt_slow_path(last == NULL)) {
/* TODO pogorevaTb */
nxt_mp_release(mp, b);
return NULL;
}
nxt_buf_set_sync(last);
nxt_buf_set_last(last);
nxt_buf_chain_add(&b, last);
return b;
}
static void
nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
const char* fmt, ...)
{
va_list args;
nxt_buf_t *b;
va_start(args, fmt);
b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
va_end(args);
if (c->write == NULL) {
c->write = b;
c->write_state = &nxt_router_conn_write_state;
@@ -1742,6 +1784,19 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
sw = obj;
app = sw->app;
if (app->workers + app->pending_workers >= app->max_workers) {
sw->work.handler = nxt_router_sw_release;
nxt_debug(task, "%uD/%uD running/penging workers, post sw #%uxD "
"release to %p", sw->stream, sw->work.data);
nxt_event_engine_post(sw->work.data, &sw->work);
return;
}
app->pending_workers++;
nxt_debug(task, "send sw #%uD", sw->stream);
nxt_router_sw_add(task, nxt_router, sw);
@@ -1758,6 +1813,23 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
}
static nxt_bool_t
nxt_router_app_free(nxt_app_t *app)
{
if (app->live == 0 && app->workers == 0 &&
app->pending_workers == 0 &&
nxt_queue_is_empty(&app->requests)) {
nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app);
return 1;
}
return 0;
}
static nxt_port_t *
nxt_router_app_get_port(nxt_app_t *app)
{
@@ -1789,6 +1861,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
nxt_app_t *app;
nxt_port_t *port;
nxt_work_t *work;
nxt_process_t *process;
nxt_queue_link_t *lnk;
nxt_req_conn_link_t *rc;
@@ -1801,7 +1874,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
if (task->thread->engine != port->engine) {
work = (nxt_work_t *) (port + 1);
work = &port->work;
nxt_debug(task, "post release port to engine %p", port->engine);
@@ -1822,7 +1895,8 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link);
nxt_debug(task, "process request #%uxD", rc->req_id);
nxt_debug(task, "app '%V' process next request #%uxD",
&app->name, rc->req_id);
rc->app_port = port;
@@ -1831,7 +1905,37 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
return;
}
nxt_debug(task, "app requests queue is empty");
if (port->pair[1] == -1) {
nxt_debug(task, "app '%V' port already closed (pid %PI dead?)",
&app->name, port->pid);
app->workers--;
nxt_router_app_free(app);
port->app = NULL;
process = port->process;
nxt_port_release(port);
if (nxt_queue_is_empty(&process->ports)) {
nxt_runtime_process_destroy(task->thread->runtime, process);
}
return;
}
if (!app->live) {
nxt_debug(task, "app '%V' is not alive, send QUIT to port",
&app->name);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
-1, 0, 0, NULL);
return;
}
nxt_debug(task, "app '%V' requests queue is empty, keep the port",
&app->name);
nxt_thread_mutex_lock(&app->mutex);
@@ -1841,29 +1945,42 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
}
void
nxt_bool_t
nxt_router_app_remove_port(nxt_port_t *port)
{
nxt_app_t *app;
if (port->app_link.next == NULL) {
return;
}
nxt_app_t *app;
nxt_bool_t busy;
app = port->app;
busy = 1;
#if (NXT_DEBUG)
if (nxt_slow_path(app == NULL)) {
nxt_abort();
if (app == NULL) {
nxt_assert(port->app_link.next == NULL);
return 1;
}
#endif
nxt_thread_mutex_lock(&app->mutex);
nxt_queue_remove(&port->app_link);
port->app_link.next = NULL;
if (port->app_link.next != NULL) {
nxt_queue_remove(&port->app_link);
port->app_link.next = NULL;
busy = 0;
}
nxt_thread_mutex_unlock(&app->mutex);
if (busy == 0) {
app->workers--;
nxt_router_app_free(app);
return 1;
}
return 0;
}
@@ -1894,11 +2011,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc)
port = nxt_router_app_get_port(app);
if (port != NULL) {
nxt_debug(task, "already have port for app '%V'", &app->name);
rc->app_port = port;
return NXT_OK;
}
sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t));
if (nxt_slow_path(sw == NULL)) {
@@ -2069,6 +2187,8 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
return;
}
rc->ap = ap;
nxt_event_engine_request_add(engine, rc);
nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
@@ -2104,7 +2224,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc,
}
reply_port = rc->reply_port;
ap = rc->conn->socket.data;
ap = rc->ap;
port_mp = port->mem_pool;
port->mem_pool = mp;