Application restart introduced.
When processing a restart request, the router sends a QUIT message to all existing processes of the application. Then, a new shared application port is created to ensure that new requests won't be handled by the old processes of the application.
This commit is contained in:
219
src/nxt_router.c
219
src/nxt_router.c
@@ -18,6 +18,8 @@
|
||||
#include <nxt_app_queue.h>
|
||||
#include <nxt_port_queue.h>
|
||||
|
||||
#define NXT_SHARED_PORT_ID 0xFFFFu
|
||||
|
||||
typedef struct {
|
||||
nxt_str_t type;
|
||||
uint32_t processes;
|
||||
@@ -67,6 +69,12 @@ typedef struct {
|
||||
} nxt_app_rpc_t;
|
||||
|
||||
|
||||
typedef struct {
|
||||
nxt_app_joint_t *app_joint;
|
||||
uint32_t generation;
|
||||
} nxt_app_joint_rpc_t;
|
||||
|
||||
|
||||
static nxt_int_t nxt_router_prefork(nxt_task_t *task, nxt_process_t *process,
|
||||
nxt_mp_t *mp);
|
||||
static nxt_int_t nxt_router_start(nxt_task_t *task, nxt_process_data_t *data);
|
||||
@@ -79,6 +87,8 @@ static void nxt_router_new_port_handler(nxt_task_t *task,
|
||||
nxt_port_recv_msg_t *msg);
|
||||
static void nxt_router_conf_data_handler(nxt_task_t *task,
|
||||
nxt_port_recv_msg_t *msg);
|
||||
static void nxt_router_app_restart_handler(nxt_task_t *task,
|
||||
nxt_port_recv_msg_t *msg);
|
||||
static void nxt_router_remove_pid_handler(nxt_task_t *task,
|
||||
nxt_port_recv_msg_t *msg);
|
||||
static void nxt_router_access_log_reopen_handler(nxt_task_t *task,
|
||||
@@ -281,6 +291,7 @@ static const nxt_port_handlers_t nxt_router_process_port_handlers = {
|
||||
.mmap = nxt_port_mmap_handler,
|
||||
.get_mmap = nxt_router_get_mmap_handler,
|
||||
.data = nxt_router_conf_data_handler,
|
||||
.app_restart = nxt_router_app_restart_handler,
|
||||
.remove_pid = nxt_router_remove_pid_handler,
|
||||
.access_log = nxt_router_access_log_reopen_handler,
|
||||
.rpc_ready = nxt_port_rpc_handler,
|
||||
@@ -379,14 +390,15 @@ static void
|
||||
nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
|
||||
void *data)
|
||||
{
|
||||
size_t size;
|
||||
uint32_t stream;
|
||||
nxt_mp_t *mp;
|
||||
nxt_int_t ret;
|
||||
nxt_app_t *app;
|
||||
nxt_buf_t *b;
|
||||
nxt_port_t *main_port;
|
||||
nxt_runtime_t *rt;
|
||||
size_t size;
|
||||
uint32_t stream;
|
||||
nxt_mp_t *mp;
|
||||
nxt_int_t ret;
|
||||
nxt_app_t *app;
|
||||
nxt_buf_t *b;
|
||||
nxt_port_t *main_port;
|
||||
nxt_runtime_t *rt;
|
||||
nxt_app_joint_rpc_t *app_joint_rpc;
|
||||
|
||||
app = data;
|
||||
|
||||
@@ -407,30 +419,29 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
|
||||
*b->mem.free++ = '\0';
|
||||
nxt_buf_cpystr(b, &app->conf);
|
||||
|
||||
nxt_router_app_joint_use(task, app->joint, 1);
|
||||
|
||||
stream = nxt_port_rpc_register_handler(task, port,
|
||||
nxt_router_app_port_ready,
|
||||
nxt_router_app_port_error,
|
||||
-1, app->joint);
|
||||
|
||||
if (nxt_slow_path(stream == 0)) {
|
||||
nxt_router_app_joint_use(task, app->joint, -1);
|
||||
|
||||
app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
|
||||
nxt_router_app_port_ready,
|
||||
nxt_router_app_port_error,
|
||||
sizeof(nxt_app_joint_rpc_t));
|
||||
if (nxt_slow_path(app_joint_rpc == NULL)) {
|
||||
goto failed;
|
||||
}
|
||||
|
||||
stream = nxt_port_rpc_ex_stream(app_joint_rpc);
|
||||
|
||||
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_START_PROCESS,
|
||||
-1, stream, port->id, b);
|
||||
|
||||
if (nxt_slow_path(ret != NXT_OK)) {
|
||||
nxt_port_rpc_cancel(task, port, stream);
|
||||
|
||||
nxt_router_app_joint_use(task, app->joint, -1);
|
||||
|
||||
goto failed;
|
||||
}
|
||||
|
||||
app_joint_rpc->app_joint = app->joint;
|
||||
app_joint_rpc->generation = app->generation;
|
||||
|
||||
nxt_router_app_joint_use(task, app->joint, 1);
|
||||
|
||||
nxt_router_app_use(task, app, -1);
|
||||
|
||||
return;
|
||||
@@ -504,6 +515,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
|
||||
{
|
||||
nxt_buf_t *b, *next;
|
||||
nxt_bool_t cancelled;
|
||||
nxt_port_t *app_port;
|
||||
nxt_msg_info_t *msg_info;
|
||||
|
||||
msg_info = &req_rpc_data->msg_info;
|
||||
@@ -512,13 +524,20 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_request_rpc_data_t *req_rpc_data)
|
||||
return 0;
|
||||
}
|
||||
|
||||
cancelled = nxt_app_queue_cancel(req_rpc_data->app->shared_port->queue,
|
||||
msg_info->tracking_cookie,
|
||||
req_rpc_data->stream);
|
||||
app_port = req_rpc_data->app_port;
|
||||
|
||||
if (cancelled) {
|
||||
nxt_debug(task, "stream #%uD: cancelled by router",
|
||||
req_rpc_data->stream);
|
||||
if (app_port != NULL && app_port->id == NXT_SHARED_PORT_ID) {
|
||||
cancelled = nxt_app_queue_cancel(app_port->queue,
|
||||
msg_info->tracking_cookie,
|
||||
req_rpc_data->stream);
|
||||
|
||||
if (cancelled) {
|
||||
nxt_debug(task, "stream #%uD: cancelled by router",
|
||||
req_rpc_data->stream);
|
||||
}
|
||||
|
||||
} else {
|
||||
cancelled = 0;
|
||||
}
|
||||
|
||||
for (b = msg_info->buf; b != NULL; b = next) {
|
||||
@@ -793,6 +812,90 @@ cleanup:
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_app_restart_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||
{
|
||||
nxt_app_t *app;
|
||||
nxt_int_t ret;
|
||||
nxt_str_t app_name;
|
||||
nxt_port_t *port, *reply_port, *shared_port, *old_shared_port;
|
||||
nxt_port_msg_type_t reply;
|
||||
|
||||
reply_port = nxt_runtime_port_find(task->thread->runtime,
|
||||
msg->port_msg.pid,
|
||||
msg->port_msg.reply_port);
|
||||
if (nxt_slow_path(reply_port == NULL)) {
|
||||
nxt_alert(task, "app_restart_handler: reply port not found");
|
||||
return;
|
||||
}
|
||||
|
||||
app_name.length = nxt_buf_mem_used_size(&msg->buf->mem);
|
||||
app_name.start = msg->buf->mem.pos;
|
||||
|
||||
nxt_debug(task, "app_restart_handler: %V", &app_name);
|
||||
|
||||
app = nxt_router_app_find(&nxt_router->apps, &app_name);
|
||||
|
||||
if (nxt_fast_path(app != NULL)) {
|
||||
shared_port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
|
||||
NXT_PROCESS_APP);
|
||||
if (nxt_slow_path(shared_port == NULL)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
ret = nxt_port_socket_init(task, shared_port, 0);
|
||||
if (nxt_slow_path(ret != NXT_OK)) {
|
||||
nxt_port_use(task, shared_port, -1);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
ret = nxt_router_app_queue_init(task, shared_port);
|
||||
if (nxt_slow_path(ret != NXT_OK)) {
|
||||
nxt_port_write_close(shared_port);
|
||||
nxt_port_read_close(shared_port);
|
||||
nxt_port_use(task, shared_port, -1);
|
||||
goto fail;
|
||||
}
|
||||
|
||||
nxt_port_write_enable(task, shared_port);
|
||||
|
||||
nxt_thread_mutex_lock(&app->mutex);
|
||||
|
||||
nxt_queue_each(port, &app->ports, nxt_port_t, app_link) {
|
||||
|
||||
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1,
|
||||
0, 0, NULL);
|
||||
|
||||
} nxt_queue_loop;
|
||||
|
||||
app->generation++;
|
||||
|
||||
shared_port->app = app;
|
||||
|
||||
old_shared_port = app->shared_port;
|
||||
old_shared_port->app = NULL;
|
||||
|
||||
app->shared_port = shared_port;
|
||||
|
||||
nxt_thread_mutex_unlock(&app->mutex);
|
||||
|
||||
nxt_port_close(task, old_shared_port);
|
||||
nxt_port_use(task, old_shared_port, -1);
|
||||
|
||||
reply = NXT_PORT_MSG_RPC_READY_LAST;
|
||||
|
||||
} else {
|
||||
|
||||
fail:
|
||||
|
||||
reply = NXT_PORT_MSG_RPC_ERROR;
|
||||
}
|
||||
|
||||
nxt_port_socket_write(task, reply_port, reply, -1, msg->port_msg.stream,
|
||||
0, NULL);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_app_process_remove_pid(nxt_task_t *task, nxt_port_t *port,
|
||||
void *data)
|
||||
@@ -1607,7 +1710,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
||||
app_joint->free_app_work.task = &engine->task;
|
||||
app_joint->free_app_work.obj = app_joint;
|
||||
|
||||
port = nxt_port_new(task, (nxt_port_id_t) -1, nxt_pid,
|
||||
port = nxt_port_new(task, NXT_SHARED_PORT_ID, nxt_pid,
|
||||
NXT_PROCESS_APP);
|
||||
if (nxt_slow_path(port == NULL)) {
|
||||
return NXT_ERROR;
|
||||
@@ -4233,11 +4336,16 @@ static void
|
||||
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
void *data)
|
||||
{
|
||||
nxt_app_t *app;
|
||||
nxt_port_t *port;
|
||||
nxt_app_joint_t *app_joint;
|
||||
nxt_app_t *app;
|
||||
nxt_bool_t start_process;
|
||||
nxt_port_t *port;
|
||||
nxt_app_joint_t *app_joint;
|
||||
nxt_app_joint_rpc_t *app_joint_rpc;
|
||||
|
||||
app_joint = data;
|
||||
nxt_assert(data != NULL);
|
||||
|
||||
app_joint_rpc = data;
|
||||
app_joint = app_joint_rpc->app_joint;
|
||||
port = msg->u.new_port;
|
||||
|
||||
nxt_assert(app_joint != NULL);
|
||||
@@ -4257,14 +4365,37 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
return;
|
||||
}
|
||||
|
||||
port->app = app;
|
||||
port->main_app_port = port;
|
||||
|
||||
nxt_thread_mutex_lock(&app->mutex);
|
||||
|
||||
nxt_assert(app->pending_processes != 0);
|
||||
|
||||
app->pending_processes--;
|
||||
|
||||
if (nxt_slow_path(app->generation != app_joint_rpc->generation)) {
|
||||
nxt_debug(task, "new port ready for restarted app, send QUIT");
|
||||
|
||||
start_process = !task->thread->engine->shutdown
|
||||
&& nxt_router_app_can_start(app)
|
||||
&& nxt_router_app_need_start(app);
|
||||
|
||||
if (start_process) {
|
||||
app->pending_processes++;
|
||||
}
|
||||
|
||||
nxt_thread_mutex_unlock(&app->mutex);
|
||||
|
||||
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
|
||||
|
||||
if (start_process) {
|
||||
nxt_router_start_app_process(task, app);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
port->app = app;
|
||||
port->main_app_port = port;
|
||||
|
||||
app->processes++;
|
||||
nxt_port_hash_add(&app->port_hash, port);
|
||||
app->port_hash_count++;
|
||||
@@ -4318,12 +4449,16 @@ static void
|
||||
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
void *data)
|
||||
{
|
||||
nxt_app_t *app;
|
||||
nxt_app_joint_t *app_joint;
|
||||
nxt_queue_link_t *link;
|
||||
nxt_http_request_t *r;
|
||||
nxt_app_t *app;
|
||||
nxt_app_joint_t *app_joint;
|
||||
nxt_queue_link_t *link;
|
||||
nxt_http_request_t *r;
|
||||
nxt_app_joint_rpc_t *app_joint_rpc;
|
||||
|
||||
app_joint = data;
|
||||
nxt_assert(data != NULL);
|
||||
|
||||
app_joint_rpc = data;
|
||||
app_joint = app_joint_rpc->app_joint;
|
||||
|
||||
nxt_assert(app_joint != NULL);
|
||||
|
||||
@@ -4490,7 +4625,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
|
||||
port->pid, port->id,
|
||||
(int) inc_use, (int) got_response);
|
||||
|
||||
if (port == app->shared_port) {
|
||||
if (port->id == NXT_SHARED_PORT_ID) {
|
||||
nxt_thread_mutex_lock(&app->mutex);
|
||||
|
||||
app->active_requests -= got_response + dec_requests;
|
||||
@@ -4860,6 +4995,8 @@ nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
|
||||
app->shared_port->app = NULL;
|
||||
nxt_port_close(task, app->shared_port);
|
||||
nxt_port_use(task, app->shared_port, -1);
|
||||
|
||||
app->shared_port = NULL;
|
||||
}
|
||||
|
||||
nxt_thread_mutex_destroy(&app->mutex);
|
||||
|
||||
Reference in New Issue
Block a user