Request-app link introduced to prevent mp destroy for penging requests.
nxt_req_conn_link_t still used for lookup connection by request id. New nxt_req_app_link_t (ra) allocated from conn->mem_pool using mp_retain(). ra stored in app->requests if there is no free worker to process request.
This commit is contained in:
@@ -189,11 +189,8 @@ typedef struct {
|
|||||||
nxt_req_id_t req_id;
|
nxt_req_id_t req_id;
|
||||||
nxt_conn_t *conn;
|
nxt_conn_t *conn;
|
||||||
nxt_port_t *app_port;
|
nxt_port_t *app_port;
|
||||||
nxt_port_t *reply_port;
|
|
||||||
nxt_app_parse_ctx_t *ap;
|
|
||||||
|
|
||||||
nxt_queue_link_t link; /* for nxt_conn_t.requests */
|
nxt_queue_link_t link; /* for nxt_conn_t.requests */
|
||||||
nxt_queue_link_t app_link; /* for nxt_app_t.requests */
|
|
||||||
} nxt_req_conn_link_t;
|
} nxt_req_conn_link_t;
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -41,6 +41,8 @@ nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type)
|
|||||||
nxt_mp_destroy(mp);
|
nxt_mp_destroy(mp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
|
||||||
|
|
||||||
return port;
|
return port;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -48,6 +50,9 @@ nxt_port_new(nxt_port_id_t id, nxt_pid_t pid, nxt_process_type_t type)
|
|||||||
nxt_bool_t
|
nxt_bool_t
|
||||||
nxt_port_release(nxt_port_t *port)
|
nxt_port_release(nxt_port_t *port)
|
||||||
{
|
{
|
||||||
|
nxt_thread_log_debug("port %p %d:%d release, type %d", port, port->pid,
|
||||||
|
port->id, port->type);
|
||||||
|
|
||||||
if (port->pair[0] != -1) {
|
if (port->pair[0] != -1) {
|
||||||
nxt_fd_close(port->pair[0]);
|
nxt_fd_close(port->pair[0]);
|
||||||
port->pair[0] = -1;
|
port->pair[0] = -1;
|
||||||
|
|||||||
@@ -74,6 +74,7 @@ struct nxt_port_s {
|
|||||||
uint32_t max_size;
|
uint32_t max_size;
|
||||||
/* Maximum interleave of message parts. */
|
/* Maximum interleave of message parts. */
|
||||||
uint32_t max_share;
|
uint32_t max_share;
|
||||||
|
uint32_t app_req_id;
|
||||||
|
|
||||||
nxt_port_handler_t handler;
|
nxt_port_handler_t handler;
|
||||||
nxt_port_handler_t *data;
|
nxt_port_handler_t *data;
|
||||||
|
|||||||
424
src/nxt_router.c
424
src/nxt_router.c
@@ -20,18 +20,33 @@ typedef struct {
|
|||||||
} nxt_router_listener_conf_t;
|
} nxt_router_listener_conf_t;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct nxt_req_app_link_s nxt_req_app_link_t;
|
||||||
typedef struct nxt_start_worker_s nxt_start_worker_t;
|
typedef struct nxt_start_worker_s nxt_start_worker_t;
|
||||||
|
|
||||||
struct nxt_start_worker_s {
|
struct nxt_start_worker_s {
|
||||||
uint32_t stream;
|
uint32_t stream;
|
||||||
nxt_app_t *app;
|
nxt_app_t *app;
|
||||||
nxt_req_conn_link_t *rc;
|
nxt_req_app_link_t *ra;
|
||||||
nxt_mp_t *mem_pool;
|
nxt_mp_t *mem_pool;
|
||||||
|
|
||||||
nxt_work_t work;
|
nxt_work_t work;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
struct nxt_req_app_link_s {
|
||||||
|
nxt_req_id_t req_id;
|
||||||
|
nxt_port_t *app_port;
|
||||||
|
nxt_port_t *reply_port;
|
||||||
|
nxt_app_parse_ctx_t *ap;
|
||||||
|
nxt_req_conn_link_t *rc;
|
||||||
|
|
||||||
|
nxt_queue_link_t link; /* for nxt_app_t.requests */
|
||||||
|
|
||||||
|
nxt_mp_t *mem_pool;
|
||||||
|
nxt_work_t work;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
|
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
|
||||||
static nxt_int_t nxt_router_conf_new(nxt_task_t *task,
|
static nxt_int_t nxt_router_conf_new(nxt_task_t *task,
|
||||||
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
|
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
|
||||||
@@ -98,8 +113,10 @@ static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
|
|||||||
static void nxt_router_conf_release(nxt_task_t *task,
|
static void nxt_router_conf_release(nxt_task_t *task,
|
||||||
nxt_socket_conf_joint_t *joint);
|
nxt_socket_conf_joint_t *joint);
|
||||||
|
|
||||||
static nxt_bool_t nxt_router_app_free(nxt_app_t *app);
|
static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
|
||||||
static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app);
|
void *data);
|
||||||
|
static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
|
||||||
|
static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id);
|
||||||
static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
|
static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
|
||||||
void *data);
|
void *data);
|
||||||
|
|
||||||
@@ -114,7 +131,7 @@ static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
|
|||||||
static void nxt_router_process_http_request(nxt_task_t *task,
|
static void nxt_router_process_http_request(nxt_task_t *task,
|
||||||
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
|
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
|
||||||
static void nxt_router_process_http_request_mp(nxt_task_t *task,
|
static void nxt_router_process_http_request_mp(nxt_task_t *task,
|
||||||
nxt_req_conn_link_t *rc, nxt_mp_t *mp);
|
nxt_req_app_link_t *ra, nxt_port_t *port);
|
||||||
static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
|
static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
|
static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
|
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
|
||||||
@@ -156,12 +173,74 @@ nxt_router_start(nxt_task_t *task, void *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static nxt_start_worker_t *
|
||||||
|
nxt_router_sw_create(nxt_task_t *task, nxt_mp_t *mp, nxt_app_t *app,
|
||||||
|
nxt_req_app_link_t *ra)
|
||||||
|
{
|
||||||
|
nxt_port_t *master_port;
|
||||||
|
nxt_runtime_t *rt;
|
||||||
|
nxt_start_worker_t *sw;
|
||||||
|
|
||||||
|
sw = nxt_mp_retain(mp, sizeof(nxt_start_worker_t));
|
||||||
|
|
||||||
|
if (nxt_slow_path(sw == NULL)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_memzero(sw, sizeof(nxt_start_worker_t));
|
||||||
|
|
||||||
|
sw->stream = nxt_random(&task->thread->random);
|
||||||
|
sw->mem_pool = mp;
|
||||||
|
|
||||||
|
sw->app = app;
|
||||||
|
sw->ra = ra;
|
||||||
|
|
||||||
|
nxt_debug(task, "sw #%uxD create, request #%uD, app '%V' %p", sw->stream,
|
||||||
|
ra->req_id, &app->name, app);
|
||||||
|
|
||||||
|
rt = task->thread->runtime;
|
||||||
|
master_port = rt->port_by_type[NXT_PROCESS_MASTER];
|
||||||
|
|
||||||
|
sw->work.handler = nxt_router_send_sw_request;
|
||||||
|
sw->work.task = &master_port->engine->task;
|
||||||
|
sw->work.obj = sw;
|
||||||
|
sw->work.data = task->thread->engine;
|
||||||
|
sw->work.next = NULL;
|
||||||
|
|
||||||
|
if (task->thread->engine != master_port->engine) {
|
||||||
|
nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream,
|
||||||
|
master_port->engine);
|
||||||
|
|
||||||
|
nxt_event_engine_post(master_port->engine, &sw->work);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
nxt_router_send_sw_request(task, sw, sw->work.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
return sw;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
nxt_router_sw_release(nxt_task_t *task, void *obj, void *data)
|
nxt_router_sw_release(nxt_task_t *task, void *obj, void *data)
|
||||||
{
|
{
|
||||||
|
nxt_event_engine_t *engine;
|
||||||
nxt_start_worker_t *sw;
|
nxt_start_worker_t *sw;
|
||||||
|
|
||||||
sw = obj;
|
sw = obj;
|
||||||
|
engine = data;
|
||||||
|
|
||||||
|
if (task->thread->engine != engine) {
|
||||||
|
sw->work.handler = nxt_router_sw_release;
|
||||||
|
sw->work.task = &engine->task;
|
||||||
|
sw->work.next = NULL;
|
||||||
|
|
||||||
|
nxt_debug(task, "sw #%uxD post release to %p", sw->stream, engine);
|
||||||
|
|
||||||
|
nxt_event_engine_post(engine, &sw->work);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
nxt_debug(task, "sw #%uxD release", sw->stream);
|
nxt_debug(task, "sw #%uxD release", sw->stream);
|
||||||
|
|
||||||
@@ -169,11 +248,80 @@ nxt_router_sw_release(nxt_task_t *task, void *obj, void *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static nxt_req_app_link_t *
|
||||||
|
nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
|
||||||
|
{
|
||||||
|
nxt_mp_t *mp;
|
||||||
|
nxt_req_app_link_t *ra;
|
||||||
|
|
||||||
|
mp = rc->conn->mem_pool;
|
||||||
|
|
||||||
|
ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
|
||||||
|
|
||||||
|
if (nxt_slow_path(ra == NULL)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_debug(task, "ra #%uxD create", ra->req_id);
|
||||||
|
|
||||||
|
nxt_memzero(ra, sizeof(nxt_req_app_link_t));
|
||||||
|
|
||||||
|
ra->req_id = rc->req_id;
|
||||||
|
ra->app_port = NULL;
|
||||||
|
ra->rc = rc;
|
||||||
|
|
||||||
|
ra->mem_pool = mp;
|
||||||
|
|
||||||
|
ra->work.handler = NULL;
|
||||||
|
ra->work.task = &task->thread->engine->task;
|
||||||
|
ra->work.obj = ra;
|
||||||
|
ra->work.data = task->thread->engine;
|
||||||
|
|
||||||
|
return ra;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
|
||||||
|
{
|
||||||
|
nxt_req_app_link_t *ra;
|
||||||
|
nxt_event_engine_t *engine;
|
||||||
|
|
||||||
|
ra = obj;
|
||||||
|
engine = data;
|
||||||
|
|
||||||
|
if (task->thread->engine != engine) {
|
||||||
|
ra->work.handler = nxt_router_ra_release;
|
||||||
|
ra->work.task = &engine->task;
|
||||||
|
ra->work.next = NULL;
|
||||||
|
|
||||||
|
nxt_debug(task, "ra #%uxD post release to %p", ra->req_id, engine);
|
||||||
|
|
||||||
|
nxt_event_engine_post(engine, &ra->work);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_debug(task, "ra #%uxD release", ra->req_id);
|
||||||
|
|
||||||
|
if (ra->app_port != NULL) {
|
||||||
|
|
||||||
|
if (ra->rc->conn != NULL) {
|
||||||
|
ra->rc->app_port = ra->app_port;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_mp_release(ra->mem_pool, ra);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||||
{
|
{
|
||||||
nxt_start_worker_t *sw;
|
nxt_start_worker_t *sw;
|
||||||
nxt_event_engine_t *engine;
|
|
||||||
|
|
||||||
nxt_port_new_port_handler(task, msg);
|
nxt_port_new_port_handler(task, msg);
|
||||||
|
|
||||||
@@ -186,23 +334,16 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
if (nxt_fast_path(sw != NULL)) {
|
if (nxt_fast_path(sw != NULL)) {
|
||||||
msg->new_port->app = sw->app;
|
msg->new_port->app = sw->app;
|
||||||
|
|
||||||
sw->app->workers++;
|
|
||||||
|
|
||||||
nxt_assert(sw->app->pending_workers != 0);
|
nxt_assert(sw->app->pending_workers != 0);
|
||||||
|
|
||||||
sw->app->pending_workers--;
|
sw->app->pending_workers--;
|
||||||
|
sw->app->workers++;
|
||||||
|
|
||||||
|
nxt_debug(task, "sw #%uxD got port %p", sw->stream, msg->new_port);
|
||||||
|
|
||||||
nxt_router_app_release_port(task, msg->new_port, sw->app);
|
nxt_router_app_release_port(task, msg->new_port, sw->app);
|
||||||
|
|
||||||
engine = sw->work.data;
|
nxt_router_sw_release(task, sw, sw->work.data);
|
||||||
|
|
||||||
sw->work.handler = nxt_router_sw_release;
|
|
||||||
sw->work.task = &engine->task;
|
|
||||||
|
|
||||||
nxt_debug(task, "post sw #%uxD release to %p", sw->stream,
|
|
||||||
sw->work.data);
|
|
||||||
|
|
||||||
nxt_event_engine_post(engine, &sw->work);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1185,33 +1326,40 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
|
|||||||
static void
|
static void
|
||||||
nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
|
nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
|
||||||
{
|
{
|
||||||
nxt_app_t *app;
|
nxt_app_t *app;
|
||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
|
|
||||||
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
|
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
|
||||||
|
|
||||||
nxt_queue_remove(&app->link);
|
nxt_queue_remove(&app->link);
|
||||||
|
|
||||||
|
nxt_thread_log_debug("about to remove app '%V' %p", &app->name, app);
|
||||||
|
|
||||||
app->live = 0;
|
app->live = 0;
|
||||||
|
|
||||||
if (nxt_router_app_free(app) != 0) {
|
if (nxt_router_app_free(NULL, app) != 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nxt_queue_is_empty(&app->requests)) {
|
if (!nxt_queue_is_empty(&app->requests)) {
|
||||||
|
|
||||||
do {
|
|
||||||
port = nxt_router_app_get_port(app);
|
|
||||||
if (port == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
nxt_port_socket_write(&port->engine->task, port,
|
|
||||||
NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
|
|
||||||
} while (1);
|
|
||||||
|
|
||||||
|
nxt_thread_log_debug("app '%V' %p pending requests found",
|
||||||
|
&app->name, app);
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
port = nxt_router_app_get_port(app, 0);
|
||||||
|
if (port == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_thread_log_debug("port %p send quit", port);
|
||||||
|
|
||||||
|
nxt_port_socket_write(&port->engine->task, port,
|
||||||
|
NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
|
||||||
|
} while (1);
|
||||||
|
|
||||||
} nxt_queue_loop;
|
} nxt_queue_loop;
|
||||||
|
|
||||||
nxt_queue_add(&router->apps, &tmcf->previous);
|
nxt_queue_add(&router->apps, &tmcf->previous);
|
||||||
@@ -1677,6 +1825,12 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
nxt_buf_chain_add(&b, last);
|
nxt_buf_chain_add(&b, last);
|
||||||
|
|
||||||
|
if (rc->app_port != NULL) {
|
||||||
|
nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
|
||||||
|
|
||||||
|
rc->app_port = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (b == NULL) {
|
if (b == NULL) {
|
||||||
@@ -1781,31 +1935,28 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
|
|||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_runtime_t *rt;
|
nxt_runtime_t *rt;
|
||||||
nxt_start_worker_t *sw;
|
nxt_start_worker_t *sw;
|
||||||
nxt_event_engine_t *engine;
|
|
||||||
|
|
||||||
sw = obj;
|
sw = obj;
|
||||||
app = sw->app;
|
app = sw->app;
|
||||||
|
|
||||||
|
nxt_queue_insert_tail(&app->requests, &sw->ra->link);
|
||||||
|
|
||||||
if (app->workers + app->pending_workers >= app->max_workers) {
|
if (app->workers + app->pending_workers >= app->max_workers) {
|
||||||
engine = sw->work.data;
|
nxt_debug(task, "app '%V' %p %uD/%uD running/penging workers, "
|
||||||
|
"post sw #%uxD release to %p", &app->name, app,
|
||||||
|
app->workers, app->pending_workers, sw->stream,
|
||||||
|
sw->work.data);
|
||||||
|
|
||||||
sw->work.handler = nxt_router_sw_release;
|
nxt_router_sw_release(task, sw, sw->work.data);
|
||||||
sw->work.task = &engine->task;
|
|
||||||
|
|
||||||
nxt_debug(task, "%uD/%uD running/penging workers, post sw #%uxD "
|
|
||||||
"release to %p", sw->stream, sw->work.data);
|
|
||||||
|
|
||||||
nxt_event_engine_post(engine, &sw->work);
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
app->pending_workers++;
|
app->pending_workers++;
|
||||||
|
|
||||||
nxt_debug(task, "send sw #%uD", sw->stream);
|
nxt_debug(task, "sw #%uxD send", sw->stream);
|
||||||
|
|
||||||
nxt_router_sw_add(task, nxt_router, sw);
|
nxt_router_sw_add(task, nxt_router, sw);
|
||||||
nxt_queue_insert_tail(&app->requests, &sw->rc->app_link);
|
|
||||||
|
|
||||||
rt = task->thread->runtime;
|
rt = task->thread->runtime;
|
||||||
port = rt->port_by_type[NXT_PROCESS_MASTER];
|
port = rt->port_by_type[NXT_PROCESS_MASTER];
|
||||||
@@ -1819,8 +1970,17 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
|
|
||||||
static nxt_bool_t
|
static nxt_bool_t
|
||||||
nxt_router_app_free(nxt_app_t *app)
|
nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
|
||||||
{
|
{
|
||||||
|
nxt_port_t *master_port;
|
||||||
|
nxt_runtime_t *rt;
|
||||||
|
nxt_queue_link_t *lnk;
|
||||||
|
nxt_req_app_link_t *ra;
|
||||||
|
|
||||||
|
nxt_thread_log_debug("app '%V' %p state: %d/%uD/%uD/%d", &app->name, app,
|
||||||
|
app->live, app->workers, app->pending_workers,
|
||||||
|
nxt_queue_is_empty(&app->requests));
|
||||||
|
|
||||||
if (app->live == 0 && app->workers == 0 &&
|
if (app->live == 0 && app->workers == 0 &&
|
||||||
app->pending_workers == 0 &&
|
app->pending_workers == 0 &&
|
||||||
nxt_queue_is_empty(&app->requests)) {
|
nxt_queue_is_empty(&app->requests)) {
|
||||||
@@ -1831,12 +1991,26 @@ nxt_router_app_free(nxt_app_t *app)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (app->live == 1 && nxt_queue_is_empty(&app->requests) == 0 &&
|
||||||
|
(app->workers + app->pending_workers < app->max_workers)) {
|
||||||
|
|
||||||
|
lnk = nxt_queue_first(&app->requests);
|
||||||
|
nxt_queue_remove(lnk);
|
||||||
|
|
||||||
|
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
|
||||||
|
|
||||||
|
rt = task->thread->runtime;
|
||||||
|
master_port = rt->port_by_type[NXT_PROCESS_MASTER];
|
||||||
|
|
||||||
|
nxt_router_sw_create(task, master_port->mem_pool, app, ra);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static nxt_port_t *
|
static nxt_port_t *
|
||||||
nxt_router_app_get_port(nxt_app_t *app)
|
nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id)
|
||||||
{
|
{
|
||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_queue_link_t *lnk;
|
nxt_queue_link_t *lnk;
|
||||||
@@ -1852,6 +2026,8 @@ nxt_router_app_get_port(nxt_app_t *app)
|
|||||||
lnk->next = NULL;
|
lnk->next = NULL;
|
||||||
|
|
||||||
port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
|
port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
|
||||||
|
|
||||||
|
port->app_req_id = req_id;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_thread_mutex_unlock(&app->mutex);
|
nxt_thread_mutex_unlock(&app->mutex);
|
||||||
@@ -1867,7 +2043,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
|
|||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_work_t *work;
|
nxt_work_t *work;
|
||||||
nxt_queue_link_t *lnk;
|
nxt_queue_link_t *lnk;
|
||||||
nxt_req_conn_link_t *rc;
|
nxt_req_app_link_t *ra;
|
||||||
|
|
||||||
port = obj;
|
port = obj;
|
||||||
app = data;
|
app = data;
|
||||||
@@ -1897,24 +2073,28 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
|
|||||||
lnk = nxt_queue_first(&app->requests);
|
lnk = nxt_queue_first(&app->requests);
|
||||||
nxt_queue_remove(lnk);
|
nxt_queue_remove(lnk);
|
||||||
|
|
||||||
rc = nxt_queue_link_data(lnk, nxt_req_conn_link_t, app_link);
|
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
|
||||||
|
|
||||||
nxt_debug(task, "app '%V' process next request #%uxD",
|
nxt_debug(task, "app '%V' %p process next request #%uxD",
|
||||||
&app->name, rc->req_id);
|
&app->name, app, ra->req_id);
|
||||||
|
|
||||||
rc->app_port = port;
|
ra->app_port = port;
|
||||||
|
|
||||||
nxt_router_process_http_request_mp(task, rc, rc->app_port->mem_pool);
|
nxt_router_process_http_request_mp(task, ra, port);
|
||||||
|
|
||||||
|
nxt_router_ra_release(task, ra, ra->work.data);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
port->app_req_id = 0;
|
||||||
|
|
||||||
if (port->pair[1] == -1) {
|
if (port->pair[1] == -1) {
|
||||||
nxt_debug(task, "app '%V' port already closed (pid %PI dead?)",
|
nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
|
||||||
&app->name, port->pid);
|
&app->name, app, port->pid);
|
||||||
|
|
||||||
app->workers--;
|
app->workers--;
|
||||||
nxt_router_app_free(app);
|
nxt_router_app_free(task, app);
|
||||||
|
|
||||||
port->app = NULL;
|
port->app = NULL;
|
||||||
|
|
||||||
@@ -1924,8 +2104,8 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (!app->live) {
|
if (!app->live) {
|
||||||
nxt_debug(task, "app '%V' is not alive, send QUIT to port",
|
nxt_debug(task, "app '%V' %p is not alive, send QUIT to port",
|
||||||
&app->name);
|
&app->name, app);
|
||||||
|
|
||||||
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
|
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT,
|
||||||
-1, 0, 0, NULL);
|
-1, 0, 0, NULL);
|
||||||
@@ -1933,8 +2113,8 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_debug(task, "app '%V' requests queue is empty, keep the port",
|
nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
|
||||||
&app->name);
|
&app->name, app);
|
||||||
|
|
||||||
nxt_thread_mutex_lock(&app->mutex);
|
nxt_thread_mutex_lock(&app->mutex);
|
||||||
|
|
||||||
@@ -1951,9 +2131,11 @@ nxt_router_app_remove_port(nxt_port_t *port)
|
|||||||
nxt_bool_t busy;
|
nxt_bool_t busy;
|
||||||
|
|
||||||
app = port->app;
|
app = port->app;
|
||||||
busy = 1;
|
busy = port->app_req_id != 0;
|
||||||
|
|
||||||
if (app == NULL) {
|
if (app == NULL) {
|
||||||
|
nxt_thread_log_debug("port %p app remove, no app", port);
|
||||||
|
|
||||||
nxt_assert(port->app_link.next == NULL);
|
nxt_assert(port->app_link.next == NULL);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
@@ -1966,84 +2148,66 @@ nxt_router_app_remove_port(nxt_port_t *port)
|
|||||||
nxt_queue_remove(&port->app_link);
|
nxt_queue_remove(&port->app_link);
|
||||||
port->app_link.next = NULL;
|
port->app_link.next = NULL;
|
||||||
|
|
||||||
busy = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_thread_mutex_unlock(&app->mutex);
|
nxt_thread_mutex_unlock(&app->mutex);
|
||||||
|
|
||||||
if (busy == 0) {
|
if (busy == 0) {
|
||||||
|
nxt_thread_log_debug("port %p app remove, free, app '%V' %p", port,
|
||||||
|
&app->name, app);
|
||||||
|
|
||||||
app->workers--;
|
app->workers--;
|
||||||
nxt_router_app_free(app);
|
nxt_router_app_free(&port->engine->task, app);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, req #%uxD",
|
||||||
|
port, &app->name, app, port->app_req_id);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
nxt_inline nxt_int_t
|
static nxt_int_t
|
||||||
nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc)
|
nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
|
||||||
{
|
{
|
||||||
nxt_app_t *app;
|
nxt_app_t *app;
|
||||||
nxt_conn_t *c;
|
nxt_conn_t *c;
|
||||||
nxt_port_t *port, *master_port;
|
nxt_port_t *port;
|
||||||
nxt_runtime_t *rt;
|
|
||||||
nxt_start_worker_t *sw;
|
nxt_start_worker_t *sw;
|
||||||
nxt_socket_conf_joint_t *joint;
|
nxt_socket_conf_joint_t *joint;
|
||||||
|
|
||||||
port = NULL;
|
port = NULL;
|
||||||
c = rc->conn;
|
c = ra->rc->conn;
|
||||||
|
|
||||||
joint = c->listen->socket.data;
|
joint = c->listen->socket.data;
|
||||||
app = joint->socket_conf->application;
|
app = joint->socket_conf->application;
|
||||||
|
|
||||||
|
|
||||||
if (app == NULL) {
|
if (app == NULL) {
|
||||||
nxt_router_gen_error(task, rc->conn, 500,
|
nxt_router_gen_error(task, c, 500,
|
||||||
"Application is NULL in socket_conf");
|
"Application is NULL in socket_conf");
|
||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
port = nxt_router_app_get_port(app);
|
port = nxt_router_app_get_port(app, ra->req_id);
|
||||||
|
|
||||||
if (port != NULL) {
|
if (port != NULL) {
|
||||||
nxt_debug(task, "already have port for app '%V'", &app->name);
|
nxt_debug(task, "already have port for app '%V'", &app->name);
|
||||||
|
|
||||||
rc->app_port = port;
|
ra->app_port = port;
|
||||||
return NXT_OK;
|
return NXT_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
sw = nxt_mp_retain(c->mem_pool, sizeof(nxt_start_worker_t));
|
sw = nxt_router_sw_create(task, c->mem_pool, app, ra);
|
||||||
|
|
||||||
if (nxt_slow_path(sw == NULL)) {
|
if (nxt_slow_path(sw == NULL)) {
|
||||||
nxt_router_gen_error(task, rc->conn, 500,
|
nxt_router_gen_error(task, c, 500,
|
||||||
"Failed to allocate start worker struct");
|
"Failed to allocate start worker struct");
|
||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_memzero(sw, sizeof(nxt_start_worker_t));
|
|
||||||
|
|
||||||
sw->stream = nxt_random(&task->thread->random);
|
|
||||||
sw->app = app;
|
|
||||||
sw->rc = rc;
|
|
||||||
sw->mem_pool = c->mem_pool;
|
|
||||||
|
|
||||||
rt = task->thread->runtime;
|
|
||||||
master_port = rt->port_by_type[NXT_PROCESS_MASTER];
|
|
||||||
|
|
||||||
sw->work.handler = nxt_router_send_sw_request;
|
|
||||||
sw->work.task = &master_port->engine->task;
|
|
||||||
sw->work.obj = sw;
|
|
||||||
sw->work.data = task->thread->engine;
|
|
||||||
|
|
||||||
nxt_debug(task, "post send sw %uxD to master engine %p", sw->stream,
|
|
||||||
master_port->engine);
|
|
||||||
|
|
||||||
nxt_event_engine_post(master_port->engine, &sw->work);
|
|
||||||
|
|
||||||
return NXT_AGAIN;
|
return NXT_AGAIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2164,9 +2328,12 @@ static void
|
|||||||
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
|
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
|
||||||
nxt_app_parse_ctx_t *ap)
|
nxt_app_parse_ctx_t *ap)
|
||||||
{
|
{
|
||||||
|
nxt_mp_t *port_mp;
|
||||||
nxt_int_t res;
|
nxt_int_t res;
|
||||||
|
nxt_port_t *port;
|
||||||
nxt_req_id_t req_id;
|
nxt_req_id_t req_id;
|
||||||
nxt_event_engine_t *engine;
|
nxt_event_engine_t *engine;
|
||||||
|
nxt_req_app_link_t *ra;
|
||||||
nxt_req_conn_link_t *rc;
|
nxt_req_conn_link_t *rc;
|
||||||
|
|
||||||
engine = task->thread->engine;
|
engine = task->thread->engine;
|
||||||
@@ -2184,47 +2351,55 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
rc->ap = ap;
|
|
||||||
|
|
||||||
nxt_event_engine_request_add(engine, rc);
|
nxt_event_engine_request_add(engine, rc);
|
||||||
|
|
||||||
nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
|
nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
|
||||||
req_id, c, engine);
|
req_id, c, engine);
|
||||||
|
|
||||||
rc->reply_port = engine->port;
|
|
||||||
|
|
||||||
res = nxt_router_app_port(task, rc);
|
ra = nxt_router_ra_create(task, rc);
|
||||||
|
|
||||||
|
ra->ap = ap;
|
||||||
|
ra->reply_port = engine->port;
|
||||||
|
|
||||||
|
res = nxt_router_app_port(task, ra);
|
||||||
|
|
||||||
if (res != NXT_OK) {
|
if (res != NXT_OK) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_router_process_http_request_mp(task, rc, c->mem_pool);
|
port = ra->app_port;
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc,
|
|
||||||
nxt_mp_t *mp)
|
|
||||||
{
|
|
||||||
nxt_mp_t *port_mp;
|
|
||||||
nxt_int_t res;
|
|
||||||
nxt_port_t *port, *c_port, *reply_port;
|
|
||||||
nxt_app_wmsg_t wmsg;
|
|
||||||
nxt_app_parse_ctx_t *ap;
|
|
||||||
|
|
||||||
port = rc->app_port;
|
|
||||||
|
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
nxt_router_gen_error(task, rc->conn, 500, "Application port not found");
|
nxt_router_gen_error(task, rc->conn, 500, "Application port not found");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
reply_port = rc->reply_port;
|
|
||||||
ap = rc->ap;
|
|
||||||
|
|
||||||
port_mp = port->mem_pool;
|
port_mp = port->mem_pool;
|
||||||
port->mem_pool = mp;
|
port->mem_pool = c->mem_pool;
|
||||||
|
|
||||||
|
nxt_router_process_http_request_mp(task, ra, port);
|
||||||
|
|
||||||
|
port->mem_pool = port_mp;
|
||||||
|
|
||||||
|
|
||||||
|
nxt_router_ra_release(task, ra, ra->work.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
|
||||||
|
nxt_port_t *port)
|
||||||
|
{
|
||||||
|
nxt_int_t res;
|
||||||
|
nxt_port_t *c_port, *reply_port;
|
||||||
|
nxt_conn_t *c;
|
||||||
|
nxt_app_wmsg_t wmsg;
|
||||||
|
nxt_app_parse_ctx_t *ap;
|
||||||
|
|
||||||
|
reply_port = ra->reply_port;
|
||||||
|
ap = ra->ap;
|
||||||
|
c = ra->rc->conn;
|
||||||
|
|
||||||
c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
|
c_port = nxt_process_connected_port_find(port->process, reply_port->pid,
|
||||||
reply_port->id);
|
reply_port->id);
|
||||||
@@ -2232,9 +2407,9 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc,
|
|||||||
res = nxt_port_send_port(task, port, reply_port, 0);
|
res = nxt_port_send_port(task, port, reply_port, 0);
|
||||||
|
|
||||||
if (nxt_slow_path(res != NXT_OK)) {
|
if (nxt_slow_path(res != NXT_OK)) {
|
||||||
nxt_router_gen_error(task, rc->conn, 500,
|
nxt_router_gen_error(task, c, 500,
|
||||||
"Failed to send reply port to application");
|
"Failed to send reply port to application");
|
||||||
goto fail;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_process_connected_port_add(port->process, reply_port);
|
nxt_process_connected_port_add(port->process, reply_port);
|
||||||
@@ -2243,14 +2418,14 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc,
|
|||||||
wmsg.port = port;
|
wmsg.port = port;
|
||||||
wmsg.write = NULL;
|
wmsg.write = NULL;
|
||||||
wmsg.buf = &wmsg.write;
|
wmsg.buf = &wmsg.write;
|
||||||
wmsg.stream = rc->req_id;
|
wmsg.stream = ra->req_id;
|
||||||
|
|
||||||
res = rc->app_port->app->module->prepare_msg(task, &ap->r, &wmsg);
|
res = port->app->module->prepare_msg(task, &ap->r, &wmsg);
|
||||||
|
|
||||||
if (nxt_slow_path(res != NXT_OK)) {
|
if (nxt_slow_path(res != NXT_OK)) {
|
||||||
nxt_router_gen_error(task, rc->conn, 500,
|
nxt_router_gen_error(task, c, 500,
|
||||||
"Failed to prepare message for application");
|
"Failed to prepare message for application");
|
||||||
goto fail;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_debug(task, "about to send %d bytes buffer to worker port %d",
|
nxt_debug(task, "about to send %d bytes buffer to worker port %d",
|
||||||
@@ -2258,16 +2433,13 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_conn_link_t *rc,
|
|||||||
wmsg.port->socket.fd);
|
wmsg.port->socket.fd);
|
||||||
|
|
||||||
res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
|
res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
|
||||||
-1, rc->req_id, reply_port->id, wmsg.write);
|
-1, ra->req_id, reply_port->id, wmsg.write);
|
||||||
|
|
||||||
if (nxt_slow_path(res != NXT_OK)) {
|
if (nxt_slow_path(res != NXT_OK)) {
|
||||||
nxt_router_gen_error(task, rc->conn, 500,
|
nxt_router_gen_error(task, c, 500,
|
||||||
"Failed to send message to application");
|
"Failed to send message to application");
|
||||||
goto fail;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
fail:
|
|
||||||
port->mem_pool = port_mp;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -2378,6 +2550,8 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
|
|||||||
rc->app_port = NULL;
|
rc->app_port = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rc->conn = NULL;
|
||||||
|
|
||||||
nxt_event_engine_request_remove(task->thread->engine, rc);
|
nxt_event_engine_request_remove(task->thread->engine, rc);
|
||||||
|
|
||||||
} nxt_queue_loop;
|
} nxt_queue_loop;
|
||||||
|
|||||||
Reference in New Issue
Block a user