Fixing application timeout.

Application timeout limits maximum time of worker response in processing
particular request.  Not including the time required to start worker,
time in request queue etc.
This commit is contained in:
Max Romanov
2017-12-27 17:47:18 +03:00
parent ab138c9166
commit bef2ec483e
4 changed files with 368 additions and 113 deletions

View File

@@ -66,6 +66,7 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
nxt_queue_init(&port->messages); nxt_queue_init(&port->messages);
nxt_thread_mutex_create(&port->write_mutex); nxt_thread_mutex_create(&port->write_mutex);
nxt_queue_init(&port->pending_requests);
} else { } else {
nxt_mp_destroy(mp); nxt_mp_destroy(mp);

View File

@@ -168,6 +168,7 @@ struct nxt_port_s {
uint32_t app_pending_responses; uint32_t app_pending_responses;
uint32_t app_responses; uint32_t app_responses;
nxt_queue_t pending_requests;
nxt_port_handler_t handler; nxt_port_handler_t handler;
nxt_port_handler_t *data; nxt_port_handler_t *data;
@@ -265,4 +266,9 @@ nxt_int_t nxt_port_post(nxt_task_t *task, nxt_port_t *port,
nxt_port_post_handler_t handler, void *data); nxt_port_post_handler_t handler, void *data);
void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i); void nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i);
nxt_inline void nxt_port_inc_use(nxt_port_t *port)
{
nxt_atomic_fetch_add(&port->use_count, 1);
}
#endif /* _NXT_PORT_H_INCLUDED_ */ #endif /* _NXT_PORT_H_INCLUDED_ */

View File

@@ -25,6 +25,11 @@ struct nxt_port_rpc_reg_s {
}; };
static void
nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
nxt_port_rpc_reg_t *reg);
static nxt_int_t static nxt_int_t
nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data) nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
{ {
@@ -168,10 +173,17 @@ nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
nxt_assert(reg->data == ex); nxt_assert(reg->data == ex);
if (peer == -1 || reg->peer != -1) { if (nxt_slow_path(peer == reg->peer)) {
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to " return;
"change peer %PI->%PI", reg->stream, reg->peer, peer); }
if (reg->peer != -1) {
nxt_port_rpc_remove_from_peers(task, port, reg);
reg->peer = -1;
}
if (peer == -1) {
return; return;
} }
@@ -359,7 +371,6 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
msg.port_msg.pid = peer; msg.port_msg.pid = peer;
msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID; msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
msg.port_msg.last = 1;
peer_link = lhq.value; peer_link = lhq.value;
last = 0; last = 0;
@@ -375,20 +386,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
nxt_debug(task, "rpc: stream #%uD trigger error", stream); nxt_debug(task, "rpc: stream #%uD trigger error", stream);
msg.port_msg.stream = stream; msg.port_msg.stream = stream;
msg.port_msg.last = 1;
reg->error_handler(task, &msg, reg->data);
nxt_port_rpc_lhq_stream(&lhq, &stream);
lhq.pool = port->mem_pool;
ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_log_error(NXT_LOG_ERR, task->log,
"rpc: stream #%uD failed to delete handler", stream);
return;
}
if (peer_link == peer_link->next) { if (peer_link == peer_link->next) {
nxt_assert(peer_link->prev == peer_link); nxt_assert(peer_link->prev == peer_link);
@@ -405,6 +403,27 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
peer_link = next_link; peer_link = next_link;
} }
reg->peer = -1;
reg->error_handler(task, &msg, reg->data);
/* Reset 'last' flag to preserve rpc handler. */
if (msg.port_msg.last == 0) {
continue;
}
nxt_port_rpc_lhq_stream(&lhq, &stream);
lhq.pool = port->mem_pool;
ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_log_error(NXT_LOG_ERR, task->log,
"rpc: stream #%uD failed to delete handler", stream);
return;
}
nxt_mp_free(port->mem_pool, reg); nxt_mp_free(port->mem_pool, reg);
} }
} }

View File

