Fixed race condition while discovering modules.

Previously, the discovery process might exit before the main process
received a list of available modules.
This commit is contained in:
Valentin Bartenev
2018-02-14 16:33:35 +03:00
parent 94049bf30f
commit 27b00629e1
2 changed files with 52 additions and 21 deletions

View File

@@ -26,6 +26,10 @@ typedef struct {
static nxt_buf_t *nxt_discovery_modules(nxt_task_t *task, const char *path);
static nxt_int_t nxt_discovery_module(nxt_task_t *task, nxt_mp_t *mp,
nxt_array_t *modules, const char *name);
static void nxt_discovery_completion_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data);
static nxt_app_module_t *nxt_app_module_load(nxt_task_t *task,
const char *name);
@@ -46,8 +50,10 @@ static nxt_application_module_t *nxt_app;
nxt_int_t
nxt_discovery_start(nxt_task_t *task, void *data)
{
uint32_t stream;
nxt_buf_t *b;
nxt_port_t *main_port;
nxt_int_t ret;
nxt_port_t *main_port, *discovery_port;
nxt_runtime_t *rt;
nxt_debug(task, "DISCOVERY");
@@ -56,33 +62,29 @@ nxt_discovery_start(nxt_task_t *task, void *data)
b = nxt_discovery_modules(task, rt->modules);
if (nxt_slow_path(b == NULL)) {
exit(1);
return NXT_ERROR;
}
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
discovery_port = rt->port_by_type[NXT_PROCESS_DISCOVERY];
nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
0, -1, b);
stream = nxt_port_rpc_register_handler(task, discovery_port,
nxt_discovery_quit,
nxt_discovery_quit,
main_port->pid, NULL);
ret = nxt_port_socket_write(task, main_port, NXT_PORT_MSG_MODULES, -1,
stream, discovery_port->id, b);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, discovery_port, stream);
return NXT_ERROR;
}
return NXT_OK;
}
static void
nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
nxt_buf_t *b;
b = obj;
mp = b->data;
nxt_mp_destroy(mp);
exit(0);
}
static nxt_buf_t *
nxt_discovery_modules(nxt_task_t *task, const char *path)
{
@@ -279,6 +281,26 @@ fail:
}
static void
nxt_discovery_completion_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
nxt_buf_t *b;
b = obj;
mp = b->data;
nxt_mp_destroy(mp);
}
static void
nxt_discovery_quit(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
{
nxt_worker_process_quit_handler(task, msg);
}
nxt_int_t
nxt_app_start(nxt_task_t *task, void *data)
{

View File

@@ -1112,6 +1112,7 @@ nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_mp_t *mp;
nxt_int_t ret;
nxt_buf_t *b;
nxt_port_t *port;
nxt_runtime_t *rt;
nxt_conf_value_t *conf, *root, *value;
nxt_app_lang_module_t *lang;
@@ -1124,6 +1125,14 @@ nxt_main_port_modules_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return;
}
port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
msg->port_msg.reply_port);
if (nxt_fast_path(port != NULL)) {
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR, -1,
msg->port_msg.stream, 0, NULL);
}
b = msg->buf;
if (b == NULL) {