Controller: waiting for router before start to accept connections.
Previously, reconfiguration might fail right after the daemon start if the router process wasn't ready yet.
This commit is contained in:
@@ -38,6 +38,9 @@ typedef struct {
|
|||||||
} nxt_controller_response_t;
|
} nxt_controller_response_t;
|
||||||
|
|
||||||
|
|
||||||
|
static void nxt_controller_process_new_port_handler(nxt_task_t *task,
|
||||||
|
nxt_port_recv_msg_t *msg);
|
||||||
|
|
||||||
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
|
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
|
static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
|
||||||
static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
|
static nxt_msec_t nxt_controller_conn_timeout_value(nxt_conn_t *c,
|
||||||
@@ -90,17 +93,28 @@ static const nxt_event_conn_state_t nxt_controller_conn_write_state;
|
|||||||
static const nxt_event_conn_state_t nxt_controller_conn_close_state;
|
static const nxt_event_conn_state_t nxt_controller_conn_close_state;
|
||||||
|
|
||||||
|
|
||||||
|
nxt_port_handler_t nxt_controller_process_port_handlers[] = {
|
||||||
|
nxt_worker_process_quit_handler,
|
||||||
|
nxt_controller_process_new_port_handler,
|
||||||
|
nxt_port_change_log_file_handler,
|
||||||
|
nxt_port_mmap_handler,
|
||||||
|
nxt_port_data_handler,
|
||||||
|
nxt_port_remove_pid_handler,
|
||||||
|
NULL, /* NXT_PORT_MSG_READY */
|
||||||
|
NULL, /* NXT_PORT_MSG_START_WORKER */
|
||||||
|
NULL, /* NXT_PORT_MSG_SOCKET */
|
||||||
|
NULL, /* NXT_PORT_MSG_MODULES */
|
||||||
|
nxt_port_rpc_handler,
|
||||||
|
nxt_port_rpc_handler,
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
nxt_int_t
|
nxt_int_t
|
||||||
nxt_controller_start(nxt_task_t *task, void *data)
|
nxt_controller_start(nxt_task_t *task, void *data)
|
||||||
{
|
{
|
||||||
nxt_mp_t *mp;
|
|
||||||
nxt_runtime_t *rt;
|
nxt_runtime_t *rt;
|
||||||
nxt_conf_value_t *conf;
|
|
||||||
nxt_http_fields_hash_t *hash;
|
nxt_http_fields_hash_t *hash;
|
||||||
|
|
||||||
static const nxt_str_t json
|
|
||||||
= nxt_string("{ \"listeners\": {}, \"applications\": {} }");
|
|
||||||
|
|
||||||
rt = task->thread->runtime;
|
rt = task->thread->runtime;
|
||||||
|
|
||||||
hash = nxt_http_fields_hash_create(nxt_controller_request_fields,
|
hash = nxt_http_fields_hash_create(nxt_controller_request_fields,
|
||||||
@@ -110,29 +124,51 @@ nxt_controller_start(nxt_task_t *task, void *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
nxt_controller_fields_hash = hash;
|
nxt_controller_fields_hash = hash;
|
||||||
|
nxt_queue_init(&nxt_controller_waiting_requests);
|
||||||
|
|
||||||
if (nxt_listen_event(task, rt->controller_socket) == NULL) {
|
return NXT_OK;
|
||||||
return NXT_ERROR;
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_controller_process_new_port_handler(nxt_task_t *task,
|
||||||
|
nxt_port_recv_msg_t *msg)
|
||||||
|
{
|
||||||
|
nxt_mp_t *mp;
|
||||||
|
nxt_runtime_t *rt;
|
||||||
|
nxt_conf_value_t *conf;
|
||||||
|
|
||||||
|
static const nxt_str_t json
|
||||||
|
= nxt_string("{ \"listeners\": {}, \"applications\": {} }");
|
||||||
|
|
||||||
|
nxt_port_new_port_handler(task, msg);
|
||||||
|
|
||||||
|
if (nxt_controller_conf.root != NULL
|
||||||
|
|| msg->new_port->type != NXT_PROCESS_ROUTER)
|
||||||
|
{
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
mp = nxt_mp_create(1024, 128, 256, 32);
|
mp = nxt_mp_create(1024, 128, 256, 32);
|
||||||
|
|
||||||
if (nxt_slow_path(mp == NULL)) {
|
if (nxt_slow_path(mp == NULL)) {
|
||||||
return NXT_ERROR;
|
nxt_abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
conf = nxt_conf_json_parse_str(mp, &json);
|
conf = nxt_conf_json_parse_str(mp, &json);
|
||||||
|
|
||||||
if (conf == NULL) {
|
if (nxt_slow_path(conf == NULL)) {
|
||||||
return NXT_ERROR;
|
nxt_abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_controller_conf.root = conf;
|
nxt_controller_conf.root = conf;
|
||||||
nxt_controller_conf.pool = mp;
|
nxt_controller_conf.pool = mp;
|
||||||
|
|
||||||
nxt_queue_init(&nxt_controller_waiting_requests);
|
rt = task->thread->runtime;
|
||||||
|
|
||||||
return NXT_OK;
|
if (nxt_slow_path(nxt_listen_event(task, rt->controller_socket) == NULL)) {
|
||||||
|
nxt_abort();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -98,6 +98,10 @@ nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process,
|
|||||||
nxt_pid_t pid, nxt_port_id_t port_id);
|
nxt_pid_t pid, nxt_port_id_t port_id);
|
||||||
|
|
||||||
|
|
||||||
|
void nxt_worker_process_quit_handler(nxt_task_t *task,
|
||||||
|
nxt_port_recv_msg_t *msg);
|
||||||
|
|
||||||
|
|
||||||
#if (NXT_HAVE_SETPROCTITLE)
|
#if (NXT_HAVE_SETPROCTITLE)
|
||||||
|
|
||||||
#define nxt_process_title(task, fmt, ...) \
|
#define nxt_process_title(task, fmt, ...) \
|
||||||
|
|||||||
@@ -12,8 +12,6 @@
|
|||||||
|
|
||||||
|
|
||||||
static void nxt_worker_process_quit(nxt_task_t *task);
|
static void nxt_worker_process_quit(nxt_task_t *task);
|
||||||
static void nxt_worker_process_quit_handler(nxt_task_t *task,
|
|
||||||
nxt_port_recv_msg_t *msg);
|
|
||||||
static void nxt_worker_process_signal_handler(nxt_task_t *task, void *obj,
|
static void nxt_worker_process_signal_handler(nxt_task_t *task, void *obj,
|
||||||
void *data);
|
void *data);
|
||||||
static void nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj,
|
static void nxt_worker_process_sigterm_handler(nxt_task_t *task, void *obj,
|
||||||
@@ -22,22 +20,6 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj,
|
|||||||
void *data);
|
void *data);
|
||||||
|
|
||||||
|
|
||||||
nxt_port_handler_t nxt_controller_process_port_handlers[] = {
|
|
||||||
nxt_worker_process_quit_handler,
|
|
||||||
nxt_port_new_port_handler,
|
|
||||||
nxt_port_change_log_file_handler,
|
|
||||||
nxt_port_mmap_handler,
|
|
||||||
nxt_port_data_handler,
|
|
||||||
nxt_port_remove_pid_handler,
|
|
||||||
NULL, /* NXT_PORT_MSG_READY */
|
|
||||||
NULL, /* NXT_PORT_MSG_START_WORKER */
|
|
||||||
NULL, /* NXT_PORT_MSG_SOCKET */
|
|
||||||
NULL, /* NXT_PORT_MSG_MODULES */
|
|
||||||
nxt_port_rpc_handler,
|
|
||||||
nxt_port_rpc_handler,
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
nxt_port_handler_t nxt_worker_process_port_handlers[] = {
|
nxt_port_handler_t nxt_worker_process_port_handlers[] = {
|
||||||
nxt_worker_process_quit_handler,
|
nxt_worker_process_quit_handler,
|
||||||
nxt_port_new_port_handler,
|
nxt_port_new_port_handler,
|
||||||
@@ -169,7 +151,7 @@ nxt_worker_process_signal_handler(nxt_task_t *task, void *obj, void *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
void
|
||||||
nxt_worker_process_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
nxt_worker_process_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||||
{
|
{
|
||||||
nxt_worker_process_quit(task);
|
nxt_worker_process_quit(task);
|
||||||
|
|||||||
Reference in New Issue
Block a user