@@ -48,14 +48,15 @@ typedef struct {
struct nxt_req_app_link_s { struct nxt_req_app_link_s {
uint32_t stream; uint32_t stream;
nxt_atomic_t use_count;
nxt_port_t *app_port; nxt_port_t *app_port;
nxt_pid_t app_pid;
nxt_port_t *reply_port; nxt_port_t *reply_port;
nxt_app_parse_ctx_t *ap; nxt_app_parse_ctx_t *ap;
nxt_msg_info_t msg_info; nxt_msg_info_t msg_info;
nxt_req_conn_link_t *rc; nxt_req_conn_link_t *rc;
nxt_queue_link_t link; /* for nxt_app_t.requests */ nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */
nxt_mp_t *mem_pool; nxt_mp_t *mem_pool;
nxt_work_t work; nxt_work_t work;
@@ -73,6 +74,24 @@ typedef struct {
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app); static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
nxt_inline void
nxt_router_ra_inc_use(nxt_req_app_link_t *ra)
{
nxt_atomic_fetch_add(&ra->use_count, 1);
}
nxt_inline void
nxt_router_ra_dec_use(nxt_req_app_link_t *ra)
{
int c;
c = nxt_atomic_fetch_add(&ra->use_count, -1);
nxt_assert(c > 1);
}
static void nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i);
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task); static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data); static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conf_ready(nxt_task_t *task, static void nxt_router_conf_ready(nxt_task_t *task,
@@ -153,6 +172,7 @@ static void nxt_router_app_port_error(nxt_task_t *task,
static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app); static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response); uint32_t request_failed, uint32_t got_response);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra);
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
@@ -162,7 +182,7 @@ static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
void *data); void *data);
static void nxt_router_process_http_request(nxt_task_t *task, static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap); nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
static void nxt_router_process_http_request_mp(nxt_task_t *task, static void nxt_router_app_prepare_request(nxt_task_t *task,
nxt_req_app_link_t *ra); nxt_req_app_link_t *ra);
static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg); nxt_app_wmsg_t *wmsg);
@@ -319,7 +339,7 @@ nxt_router_ra_init(nxt_task_t *task, nxt_req_app_link_t *ra,
nxt_memzero(ra, sizeof(nxt_req_app_link_t)); nxt_memzero(ra, sizeof(nxt_req_app_link_t));
ra->stream = rc->stream; ra->stream = rc->stream;
ra->app_pid = -1; ra->use_count = 1;
ra->rc = rc; ra->rc = rc;
rc->ra = ra; rc->ra = ra;
ra->reply_port = engine->port; ra->reply_port = engine->port;
@@ -338,6 +358,10 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_app_link_t *ra_src)
nxt_mp_t *mp; nxt_mp_t *mp;
nxt_req_app_link_t *ra; nxt_req_app_link_t *ra;
if (ra_src->mem_pool != NULL) {
return ra_src;
}
mp = ra_src->ap->mem_pool; mp = ra_src->ap->mem_pool;
ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t)); ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
@@ -394,25 +418,38 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
static void static void
nxt_router_ra_release(nxt_task_t *task, void *obj, void *data) nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra);
static void
nxt_router_ra_update_peer_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_req_app_link_t *ra;
ra = obj;
nxt_router_ra_update_peer(task, ra);
nxt_router_ra_use(task, ra, -1);
}
static void
nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
{ {
nxt_req_app_link_t *ra;
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
nxt_req_conn_link_t *rc; nxt_req_conn_link_t *rc;
ra = obj; engine = ra->work.data;
engine = data;
if (task->thread->engine != engine) { if (task->thread->engine != engine) {
if (ra->app_port != NULL) { nxt_router_ra_inc_use(ra);
ra->app_pid = ra->app_port->pid;
}
ra->work.handler = nxt_router_ra_release; ra->work.handler = nxt_router_ra_update_peer_handler;
ra->work.task = &engine->task; ra->work.task = &engine->task;
ra->work.next = NULL; ra->work.next = NULL;
nxt_debug(task, "ra stream #%uD post release to %p", nxt_debug(task, "ra stream #%uD post update peer to %p",
ra->stream, engine); ra->stream, engine);
nxt_event_engine_post(engine, &ra->work); nxt_event_engine_post(engine, &ra->work);
@@ -420,22 +457,47 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
return; return;
} }
nxt_debug(task, "ra stream #%uD update peer", ra->stream);
rc = ra->rc;
if (rc != NULL && ra->app_port != NULL) {
nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_port->pid);
}
nxt_router_ra_use(task, ra, -1);
}
static void
nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
{
nxt_conn_t *c;
nxt_req_conn_link_t *rc;
nxt_assert(task->thread->engine == ra->work.data);
nxt_assert(ra->use_count == 0);
nxt_debug(task, "ra stream #%uD release", ra->stream); nxt_debug(task, "ra stream #%uD release", ra->stream);
rc = ra->rc; rc = ra->rc;
if (rc != NULL) { if (rc != NULL) {
if (ra->app_pid != -1) { c = rc->conn;
nxt_port_rpc_ex_set_peer(task, engine->port, rc, ra->app_pid);
}
if (nxt_slow_path(ra->err_code != 0)) { if (nxt_slow_path(ra->err_code != 0)) {
nxt_router_gen_error(task, rc->conn, ra->err_code, ra->err_str); nxt_router_gen_error(task, c, ra->err_code, ra->err_str);
} else { } else {
rc->app_port = ra->app_port; rc->app_port = ra->app_port;
rc->msg_info = ra->msg_info; rc->msg_info = ra->msg_info;
if (rc->app->timeout != 0) {
c->read_timer.handler = nxt_router_app_timeout;
nxt_timer_add(task->thread->engine, &c->read_timer,
rc->app->timeout);
}
ra->app_port = NULL; ra->app_port = NULL;
ra->msg_info.buf = NULL; ra->msg_info.buf = NULL;
} }
@@ -458,6 +520,52 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
} }
static void
nxt_router_ra_release_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_req_app_link_t *ra;
ra = obj;
nxt_assert(ra->work.data == data);
nxt_atomic_fetch_add(&ra->use_count, -1);
nxt_router_ra_release(task, ra);
}
static void
nxt_router_ra_use(nxt_task_t *task, nxt_req_app_link_t *ra, int i)
{
int c;
nxt_event_engine_t *engine;
c = nxt_atomic_fetch_add(&ra->use_count, i);
if (i < 0 && c == -i) {
engine = ra->work.data;
if (task->thread->engine == engine) {
nxt_router_ra_release(task, ra);
return;
}
nxt_router_ra_inc_use(ra);
ra->work.handler = nxt_router_ra_release_handler;
ra->work.task = &engine->task;
ra->work.next = NULL;
nxt_debug(task, "ra stream #%uD post release to %p",
ra->stream, engine);
nxt_event_engine_post(engine, &ra->work);
}
}
nxt_inline void nxt_inline void
nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str) nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
{ {
@@ -467,9 +575,25 @@ nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
} }
nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t *lnk)
{
if (lnk->next != NULL) {
nxt_queue_remove(lnk);
lnk->next = NULL;
return 1;
}
return 0;
}
nxt_inline void nxt_inline void
nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc) nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
{ {
int ra_use_delta;
nxt_req_app_link_t *ra; nxt_req_app_link_t *ra;
if (rc->app_port != NULL) { if (rc->app_port != NULL) {
@@ -486,22 +610,25 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
rc->ra = NULL; rc->ra = NULL;
ra->rc = NULL; ra->rc = NULL;
ra_use_delta = 0;
nxt_thread_mutex_lock(&rc->app->mutex); nxt_thread_mutex_lock(&rc->app->mutex);
if (ra->link.next != NULL) { if (ra->link_app_requests.next == NULL
nxt_queue_remove(&ra->link); && ra->link_port_pending.next == NULL)
{
ra->link.next = NULL; ra = NULL;
} else { } else {
ra = NULL; ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
} }
nxt_thread_mutex_unlock(&rc->app->mutex); nxt_thread_mutex_unlock(&rc->app->mutex);
}
if (ra != NULL) { if (ra != NULL) {
nxt_router_ra_release(task, ra, ra->work.data); nxt_router_ra_use(task, ra, ra_use_delta);
}
} }
if (rc->app != NULL) { if (rc->app != NULL) {
@@ -2266,6 +2393,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
size_t dump_size; size_t dump_size;
nxt_buf_t *b, *last; nxt_buf_t *b, *last;
nxt_conn_t *c; nxt_conn_t *c;
nxt_event_engine_t *engine;
nxt_req_conn_link_t *rc; nxt_req_conn_link_t *rc;
b = msg->buf; b = msg->buf;
@@ -2287,6 +2415,10 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
b = NULL; b = NULL;
} }
engine = task->thread->engine;
nxt_timer_disable(engine, &c->read_timer);
if (msg->port_msg.last != 0) { if (msg->port_msg.last != 0) {
nxt_debug(task, "router data create last buf"); nxt_debug(task, "router data create last buf");
@@ -2298,6 +2430,12 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_buf_chain_add(&b, last); nxt_buf_chain_add(&b, last);
nxt_router_rc_unlink(task, rc); nxt_router_rc_unlink(task, rc);
} else {
if (rc->app->timeout != 0) {
c->read_timer.handler = nxt_router_app_timeout;
nxt_timer_add(engine, &c->read_timer, rc->app->timeout);
}
} }
if (b == NULL) { if (b == NULL) {
@@ -2327,10 +2465,41 @@ static void
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data) void *data)
{ {
nxt_int_t res;
nxt_port_t *port;
nxt_bool_t cancelled;
nxt_req_app_link_t *ra;
nxt_req_conn_link_t *rc; nxt_req_conn_link_t *rc;
rc = data; rc = data;
ra = rc->ra;
if (ra != NULL) {
cancelled = nxt_router_msg_cancel(task, &ra->msg_info, ra->stream);
if (cancelled) {
nxt_router_ra_inc_use(ra);
res = nxt_router_app_port(task, ra);
if (res == NXT_OK) {
port = ra->app_port;
nxt_assert(port != NULL);
nxt_port_rpc_ex_set_peer(task, task->thread->engine->port, rc,
port->pid);
nxt_router_app_prepare_request(task, ra);
}
msg->port_msg.last = 0;
return;
}
}
nxt_router_gen_error(task, rc->conn, 500, nxt_router_gen_error(task, rc->conn, 500,
"Application terminated unexpectedly"); "Application terminated unexpectedly");
@@ -2478,7 +2647,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_queue_remove(lnk); nxt_queue_remove(lnk);
lnk->next = NULL; lnk->next = NULL;
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
} else { } else {
ra = NULL; ra = NULL;
@@ -2491,7 +2660,7 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
&app->name, app, ra->stream); &app->name, app, ra->stream);
nxt_router_ra_error(ra, 500, "Failed to start application worker"); nxt_router_ra_error(ra, 500, "Failed to start application worker");
nxt_router_ra_release(task, ra, ra->work.data); nxt_router_ra_use(task, ra, -1);
} }
nxt_router_app_use(task, app, -1); nxt_router_app_use(task, app, -1);
@@ -2533,7 +2702,7 @@ nxt_router_app_first_port_busy(nxt_app_t *app)
nxt_inline nxt_port_t * nxt_inline nxt_port_t *
nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta) nxt_router_app_get_port_unsafe(nxt_app_t *app)
{ {
nxt_port_t *port; nxt_port_t *port;
nxt_queue_link_t *lnk; nxt_queue_link_t *lnk;
@@ -2550,10 +2719,10 @@ nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
{ {
nxt_queue_insert_tail(&app->ports, lnk); nxt_queue_insert_tail(&app->ports, lnk);
nxt_port_inc_use(port);
} else { } else {
lnk->next = NULL; lnk->next = NULL;
(*use_delta)--;
} }
return port; return port;
@@ -2606,7 +2775,7 @@ nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "app '%V' %p process next stream #%uD", nxt_debug(task, "app '%V' %p process next stream #%uD",
&app->name, app, ra->stream); &app->name, app, ra->stream);
nxt_router_process_http_request_mp(task, ra); nxt_router_app_prepare_request(task, ra);
} }
@@ -2614,19 +2783,16 @@ static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port, nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response) uint32_t request_failed, uint32_t got_response)
{ {
int use_delta, ra_use_delta;
nxt_app_t *app; nxt_app_t *app;
nxt_bool_t send_quit; nxt_bool_t send_quit;
nxt_queue_link_t *lnk; nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra; nxt_req_app_link_t *ra, *next_ra;
nxt_assert(port != NULL); nxt_assert(port != NULL);
nxt_assert(port->app != NULL); nxt_assert(port->app != NULL);
app = port->app; app = port->app;
use_delta = (request_failed == 0 && got_response == 0) ? 0 : -1;
nxt_thread_mutex_lock(&app->mutex); nxt_thread_mutex_lock(&app->mutex);
port->app_pending_responses -= request_failed + got_response; port->app_pending_responses -= request_failed + got_response;
@@ -2645,7 +2811,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_queue_insert_head(&app->ports, &port->app_link); nxt_queue_insert_head(&app->ports, &port->app_link);
} }
use_delta++; nxt_port_inc_use(port);
} else { } else {
if (port->app_pending_responses == 0 if (port->app_pending_responses == 0
@@ -2665,21 +2831,46 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_queue_remove(lnk); nxt_queue_remove(lnk);
lnk->next = NULL; lnk->next = NULL;
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link); ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
ra_use_delta = 1; ra->app_port = nxt_router_app_get_port_unsafe(app);
ra->app_port = nxt_router_app_get_port_unsafe(app, &ra_use_delta);
if (ra->app_port->app_pending_responses > 1) {
nxt_queue_insert_tail(&ra->app_port->pending_requests,
&ra->link_port_pending);
nxt_router_ra_inc_use(ra);
}
} else { } else {
ra = NULL; ra = NULL;
ra_use_delta = 0; }
if ((request_failed > 0 || got_response > 0)
&& !nxt_queue_is_empty(&port->pending_requests))
{
lnk = nxt_queue_first(&port->pending_requests);
nxt_queue_remove(lnk);
lnk->next = NULL;
next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
link_port_pending);
} else {
next_ra = NULL;
} }
send_quit = app->live == 0 && port->app_pending_responses > 0; send_quit = app->live == 0 && port->app_pending_responses > 0;
nxt_thread_mutex_unlock(&app->mutex); nxt_thread_mutex_unlock(&app->mutex);
if (next_ra != NULL) {
nxt_router_ra_use(task, next_ra, -1);
}
if (ra != NULL) { if (ra != NULL) {
nxt_router_ra_use(task, ra, -1);
nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request, nxt_router_app_process_request,
&task->thread->engine->task, app, ra); &task->thread->engine->task, app, ra);
@@ -2710,12 +2901,8 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
adjust_use: adjust_use:
if (use_delta != 0) { if (request_failed > 0 || got_response > 0) {
nxt_port_use(task, port, use_delta); nxt_port_use(task, port, -1);
}
if (ra_use_delta != 0) {
nxt_port_use(task, ra->app_port, ra_use_delta);
} }
} }
@@ -2766,42 +2953,47 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
static nxt_int_t static nxt_int_t
nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
{ {
int use_delta; int failed_port_use_delta;
nxt_int_t res; nxt_int_t res;
nxt_app_t *app; nxt_app_t *app;
nxt_bool_t can_start_worker; nxt_bool_t can_start_worker;
nxt_conn_t *c; nxt_conn_t *c;
nxt_port_t *port; nxt_port_t *port, *failed_port;
nxt_event_engine_t *engine;
nxt_socket_conf_joint_t *joint;
use_delta = 1;
c = ra->rc->conn; c = ra->rc->conn;
app = ra->rc->app;
joint = c->joint; failed_port_use_delta = 0;
app = joint->socket_conf->application;
if (app == NULL) {
nxt_router_gen_error(task, c, 500,
"Application is NULL in socket_conf");
return NXT_ERROR;
}
ra->rc->app = app;
nxt_router_app_use(task, app, 1);
engine = task->thread->engine;
nxt_timer_disable(engine, &c->read_timer);
if (app->timeout != 0) {
c->read_timer.handler = nxt_router_app_timeout;
nxt_timer_add(engine, &c->read_timer, app->timeout);
}
nxt_thread_mutex_lock(&app->mutex); nxt_thread_mutex_lock(&app->mutex);
if (nxt_queue_chk_remove(&ra->link_app_requests))
{
nxt_router_ra_dec_use(ra);
}
if (nxt_queue_chk_remove(&ra->link_port_pending))
{
nxt_router_ra_dec_use(ra);
}
if (ra->app_port != NULL) {
failed_port = ra->app_port;
failed_port_use_delta--;
failed_port->app_pending_responses--;
if (failed_port->app_link.next != NULL) {
nxt_queue_remove(&failed_port->app_link);
failed_port->app_link.next = NULL;
failed_port_use_delta--;
}
} else {
failed_port = NULL;
}
can_start_worker = (app->workers + app->pending_workers) can_start_worker = (app->workers + app->pending_workers)
< app->max_workers; < app->max_workers;
@@ -2811,7 +3003,12 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
ra = nxt_router_ra_create(task, ra); ra = nxt_router_ra_create(task, ra);
if (nxt_fast_path(ra != NULL)) { if (nxt_fast_path(ra != NULL)) {
nxt_queue_insert_tail(&app->requests, &ra->link); nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
nxt_router_ra_inc_use(ra);
nxt_debug(task, "ra stream #%uD enqueue to app->requests",
ra->stream);
if (can_start_worker) { if (can_start_worker) {
app->pending_workers++; app->pending_workers++;
@@ -2821,14 +3018,37 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
port = NULL; port = NULL;
} else { } else {
port = nxt_router_app_get_port_unsafe(app, &use_delta); port = nxt_router_app_get_port_unsafe(app);
if (port->app_pending_responses > 1) {
ra = nxt_router_ra_create(task, ra);
if (nxt_fast_path(ra != NULL)) {
nxt_queue_insert_tail(&port->pending_requests,
&ra->link_port_pending);
nxt_router_ra_inc_use(ra);
nxt_debug(task, "ra stream #%uD enqueue to "
"port->pending_requests", ra->stream);
}
}
} }
nxt_thread_mutex_unlock(&app->mutex); nxt_thread_mutex_unlock(&app->mutex);
if (failed_port_use_delta != 0) {
nxt_port_use(task, failed_port, failed_port_use_delta);
}
if (nxt_slow_path(ra == NULL)) { if (nxt_slow_path(ra == NULL)) {
nxt_router_gen_error(task, c, 500, "Failed to allocate " nxt_router_gen_error(task, c, 500, "Failed to allocate "
"req<->app link"); "req<->app link");
if (port != NULL) {
nxt_port_use(task, port, -1);
}
return NXT_ERROR; return NXT_ERROR;
} }
@@ -2837,14 +3057,9 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
ra->app_port = port; ra->app_port = port;
if (use_delta != 0) {
nxt_port_use(task, port, use_delta);
}
return NXT_OK; return NXT_OK;
} }
nxt_debug(task, "ra stream #%uD allocated", ra->stream);
if (!can_start_worker) { if (!can_start_worker) {
nxt_debug(task, "app '%V' %p too many running or pending workers", nxt_debug(task, "app '%V' %p too many running or pending workers",
&app->name, app); &app->name, app);
@@ -3127,11 +3342,22 @@ static void
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_app_parse_ctx_t *ap) nxt_app_parse_ctx_t *ap)
{ {
nxt_int_t res; nxt_int_t res;
nxt_port_t *port; nxt_app_t *app;
nxt_event_engine_t *engine; nxt_port_t *port;
nxt_req_app_link_t ra_local, *ra; nxt_event_engine_t *engine;
nxt_req_conn_link_t *rc; nxt_req_app_link_t ra_local, *ra;
nxt_req_conn_link_t *rc;
nxt_socket_conf_joint_t *joint;
joint = c->joint;
app = joint->socket_conf->application;
if (app == NULL) {
nxt_router_gen_error(task, c, 500,
"Application is NULL in socket_conf");
return;
}
engine = task->thread->engine; engine = task->thread->engine;
@@ -3149,6 +3375,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
rc->stream = nxt_port_rpc_ex_stream(rc); rc->stream = nxt_port_rpc_ex_stream(rc);
rc->conn = c; rc->conn = c;
rc->app = app;
nxt_router_app_use(task, app, 1);
nxt_timer_disable(engine, &c->read_timer);
nxt_queue_insert_tail(&c->requests, &rc->link); nxt_queue_insert_tail(&c->requests, &rc->link);
@@ -3167,16 +3398,14 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
return; return;
} }
ra = rc->ra;
port = ra->app_port; port = ra->app_port;
if (nxt_slow_path(port == NULL)) { nxt_assert(port != NULL);
nxt_router_gen_error(task, c, 500, "Application port not found");
return;
}
nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
nxt_router_process_http_request_mp(task, ra); nxt_router_app_prepare_request(task, ra);
} }
@@ -3187,7 +3416,7 @@ nxt_router_dummy_buf_completion(nxt_task_t *task, void *obj, void *data)
static void static void
nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra) nxt_router_app_prepare_request(nxt_task_t *task, nxt_req_app_link_t *ra)
{ {
uint32_t request_failed; uint32_t request_failed;
nxt_buf_t *b; nxt_buf_t *b;
@@ -3266,7 +3495,7 @@ release_port:
nxt_router_app_port_release(task, port, request_failed, 0); nxt_router_app_port_release(task, port, request_failed, 0);
nxt_router_ra_release(task, ra, ra->work.data); nxt_router_ra_update_peer(task, ra);
} }