Controller waits READY message from router.
This required to avoid racing condition when controller receive router port before router receives controller port.
This commit is contained in:
@@ -40,6 +40,9 @@ typedef struct {
|
||||
|
||||
static void nxt_controller_process_new_port_handler(nxt_task_t *task,
|
||||
nxt_port_recv_msg_t *msg);
|
||||
static void nxt_controller_send_current_conf(nxt_task_t *task);
|
||||
static void nxt_controller_router_ready_handler(nxt_task_t *task,
|
||||
nxt_port_recv_msg_t *msg);
|
||||
static nxt_int_t nxt_controller_conf_default(void);
|
||||
static void nxt_controller_conf_init_handler(nxt_task_t *task,
|
||||
nxt_port_recv_msg_t *msg, void *data);
|
||||
@@ -87,6 +90,7 @@ static nxt_http_field_proc_t nxt_controller_request_fields[] = {
|
||||
static nxt_lvlhsh_t nxt_controller_fields_hash;
|
||||
|
||||
static nxt_uint_t nxt_controller_listening;
|
||||
static nxt_uint_t nxt_controller_router_ready;
|
||||
static nxt_controller_conf_t nxt_controller_conf;
|
||||
static nxt_queue_t nxt_controller_waiting_requests;
|
||||
|
||||
@@ -98,14 +102,15 @@ static const nxt_event_conn_state_t nxt_controller_conn_close_state;
|
||||
|
||||
|
||||
nxt_port_handlers_t nxt_controller_process_port_handlers = {
|
||||
.quit = nxt_worker_process_quit_handler,
|
||||
.new_port = nxt_controller_process_new_port_handler,
|
||||
.change_file = nxt_port_change_log_file_handler,
|
||||
.mmap = nxt_port_mmap_handler,
|
||||
.data = nxt_port_data_handler,
|
||||
.remove_pid = nxt_port_remove_pid_handler,
|
||||
.rpc_ready = nxt_port_rpc_handler,
|
||||
.rpc_error = nxt_port_rpc_handler,
|
||||
.quit = nxt_worker_process_quit_handler,
|
||||
.new_port = nxt_controller_process_new_port_handler,
|
||||
.change_file = nxt_port_change_log_file_handler,
|
||||
.mmap = nxt_port_mmap_handler,
|
||||
.process_ready = nxt_controller_router_ready_handler,
|
||||
.data = nxt_port_data_handler,
|
||||
.remove_pid = nxt_port_remove_pid_handler,
|
||||
.rpc_ready = nxt_port_rpc_handler,
|
||||
.rpc_error = nxt_port_rpc_handler,
|
||||
};
|
||||
|
||||
|
||||
@@ -201,17 +206,26 @@ nxt_controller_start(nxt_task_t *task, void *data)
|
||||
static void
|
||||
nxt_controller_process_new_port_handler(nxt_task_t *task,
|
||||
nxt_port_recv_msg_t *msg)
|
||||
{
|
||||
nxt_port_new_port_handler(task, msg);
|
||||
|
||||
if (msg->u.new_port->type != NXT_PROCESS_ROUTER
|
||||
|| !nxt_controller_router_ready)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_controller_send_current_conf(task);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_controller_send_current_conf(nxt_task_t *task)
|
||||
{
|
||||
nxt_int_t rc;
|
||||
nxt_runtime_t *rt;
|
||||
nxt_conf_value_t *conf;
|
||||
|
||||
nxt_port_new_port_handler(task, msg);
|
||||
|
||||
if (msg->u.new_port->type != NXT_PROCESS_ROUTER) {
|
||||
return;
|
||||
}
|
||||
|
||||
conf = nxt_controller_conf.root;
|
||||
|
||||
if (conf != NULL) {
|
||||
@@ -243,6 +257,25 @@ nxt_controller_process_new_port_handler(nxt_task_t *task,
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_controller_router_ready_handler(nxt_task_t *task,
|
||||
nxt_port_recv_msg_t *msg)
|
||||
{
|
||||
nxt_port_t *router_port;
|
||||
nxt_runtime_t *rt;
|
||||
|
||||
rt = task->thread->runtime;
|
||||
|
||||
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
|
||||
|
||||
nxt_controller_router_ready = 1;
|
||||
|
||||
if (router_port != NULL) {
|
||||
nxt_controller_send_current_conf(task);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static nxt_int_t
|
||||
nxt_controller_conf_default(void)
|
||||
{
|
||||
@@ -316,7 +349,7 @@ nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
|
||||
|
||||
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
|
||||
|
||||
if (nxt_slow_path(router_port == NULL)) {
|
||||
if (nxt_slow_path(router_port == NULL || !nxt_controller_router_ready)) {
|
||||
return NXT_DECLINED;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user