From 179819dbee58242f95c2b866c67a4d6a0a863ef6 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Thu, 26 Apr 2018 16:44:20 +0300 Subject: [PATCH] Controller waits READY message from router. This required to avoid racing condition when controller receive router port before router receives controller port. --- src/nxt_controller.c | 63 ++++++++++++++++++++++++++++++---------- src/nxt_router.c | 34 ++++++++++++++++++++++ src/nxt_worker_process.c | 13 --------- 3 files changed, 82 insertions(+), 28 deletions(-) diff --git a/src/nxt_controller.c b/src/nxt_controller.c index 0b863f0c..17627e83 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -40,6 +40,9 @@ typedef struct { static void nxt_controller_process_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg); +static void nxt_controller_send_current_conf(nxt_task_t *task); +static void nxt_controller_router_ready_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg); static nxt_int_t nxt_controller_conf_default(void); static void nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data); @@ -87,6 +90,7 @@ static nxt_http_field_proc_t nxt_controller_request_fields[] = { static nxt_lvlhsh_t nxt_controller_fields_hash; static nxt_uint_t nxt_controller_listening; +static nxt_uint_t nxt_controller_router_ready; static nxt_controller_conf_t nxt_controller_conf; static nxt_queue_t nxt_controller_waiting_requests; @@ -98,14 +102,15 @@ static const nxt_event_conn_state_t nxt_controller_conn_close_state; nxt_port_handlers_t nxt_controller_process_port_handlers = { - .quit = nxt_worker_process_quit_handler, - .new_port = nxt_controller_process_new_port_handler, - .change_file = nxt_port_change_log_file_handler, - .mmap = nxt_port_mmap_handler, - .data = nxt_port_data_handler, - .remove_pid = nxt_port_remove_pid_handler, - .rpc_ready = nxt_port_rpc_handler, - .rpc_error = nxt_port_rpc_handler, + .quit = nxt_worker_process_quit_handler, + .new_port = nxt_controller_process_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .process_ready = nxt_controller_router_ready_handler, + .data = nxt_port_data_handler, + .remove_pid = nxt_port_remove_pid_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, }; @@ -201,17 +206,26 @@ nxt_controller_start(nxt_task_t *task, void *data) static void nxt_controller_process_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) +{ + nxt_port_new_port_handler(task, msg); + + if (msg->u.new_port->type != NXT_PROCESS_ROUTER + || !nxt_controller_router_ready) + { + return; + } + + nxt_controller_send_current_conf(task); +} + + +static void +nxt_controller_send_current_conf(nxt_task_t *task) { nxt_int_t rc; nxt_runtime_t *rt; nxt_conf_value_t *conf; - nxt_port_new_port_handler(task, msg); - - if (msg->u.new_port->type != NXT_PROCESS_ROUTER) { - return; - } - conf = nxt_controller_conf.root; if (conf != NULL) { @@ -243,6 +257,25 @@ nxt_controller_process_new_port_handler(nxt_task_t *task, } +static void +nxt_controller_router_ready_handler(nxt_task_t *task, + nxt_port_recv_msg_t *msg) +{ + nxt_port_t *router_port; + nxt_runtime_t *rt; + + rt = task->thread->runtime; + + router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; + + nxt_controller_router_ready = 1; + + if (router_port != NULL) { + nxt_controller_send_current_conf(task); + } +} + + static nxt_int_t nxt_controller_conf_default(void) { @@ -316,7 +349,7 @@ nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, router_port = rt->port_by_type[NXT_PROCESS_ROUTER]; - if (nxt_slow_path(router_port == NULL)) { + if (nxt_slow_path(router_port == NULL || !nxt_controller_router_ready)) { return NXT_DECLINED; } diff --git a/src/nxt_router.c b/src/nxt_router.c index ddedaa59..48ad55cd 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -100,6 +100,9 @@ struct nxt_port_select_state_s { typedef struct nxt_port_select_state_s nxt_port_select_state_t; +static void nxt_router_greet_controller(nxt_task_t *task, + nxt_port_t *controller_port); + static void nxt_router_port_select(nxt_task_t *task, nxt_port_select_state_t *state); @@ -275,10 +278,24 @@ static nxt_app_prepare_msg_t nxt_app_prepare_msg[] = { }; +nxt_port_handlers_t nxt_router_process_port_handlers = { + .quit = nxt_worker_process_quit_handler, + .new_port = nxt_router_new_port_handler, + .change_file = nxt_port_change_log_file_handler, + .mmap = nxt_port_mmap_handler, + .data = nxt_router_conf_data_handler, + .remove_pid = nxt_router_remove_pid_handler, + .access_log = nxt_router_access_log_reopen_handler, + .rpc_ready = nxt_port_rpc_handler, + .rpc_error = nxt_port_rpc_handler, +}; + + nxt_int_t nxt_router_start(nxt_task_t *task, void *data) { nxt_int_t ret; + nxt_port_t *controller_port; nxt_router_t *router; nxt_runtime_t *rt; @@ -300,10 +317,23 @@ nxt_router_start(nxt_task_t *task, void *data) nxt_router = router; + controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER]; + if (controller_port != NULL) { + nxt_router_greet_controller(task, controller_port); + } + return NXT_OK; } +static void +nxt_router_greet_controller(nxt_task_t *task, nxt_port_t *controller_port) +{ + nxt_port_socket_write(task, controller_port, NXT_PORT_MSG_PROCESS_READY, + -1, 0, 0, NULL); +} + + static void nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port, void *data) @@ -749,6 +779,10 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) { nxt_port_new_port_handler(task, msg); + if (msg->u.new_port->type == NXT_PROCESS_CONTROLLER) { + nxt_router_greet_controller(task, msg->u.new_port); + } + if (msg->port_msg.stream == 0) { return; } diff --git a/src/nxt_worker_process.c b/src/nxt_worker_process.c index 0e23aea0..bf35f2b5 100644 --- a/src/nxt_worker_process.c +++ b/src/nxt_worker_process.c @@ -30,19 +30,6 @@ nxt_port_handlers_t nxt_app_process_port_handlers = { }; -nxt_port_handlers_t nxt_router_process_port_handlers = { - .quit = nxt_worker_process_quit_handler, - .new_port = nxt_router_new_port_handler, - .change_file = nxt_port_change_log_file_handler, - .mmap = nxt_port_mmap_handler, - .data = nxt_router_conf_data_handler, - .remove_pid = nxt_router_remove_pid_handler, - .access_log = nxt_router_access_log_reopen_handler, - .rpc_ready = nxt_port_rpc_handler, - .rpc_error = nxt_port_rpc_handler, -}; - - nxt_port_handlers_t nxt_discovery_process_port_handlers = { .quit = nxt_worker_process_quit_handler, .new_port = nxt_port_new_port_handler,