Rescheduling of pending request after configured timeout.

Introduces a new optional configuration parameter, limits.reschedule_timeout,
with a default value of 1 second.  A request that has been written to the port
socket 'in advance' is called 'pending'.

On every completed request, the head of the pending request queue is checked
against the reschedule timeout.  If that request has been waiting for execution
longer than the timeout, it is cancelled and a new port is selected for it.
This commit is contained in:
Max Romanov
2017-12-27 17:48:04 +03:00
parent baa8c9387b
commit 5196cf4d50
4 changed files with 226 additions and 92 deletions

View File

@@ -78,6 +78,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = {
NULL,
NULL },
{ nxt_string("reschedule_timeout"),
NXT_CONF_INTEGER,
NULL,
NULL },
{ nxt_string("requests"),
NXT_CONF_INTEGER,
NULL,

View File

@@ -314,10 +314,6 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
reg = lhq.value;
if (reg->peer != -1) {
nxt_assert(reg->peer == msg->port_msg.pid);
}
if (type == _NXT_PORT_MSG_RPC_ERROR) {
reg->error_handler(task, msg, reg->data);

View File

@@ -13,6 +13,7 @@ typedef struct {
nxt_str_t type;
uint32_t workers;
nxt_msec_t timeout;
nxt_msec_t res_timeout;
uint32_t requests;
nxt_conf_value_t *limits_value;
} nxt_router_app_conf_t;
@@ -55,8 +56,11 @@ struct nxt_req_app_link_s {
nxt_msg_info_t msg_info;
nxt_req_conn_link_t *rc;
nxt_nsec_t res_time;
nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
nxt_queue_link_t link_port_pending; /* for nxt_port_t.pending_requests */
nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
nxt_mp_t *mem_pool;
nxt_work_t work;
@@ -72,6 +76,26 @@ typedef struct {
} nxt_socket_rpc_t;
/*
 * State carried between nxt_router_port_select() and
 * nxt_router_port_post_select().  The caller fills in "app" and "ra";
 * the remaining fields are written by nxt_router_port_select() and read
 * back by nxt_router_port_post_select().
 */
struct nxt_port_select_state_s {
/* Input: application whose ports and queues are examined. */
nxt_app_t *app;
/* Input: request<->app link that needs a port. */
nxt_req_app_link_t *ra;
/* Port previously assigned to ra (ra->app_port on entry); may be NULL. */
nxt_port_t *failed_port;
/* Accumulated use-count delta later applied to failed_port via nxt_port_use(). */
int failed_port_use_delta;
/* True when workers + pending_workers is below app->max_workers. */
nxt_bool_t can_start_worker;
/*
 * Link actually placed on a queue: the result of nxt_router_ra_create()
 * when the request was enqueued (NULL if that call failed), otherwise ra.
 */
nxt_req_app_link_t *shared_ra;
/* Port popped from app->ports for this request, or NULL if none was taken. */
nxt_port_t *port;
};
typedef struct nxt_port_select_state_s nxt_port_select_state_t;
static void nxt_router_port_select(nxt_task_t *task,
nxt_port_select_state_t *state);
static nxt_int_t nxt_router_port_post_select(nxt_task_t *task,
nxt_port_select_state_t *state);
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
nxt_inline void
@@ -172,7 +196,8 @@ static void nxt_router_app_port_error(nxt_task_t *task,
static nxt_port_t * nxt_router_app_get_idle_port(nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
nxt_req_app_link_t *ra);
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
@@ -575,6 +600,21 @@ nxt_router_ra_error(nxt_req_app_link_t *ra, int code, const char* str)
}
/*
 * Registers ra as a pending request: links it at the tail of both its
 * port's pending_requests queue and the application-wide pending queue,
 * takes a use reference on ra, and records the monotonic time after which
 * the request may be treated as stalled and rescheduled.
 *
 * ra->app_port must already be set, since it is dereferenced here.
 * NOTE(review): the call sites visible in this change run with app->mutex
 * held -- confirm for any new caller.
 */
nxt_inline void
nxt_router_ra_pending(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
{
nxt_queue_insert_tail(&ra->app_port->pending_requests,
&ra->link_port_pending);
nxt_queue_insert_tail(&app->pending, &ra->link_app_pending);
/*
 * Only one reference is taken for the two queue links.
 * NOTE(review): it appears to be accounted against link_port_pending
 * on removal -- confirm.
 */
nxt_router_ra_inc_use(ra);
/* Deadline = now + app->res_timeout (stored as nxt_nsec_t in nxt_app_t). */
ra->res_time = nxt_thread_monotonic_time(task->thread) + app->res_timeout;
nxt_debug(task, "ra stream #%uD enqueue to pending_requests", ra->stream);
}
nxt_inline nxt_bool_t
nxt_queue_chk_remove(nxt_queue_link_t *lnk)
{
@@ -615,13 +655,15 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
nxt_thread_mutex_lock(&rc->app->mutex);
if (ra->link_app_requests.next == NULL
&& ra->link_port_pending.next == NULL)
&& ra->link_port_pending.next == NULL
&& ra->link_app_pending.next == NULL)
{
ra = NULL;
} else {
ra_use_delta -= nxt_queue_chk_remove(&ra->link_app_requests);
ra_use_delta -= nxt_queue_chk_remove(&ra->link_port_pending);
nxt_queue_chk_remove(&ra->link_app_pending);
}
nxt_thread_mutex_unlock(&rc->app->mutex);
@@ -973,6 +1015,12 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = {
offsetof(nxt_router_app_conf_t, timeout),
},
{
nxt_string("reschedule_timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_router_app_conf_t, res_timeout),
},
{
nxt_string("requests"),
NXT_CONF_MAP_INT32,
@@ -1127,6 +1175,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
apcf.workers = 1;
apcf.timeout = 0;
apcf.res_timeout = 1000;
apcf.requests = 0;
apcf.limits_value = NULL;
@@ -1156,7 +1205,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_debug(task, "application type: %V", &apcf.type);
nxt_debug(task, "application workers: %D", apcf.workers);
nxt_debug(task, "application timeout: %D", apcf.timeout);
nxt_debug(task, "application request timeout: %D", apcf.timeout);
nxt_debug(task, "application reschedule timeout: %D", apcf.res_timeout);
nxt_debug(task, "application requests: %D", apcf.requests);
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
@@ -1176,6 +1226,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_init(&app->ports);
nxt_queue_init(&app->requests);
nxt_queue_init(&app->pending);
app->name.length = name.length;
nxt_memcpy(app->name.start, name.start, name.length);
@@ -1183,6 +1234,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->type = lang->type;
app->max_workers = apcf.workers;
app->timeout = apcf.timeout;
app->res_timeout = apcf.res_timeout * 1000000;
app->live = 1;
app->max_pending_responses = 2;
app->prepare_msg = nxt_app_prepare_msg[lang->type];
@@ -2481,7 +2533,7 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
if (cancelled) {
nxt_router_ra_inc_use(ra);
res = nxt_router_app_port(task, ra);
res = nxt_router_app_port(task, rc->app, ra);
if (res == NXT_OK) {
port = ra->app_port;
@@ -2702,7 +2754,7 @@ nxt_router_app_first_port_busy(nxt_app_t *app)
nxt_inline nxt_port_t *
nxt_router_app_get_port_unsafe(nxt_app_t *app)
nxt_router_pop_first_port(nxt_app_t *app)
{
nxt_port_t *port;
nxt_queue_link_t *lnk;
@@ -2783,14 +2835,17 @@ static void
nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response)
{
nxt_app_t *app;
nxt_bool_t send_quit;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra, *next_ra;
nxt_app_t *app;
nxt_bool_t send_quit, cancelled;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra, *pending_ra, *re_ra;
nxt_port_select_state_t state;
nxt_assert(port != NULL);
nxt_assert(port->app != NULL);
ra = NULL;
app = port->app;
nxt_thread_mutex_lock(&app->mutex);
@@ -2798,8 +2853,11 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
port->app_pending_responses -= request_failed + got_response;
port->app_responses += got_response;
if (app->live != 0
&& port->pair[1] != -1
if (nxt_slow_path(app->live == 0)) {
goto app_dead;
}
if (port->pair[1] != -1
&& (app->max_pending_responses == 0
|| port->app_pending_responses < app->max_pending_responses))
{
@@ -2823,8 +2881,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
}
}
if (app->live != 0
&& !nxt_queue_is_empty(&app->ports)
if (!nxt_queue_is_empty(&app->ports)
&& !nxt_queue_is_empty(&app->requests))
{
lnk = nxt_queue_first(&app->requests);
@@ -2833,19 +2890,16 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_requests);
ra->app_port = nxt_router_app_get_port_unsafe(app);
ra->app_port = nxt_router_pop_first_port(app);
if (ra->app_port->app_pending_responses > 1) {
nxt_queue_insert_tail(&ra->app_port->pending_requests,
&ra->link_port_pending);
nxt_router_ra_inc_use(ra);
nxt_router_ra_pending(task, app, ra);
}
} else {
ra = NULL;
}
app_dead:
/* Pop first pending request for this port. */
if ((request_failed > 0 || got_response > 0)
&& !nxt_queue_is_empty(&port->pending_requests))
{
@@ -2853,19 +2907,63 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
nxt_queue_remove(lnk);
lnk->next = NULL;
next_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
link_port_pending);
pending_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t,
link_port_pending);
nxt_assert(pending_ra->link_app_pending.next != NULL);
nxt_queue_remove(&pending_ra->link_app_pending);
pending_ra->link_app_pending.next = NULL;
} else {
next_ra = NULL;
pending_ra = NULL;
}
/* Try to cancel and re-schedule first stalled request for this app. */
if (got_response > 0 && !nxt_queue_is_empty(&app->pending)) {
lnk = nxt_queue_first(&app->pending);
re_ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link_app_pending);
if (re_ra->res_time <= nxt_thread_monotonic_time(task->thread)) {
nxt_debug(task, "app '%V' stalled request #%uD detected",
&app->name, re_ra->stream);
cancelled = nxt_router_msg_cancel(task, &re_ra->msg_info,
re_ra->stream);
if (cancelled) {
nxt_router_ra_inc_use(re_ra);
state.ra = re_ra;
state.app = app;
nxt_router_port_select(task, &state);
goto re_ra_cancelled;
}
}
}
re_ra = NULL;
re_ra_cancelled:
send_quit = app->live == 0 && port->app_pending_responses > 0;
nxt_thread_mutex_unlock(&app->mutex);
if (next_ra != NULL) {
nxt_router_ra_use(task, next_ra, -1);
if (pending_ra != NULL) {
nxt_router_ra_use(task, pending_ra, -1);
}
if (re_ra != NULL) {
if (nxt_router_port_post_select(task, &state) == NXT_OK) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_app_process_request,
&task->thread->engine->task, app, re_ra);
}
}
if (ra != NULL) {
@@ -2950,22 +3048,16 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
}
static nxt_int_t
nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
static void
nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
int failed_port_use_delta;
nxt_int_t res;
nxt_app_t *app;
nxt_bool_t can_start_worker;
nxt_conn_t *c;
nxt_port_t *port, *failed_port;
nxt_app_t *app;
nxt_req_app_link_t *ra;
c = ra->rc->conn;
app = ra->rc->app;
ra = state->ra;
app = state->app;
failed_port_use_delta = 0;
nxt_thread_mutex_lock(&app->mutex);
state->failed_port_use_delta = 0;
if (nxt_queue_chk_remove(&ra->link_app_requests))
{
@@ -2974,93 +3066,113 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
if (nxt_queue_chk_remove(&ra->link_port_pending))
{
nxt_assert(ra->link_app_pending.next != NULL);
nxt_queue_remove(&ra->link_app_pending);
ra->link_app_pending.next = NULL;
nxt_router_ra_dec_use(ra);
}
state->failed_port = ra->app_port;
if (ra->app_port != NULL) {
failed_port = ra->app_port;
failed_port_use_delta--;
state->failed_port_use_delta--;
failed_port->app_pending_responses--;
state->failed_port->app_pending_responses--;
if (failed_port->app_link.next != NULL) {
nxt_queue_remove(&failed_port->app_link);
failed_port->app_link.next = NULL;
failed_port_use_delta--;
if (nxt_queue_chk_remove(&state->failed_port->app_link)) {
state->failed_port_use_delta--;
}
} else {
failed_port = NULL;
ra->app_port = NULL;
}
can_start_worker = (app->workers + app->pending_workers)
< app->max_workers;
state->can_start_worker = (app->workers + app->pending_workers)
< app->max_workers;
state->port = NULL;
if (nxt_queue_is_empty(&app->ports)
|| (can_start_worker && nxt_router_app_first_port_busy(app)) )
|| (state->can_start_worker && nxt_router_app_first_port_busy(app)) )
{
ra = nxt_router_ra_create(task, ra);
if (nxt_fast_path(ra != NULL)) {
nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
nxt_router_ra_inc_use(ra);
nxt_debug(task, "ra stream #%uD enqueue to app->requests",
ra->stream);
if (can_start_worker) {
app->pending_workers++;
}
if (nxt_slow_path(ra == NULL)) {
goto fail;
}
port = NULL;
if (nxt_slow_path(state->failed_port != NULL)) {
nxt_queue_insert_head(&app->requests, &ra->link_app_requests);
} else {
nxt_queue_insert_tail(&app->requests, &ra->link_app_requests);
}
nxt_router_ra_inc_use(ra);
nxt_debug(task, "ra stream #%uD enqueue to app->requests", ra->stream);
if (state->can_start_worker) {
app->pending_workers++;
}
} else {
port = nxt_router_app_get_port_unsafe(app);
state->port = nxt_router_pop_first_port(app);
if (port->app_pending_responses > 1) {
if (state->port->app_pending_responses > 1) {
ra = nxt_router_ra_create(task, ra);
if (nxt_fast_path(ra != NULL)) {
nxt_queue_insert_tail(&port->pending_requests,
&ra->link_port_pending);
nxt_router_ra_inc_use(ra);
nxt_debug(task, "ra stream #%uD enqueue to "
"port->pending_requests", ra->stream);
if (nxt_slow_path(ra == NULL)) {
goto fail;
}
ra->app_port = state->port;
nxt_router_ra_pending(task, app, ra);
}
}
nxt_thread_mutex_unlock(&app->mutex);
fail:
if (failed_port_use_delta != 0) {
nxt_port_use(task, failed_port, failed_port_use_delta);
state->shared_ra = ra;
}
static nxt_int_t
nxt_router_port_post_select(nxt_task_t *task, nxt_port_select_state_t *state)
{
nxt_int_t res;
nxt_app_t *app;
nxt_req_app_link_t *ra;
ra = state->shared_ra;
app = state->app;
if (state->failed_port_use_delta != 0) {
nxt_port_use(task, state->failed_port, state->failed_port_use_delta);
}
if (nxt_slow_path(ra == NULL)) {
nxt_router_gen_error(task, c, 500, "Failed to allocate "
"req<->app link");
if (port != NULL) {
nxt_port_use(task, port, -1);
if (state->port != NULL) {
nxt_port_use(task, state->port, -1);
}
nxt_router_ra_error(state->ra, 500,
"Failed to allocate shared req<->app link");
nxt_router_ra_use(task, state->ra, -1);
return NXT_ERROR;
}
if (port != NULL) {
if (state->port != NULL) {
nxt_debug(task, "already have port for app '%V' %p ", &app->name, app);
ra->app_port = port;
ra->app_port = state->port;
return NXT_OK;
}
if (!can_start_worker) {
if (!state->can_start_worker) {
nxt_debug(task, "app '%V' %p too many running or pending workers",
&app->name, app);
@@ -3070,7 +3182,8 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
res = nxt_router_start_worker(task, app);
if (nxt_slow_path(res != NXT_OK)) {
nxt_router_gen_error(task, c, 500, "Failed to start worker");
nxt_router_ra_error(ra, 500, "Failed to start worker");
nxt_router_ra_use(task, ra, -1);
return NXT_ERROR;
}
@@ -3079,6 +3192,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
}
/*
 * Selects a port for request link ra of application app.
 *
 * Runs nxt_router_port_select() with app->mutex held, then releases the
 * mutex and finishes in nxt_router_port_post_select(), whose result is
 * returned unchanged.
 */
static nxt_int_t
nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
{
nxt_port_select_state_t state;
/* Only the inputs are set here; the select step fills in the rest. */
state.ra = ra;
state.app = app;
nxt_thread_mutex_lock(&app->mutex);
nxt_router_port_select(task, &state);
nxt_thread_mutex_unlock(&app->mutex);
/* Post-select work runs after the mutex has been released. */
return nxt_router_port_post_select(task, &state);
}
static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{
@@ -3392,7 +3523,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
ra = &ra_local;
nxt_router_ra_init(task, ra, rc);
res = nxt_router_app_port(task, ra);
res = nxt_router_app_port(task, app, ra);
if (res != NXT_OK) {
return;

View File

@@ -81,7 +81,8 @@ struct nxt_app_s {
nxt_thread_mutex_t mutex; /* Protects ports queue. */
nxt_queue_t ports; /* of nxt_port_t.app_link */
nxt_queue_t requests; /* of nxt_req_conn_link_t */
nxt_queue_t requests; /* of nxt_req_app_link_t */
nxt_queue_t pending; /* of nxt_req_app_link_t */
nxt_str_t name;
uint32_t pending_workers;
@@ -90,6 +91,7 @@ struct nxt_app_s {
uint32_t max_pending_responses;
nxt_msec_t timeout;
nxt_nsec_t res_timeout;
nxt_app_type_t type:8;
uint8_t live; /* 1 bit */