Optimized request<->app link allocation.

Only purpose of request<->app link instance is to be enqueued in application
requests queue.

It is possible to avoid request<->app link allocation from memory pool in
case when spare application port is available.  Instance from local stack
can be used to prepare and send message to application.
This commit is contained in:
Max Romanov
2017-10-04 15:03:03 +03:00
parent 439bf7df11
commit 0faecee609

View File

@@ -302,24 +302,14 @@ nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app)
} }
static nxt_req_app_link_t * nxt_inline void
nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc) nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
nxt_req_conn_link_t *rc)
{ {
nxt_mp_t *mp;
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
nxt_req_app_link_t *ra;
mp = rc->ap->mem_pool;
engine = task->thread->engine; engine = task->thread->engine;
ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
if (nxt_slow_path(ra == NULL)) {
return NULL;
}
nxt_debug(task, "ra stream #%uD create", rc->stream);
nxt_memzero(ra, sizeof(nxt_req_app_link_t)); nxt_memzero(ra, sizeof(nxt_req_app_link_t));
ra->stream = rc->stream; ra->stream = rc->stream;
@@ -327,13 +317,36 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
ra->rc = rc; ra->rc = rc;
rc->ra = ra; rc->ra = ra;
ra->reply_port = engine->port; ra->reply_port = engine->port;
ra->ap = rc->ap;
ra->mem_pool = mp;
ra->work.handler = NULL; ra->work.handler = NULL;
ra->work.task = &engine->task; ra->work.task = &engine->task;
ra->work.obj = ra; ra->work.obj = ra;
ra->work.data = engine; ra->work.data = engine;
}
nxt_inline nxt_req_app_link_t *
nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
{
nxt_mp_t *mp;
nxt_req_app_link_t *ra;
mp = ra_src->ap->mem_pool;
ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
if (nxt_slow_path(ra == NULL)) {
ra_src->rc->ra = NULL;
ra_src->rc = NULL;
return NULL;
}
nxt_router_ra_init(task, ra, ra_src->rc);
ra->mem_pool = mp;
return ra; return ra;
} }
@@ -388,7 +401,9 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
ra->app_port = NULL; ra->app_port = NULL;
} }
if (ra->mem_pool != NULL) {
nxt_mp_release(ra->mem_pool, ra); nxt_mp_release(ra->mem_pool, ra);
}
} }
@@ -435,7 +450,9 @@ nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
ra->app_port = NULL; ra->app_port = NULL;
} }
if (ra->mem_pool != NULL) {
nxt_mp_release(ra->mem_pool, ra); nxt_mp_release(ra->mem_pool, ra);
}
} }
@@ -494,7 +511,9 @@ nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra, int code,
ra->app_port = NULL; ra->app_port = NULL;
} }
if (ra->mem_pool != NULL) {
nxt_mp_release(ra->mem_pool, ra); nxt_mp_release(ra->mem_pool, ra);
}
} }
@@ -2807,14 +2826,17 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
nxt_timer_add(engine, &c->read_timer, app->timeout); nxt_timer_add(engine, &c->read_timer, app->timeout);
} }
can_start_worker = 0;
nxt_thread_mutex_lock(&app->mutex); nxt_thread_mutex_lock(&app->mutex);
if (!nxt_queue_is_empty(&app->ports)) { if (!nxt_queue_is_empty(&app->ports)) {
port = nxt_router_app_get_port_unsafe(app, &use_delta); port = nxt_router_app_get_port_unsafe(app, &use_delta);
can_start_worker = 0;
} else { } else {
ra = nxt_router_ra_create(task, ra);
if (nxt_fast_path(ra != NULL)) {
nxt_queue_insert_tail(&app->requests, &ra->link); nxt_queue_insert_tail(&app->requests, &ra->link);
can_start_worker = (app->workers + app->pending_workers) < can_start_worker = (app->workers + app->pending_workers) <
@@ -2822,12 +2844,19 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
if (can_start_worker) { if (can_start_worker) {
app->pending_workers++; app->pending_workers++;
} }
}
port = NULL; port = NULL;
} }
nxt_thread_mutex_unlock(&app->mutex); nxt_thread_mutex_unlock(&app->mutex);
if (nxt_slow_path(ra == NULL)) {
nxt_router_gen_error(task, c, 500, "Failed to allocate "
"req<->app link");
return NXT_ERROR;
}
if (port != NULL) { if (port != NULL) {
nxt_debug(task, "already have port for app '%V' %p ", &app->name, app); nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
@@ -2839,6 +2868,8 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
return NXT_OK; return NXT_OK;
} }
nxt_debug(task, "ra stream #%uD allocated", ra->stream);
if (!can_start_worker) { if (!can_start_worker) {
nxt_debug(task, "app '%V' %p too many running or pending workers", nxt_debug(task, "app '%V' %p too many running or pending workers",
&app->name, app); &app->name, app);
@@ -3060,7 +3091,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_int_t res; nxt_int_t res;
nxt_port_t *port; nxt_port_t *port;
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
nxt_req_app_link_t *ra; nxt_req_app_link_t ra_local, *ra;
nxt_req_conn_link_t *rc; nxt_req_conn_link_t *rc;
engine = task->thread->engine; engine = task->thread->engine;
@@ -3088,16 +3119,8 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
rc->ap = ap; rc->ap = ap;
c->socket.data = NULL; c->socket.data = NULL;
ra = nxt_router_ra_create(task, rc); ra = &ra_local;
nxt_router_ra_init(task, ra, rc);
if (nxt_slow_path(ra == NULL)) {
nxt_router_gen_error(task, c, 500, "Failed to allocate "
"req<->app link");
return;
}
ra->ap = ap;
res = nxt_router_app_port(task, ra); res = nxt_router_app_port(task, ra);