Responding with error in case of first process start failure.
After shared application port introducing, request queue in router was removed and requests may stuck forever waiting for another process start.
This commit is contained in:
142
src/nxt_router.c
142
src/nxt_router.c
@@ -204,6 +204,8 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
|
||||
nxt_apr_action_t action);
|
||||
static void nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
|
||||
nxt_request_rpc_data_t *req_rpc_data);
|
||||
static void nxt_router_http_request_error(nxt_task_t *task, void *obj,
|
||||
void *data);
|
||||
static void nxt_router_http_request_done(nxt_task_t *task, void *obj,
|
||||
void *data);
|
||||
|
||||
@@ -539,6 +541,8 @@ nxt_inline void
|
||||
nxt_request_rpc_data_unlink(nxt_task_t *task,
|
||||
nxt_request_rpc_data_t *req_rpc_data)
|
||||
{
|
||||
nxt_app_t *app;
|
||||
nxt_bool_t unlinked;
|
||||
nxt_http_request_t *r;
|
||||
|
||||
nxt_router_msg_cancel(task, req_rpc_data);
|
||||
@@ -550,12 +554,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
|
||||
req_rpc_data->app_port = NULL;
|
||||
}
|
||||
|
||||
if (req_rpc_data->app != NULL) {
|
||||
nxt_router_app_use(task, req_rpc_data->app, -1);
|
||||
|
||||
req_rpc_data->app = NULL;
|
||||
}
|
||||
|
||||
app = req_rpc_data->app;
|
||||
r = req_rpc_data->request;
|
||||
|
||||
if (r != NULL) {
|
||||
@@ -565,6 +564,31 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
|
||||
|
||||
r->req_rpc_data = NULL;
|
||||
req_rpc_data->request = NULL;
|
||||
|
||||
if (app != NULL) {
|
||||
unlinked = 0;
|
||||
|
||||
nxt_thread_mutex_lock(&app->mutex);
|
||||
|
||||
if (r->app_link.next != NULL) {
|
||||
nxt_queue_remove(&r->app_link);
|
||||
r->app_link.next = NULL;
|
||||
|
||||
unlinked = 1;
|
||||
}
|
||||
|
||||
nxt_thread_mutex_unlock(&app->mutex);
|
||||
|
||||
if (unlinked) {
|
||||
nxt_mp_release(r->mem_pool);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (app != NULL) {
|
||||
nxt_router_app_use(task, app, -1);
|
||||
|
||||
req_rpc_data->app = NULL;
|
||||
}
|
||||
|
||||
if (req_rpc_data->msg_info.body_fd != -1) {
|
||||
@@ -1492,6 +1516,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
||||
nxt_queue_init(&app->ports);
|
||||
nxt_queue_init(&app->spare_ports);
|
||||
nxt_queue_init(&app->idle_ports);
|
||||
nxt_queue_init(&app->ack_waiting_req);
|
||||
|
||||
app->name.length = name.length;
|
||||
nxt_memcpy(app->name.start, name.start, name.length);
|
||||
@@ -3784,7 +3809,7 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
|
||||
{
|
||||
int res;
|
||||
nxt_app_t *app;
|
||||
nxt_bool_t start_process;
|
||||
nxt_bool_t start_process, unlinked;
|
||||
nxt_port_t *app_port, *main_app_port, *idle_port;
|
||||
nxt_queue_link_t *idle_lnk;
|
||||
nxt_http_request_t *r;
|
||||
@@ -3797,19 +3822,31 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
|
||||
msg->port_msg.pid);
|
||||
|
||||
app = req_rpc_data->app;
|
||||
r = req_rpc_data->request;
|
||||
|
||||
start_process = 0;
|
||||
unlinked = 0;
|
||||
|
||||
nxt_thread_mutex_lock(&app->mutex);
|
||||
|
||||
if (r->app_link.next != NULL) {
|
||||
nxt_queue_remove(&r->app_link);
|
||||
r->app_link.next = NULL;
|
||||
|
||||
unlinked = 1;
|
||||
}
|
||||
|
||||
app_port = nxt_port_hash_find(&app->port_hash, msg->port_msg.pid,
|
||||
msg->port_msg.reply_port);
|
||||
if (nxt_slow_path(app_port == NULL)) {
|
||||
nxt_thread_mutex_unlock(&app->mutex);
|
||||
|
||||
r = req_rpc_data->request;
|
||||
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
|
||||
|
||||
if (unlinked) {
|
||||
nxt_mp_release(r->mem_pool);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3857,6 +3894,10 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
|
||||
|
||||
nxt_thread_mutex_unlock(&app->mutex);
|
||||
|
||||
if (unlinked) {
|
||||
nxt_mp_release(r->mem_pool);
|
||||
}
|
||||
|
||||
if (start_process) {
|
||||
nxt_router_start_app_process(task, app);
|
||||
}
|
||||
@@ -3877,15 +3918,11 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
|
||||
task->thread->engine->port->id, NULL);
|
||||
|
||||
if (nxt_slow_path(res != NXT_OK)) {
|
||||
r = req_rpc_data->request;
|
||||
|
||||
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
if (app->timeout != 0) {
|
||||
r = req_rpc_data->request;
|
||||
|
||||
r->timer.handler = nxt_router_app_timeout;
|
||||
r->timer_data = req_rpc_data;
|
||||
nxt_timer_add(task->thread->engine, &r->timer, app->timeout);
|
||||
@@ -4028,8 +4065,10 @@ static void
|
||||
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
void *data)
|
||||
{
|
||||
nxt_app_t *app;
|
||||
nxt_app_joint_t *app_joint;
|
||||
nxt_app_t *app;
|
||||
nxt_app_joint_t *app_joint;
|
||||
nxt_queue_link_t *link;
|
||||
nxt_http_request_t *r;
|
||||
|
||||
app_joint = data;
|
||||
|
||||
@@ -4047,15 +4086,43 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
|
||||
nxt_debug(task, "app '%V' %p start error", &app->name, app);
|
||||
|
||||
link = NULL;
|
||||
|
||||
nxt_thread_mutex_lock(&app->mutex);
|
||||
|
||||
nxt_assert(app->pending_processes != 0);
|
||||
|
||||
app->pending_processes--;
|
||||
|
||||
if (app->processes == 0 && !nxt_queue_is_empty(&app->ack_waiting_req)) {
|
||||
link = nxt_queue_first(&app->ack_waiting_req);
|
||||
|
||||
nxt_queue_remove(link);
|
||||
link->next = NULL;
|
||||
}
|
||||
|
||||
nxt_thread_mutex_unlock(&app->mutex);
|
||||
|
||||
/* TODO req_app_link to cancel first pending message */
|
||||
while (link != NULL) {
|
||||
r = nxt_container_of(link, nxt_http_request_t, app_link);
|
||||
|
||||
nxt_event_engine_post(r->engine, &r->err_work);
|
||||
|
||||
link = NULL;
|
||||
|
||||
nxt_thread_mutex_lock(&app->mutex);
|
||||
|
||||
if (app->processes == 0 && app->pending_processes == 0
|
||||
&& !nxt_queue_is_empty(&app->ack_waiting_req))
|
||||
{
|
||||
link = nxt_queue_first(&app->ack_waiting_req);
|
||||
|
||||
nxt_queue_remove(link);
|
||||
link->next = NULL;
|
||||
}
|
||||
|
||||
nxt_thread_mutex_unlock(&app->mutex);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4541,8 +4608,9 @@ static void
|
||||
nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
|
||||
nxt_request_rpc_data_t *req_rpc_data)
|
||||
{
|
||||
nxt_bool_t start_process;
|
||||
nxt_port_t *port;
|
||||
nxt_bool_t start_process;
|
||||
nxt_port_t *port;
|
||||
nxt_http_request_t *r;
|
||||
|
||||
start_process = 0;
|
||||
|
||||
@@ -4558,8 +4626,22 @@ nxt_router_app_port_get(nxt_task_t *task, nxt_app_t *app,
|
||||
start_process = 1;
|
||||
}
|
||||
|
||||
r = req_rpc_data->request;
|
||||
|
||||
/*
|
||||
* Put request into application-wide list to be able to cancel request
|
||||
* if something goes wrong with application processes.
|
||||
*/
|
||||
nxt_queue_insert_tail(&app->ack_waiting_req, &r->app_link);
|
||||
|
||||
nxt_thread_mutex_unlock(&app->mutex);
|
||||
|
||||
/*
|
||||
* Retain request memory pool while request is linked in ack_waiting_req
|
||||
* to guarantee request structure memory is accessble.
|
||||
*/
|
||||
nxt_mp_retain(r->mem_pool);
|
||||
|
||||
req_rpc_data->app_port = port;
|
||||
req_rpc_data->apr_action = NXT_APR_REQUEST_FAILED;
|
||||
|
||||
@@ -4602,6 +4684,11 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
|
||||
r->timer.log = engine->task.log;
|
||||
r->timer.bias = NXT_TIMER_DEFAULT_BIAS;
|
||||
|
||||
r->engine = engine;
|
||||
r->err_work.handler = nxt_router_http_request_error;
|
||||
r->err_work.task = task;
|
||||
r->err_work.obj = r;
|
||||
|
||||
req_rpc_data->stream = nxt_port_rpc_ex_stream(req_rpc_data);
|
||||
req_rpc_data->app = app;
|
||||
req_rpc_data->msg_info.body_fd = -1;
|
||||
@@ -4621,6 +4708,25 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_http_request_error(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_http_request_t *r;
|
||||
|
||||
r = obj;
|
||||
|
||||
nxt_debug(task, "router http request error (rpc_data %p)", r->req_rpc_data);
|
||||
|
||||
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
|
||||
|
||||
if (r->req_rpc_data != NULL) {
|
||||
nxt_request_rpc_data_unlink(task, r->req_rpc_data);
|
||||
}
|
||||
|
||||
nxt_mp_release(r->mem_pool);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
@@ -4630,7 +4736,7 @@ nxt_router_http_request_done(nxt_task_t *task, void *obj, void *data)
|
||||
|
||||
nxt_debug(task, "router http request done (rpc_data %p)", r->req_rpc_data);
|
||||
|
||||
if (r->req_rpc_data) {
|
||||
if (r->req_rpc_data != NULL) {
|
||||
nxt_request_rpc_data_unlink(task, r->req_rpc_data);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user