Changing worker selection precedence.
This patch increase precedence of non-started worker over busy worker. 1. idle worker; 2. start new worker; 3. busy worker, but can accept request in advance;
This commit is contained in:
@@ -166,7 +166,7 @@ struct nxt_port_s {
|
|||||||
/* Maximum interleave of message parts. */
|
/* Maximum interleave of message parts. */
|
||||||
uint32_t max_share;
|
uint32_t max_share;
|
||||||
|
|
||||||
uint32_t app_requests;
|
uint32_t app_pending_responses;
|
||||||
uint32_t app_responses;
|
uint32_t app_responses;
|
||||||
|
|
||||||
nxt_port_handler_t handler;
|
nxt_port_handler_t handler;
|
||||||
|
|||||||
@@ -2519,6 +2519,19 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
nxt_inline nxt_bool_t
|
||||||
|
nxt_router_app_first_port_busy(nxt_app_t *app)
|
||||||
|
{
|
||||||
|
nxt_port_t *port;
|
||||||
|
nxt_queue_link_t *lnk;
|
||||||
|
|
||||||
|
lnk = nxt_queue_first(&app->ports);
|
||||||
|
port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
|
||||||
|
|
||||||
|
return port->app_pending_responses > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
nxt_inline nxt_port_t *
|
nxt_inline nxt_port_t *
|
||||||
nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
|
nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
|
||||||
{
|
{
|
||||||
@@ -2530,12 +2543,10 @@ nxt_router_app_get_port_unsafe(nxt_app_t *app, int *use_delta)
|
|||||||
|
|
||||||
port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
|
port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
|
||||||
|
|
||||||
port->app_requests++;
|
port->app_pending_responses++;
|
||||||
|
|
||||||
if (app->live &&
|
if (app->max_pending_responses == 0
|
||||||
(app->max_pending_responses == 0 ||
|
|| port->app_pending_responses < app->max_pending_responses)
|
||||||
(port->app_requests - port->app_responses) <
|
|
||||||
app->max_pending_responses) )
|
|
||||||
{
|
{
|
||||||
nxt_queue_insert_tail(&app->ports, lnk);
|
nxt_queue_insert_tail(&app->ports, lnk);
|
||||||
|
|
||||||
@@ -2560,7 +2571,7 @@ nxt_router_app_get_idle_port(nxt_app_t *app)
|
|||||||
|
|
||||||
nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
|
nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
|
||||||
|
|
||||||
if (port->app_requests > port->app_responses) {
|
if (port->app_pending_responses > 0) {
|
||||||
port = NULL;
|
port = NULL;
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
@@ -2618,18 +2629,32 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
|
|||||||
|
|
||||||
nxt_thread_mutex_lock(&app->mutex);
|
nxt_thread_mutex_lock(&app->mutex);
|
||||||
|
|
||||||
port->app_requests -= request_failed;
|
port->app_pending_responses -= request_failed + got_response;
|
||||||
port->app_responses += got_response;
|
port->app_responses += got_response;
|
||||||
|
|
||||||
if (app->live != 0 &&
|
if (app->live != 0 &&
|
||||||
port->pair[1] != -1 &&
|
port->pair[1] != -1 &&
|
||||||
port->app_link.next == NULL &&
|
(app->max_pending_responses == 0
|
||||||
(app->max_pending_responses == 0 ||
|
|| port->app_pending_responses < app->max_pending_responses))
|
||||||
(port->app_requests - port->app_responses) <
|
|
||||||
app->max_pending_responses) )
|
|
||||||
{
|
{
|
||||||
|
if (port->app_link.next == NULL) {
|
||||||
|
if (port->app_pending_responses > 0) {
|
||||||
nxt_queue_insert_tail(&app->ports, &port->app_link);
|
nxt_queue_insert_tail(&app->ports, &port->app_link);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
nxt_queue_insert_head(&app->ports, &port->app_link);
|
||||||
|
}
|
||||||
|
|
||||||
use_delta++;
|
use_delta++;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
if (port->app_pending_responses == 0
|
||||||
|
&& nxt_queue_first(&app->ports) != &port->app_link)
|
||||||
|
{
|
||||||
|
nxt_queue_remove(&port->app_link);
|
||||||
|
nxt_queue_insert_head(&app->ports, &port->app_link);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (app->live != 0 &&
|
if (app->live != 0 &&
|
||||||
@@ -2650,7 +2675,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
|
|||||||
ra_use_delta = 0;
|
ra_use_delta = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
send_quit = app->live == 0 && port->app_requests == port->app_responses;
|
send_quit = app->live == 0 && port->app_pending_responses > 0;
|
||||||
|
|
||||||
nxt_thread_mutex_unlock(&app->mutex);
|
nxt_thread_mutex_unlock(&app->mutex);
|
||||||
|
|
||||||
@@ -2750,7 +2775,6 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
|
|||||||
nxt_event_engine_t *engine;
|
nxt_event_engine_t *engine;
|
||||||
nxt_socket_conf_joint_t *joint;
|
nxt_socket_conf_joint_t *joint;
|
||||||
|
|
||||||
port = NULL;
|
|
||||||
use_delta = 1;
|
use_delta = 1;
|
||||||
c = ra->rc->conn;
|
c = ra->rc->conn;
|
||||||
|
|
||||||
@@ -2776,27 +2800,28 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
|
|||||||
nxt_timer_add(engine, &c->read_timer, app->timeout);
|
nxt_timer_add(engine, &c->read_timer, app->timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
can_start_worker = 0;
|
|
||||||
|
|
||||||
nxt_thread_mutex_lock(&app->mutex);
|
nxt_thread_mutex_lock(&app->mutex);
|
||||||
|
|
||||||
if (!nxt_queue_is_empty(&app->ports)) {
|
can_start_worker = (app->workers + app->pending_workers)
|
||||||
port = nxt_router_app_get_port_unsafe(app, &use_delta);
|
< app->max_workers;
|
||||||
|
|
||||||
} else {
|
if (nxt_queue_is_empty(&app->ports)
|
||||||
|
|| (can_start_worker && nxt_router_app_first_port_busy(app)) )
|
||||||
|
{
|
||||||
ra = nxt_router_ra_create(task, ra);
|
ra = nxt_router_ra_create(task, ra);
|
||||||
|
|
||||||
if (nxt_fast_path(ra != NULL)) {
|
if (nxt_fast_path(ra != NULL)) {
|
||||||
nxt_queue_insert_tail(&app->requests, &ra->link);
|
nxt_queue_insert_tail(&app->requests, &ra->link);
|
||||||
|
|
||||||
can_start_worker = (app->workers + app->pending_workers) <
|
|
||||||
app->max_workers;
|
|
||||||
if (can_start_worker) {
|
if (can_start_worker) {
|
||||||
app->pending_workers++;
|
app->pending_workers++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
port = NULL;
|
port = NULL;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
port = nxt_router_app_get_port_unsafe(app, &use_delta);
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_thread_mutex_unlock(&app->mutex);
|
nxt_thread_mutex_unlock(&app->mutex);
|
||||||
|
|||||||
Reference in New Issue
Block a user