Controller: resending configuration to router after its restart.

Now router crash can be survived with less damage.
This commit is contained in:
Valentin Bartenev
2017-08-30 03:10:13 +03:00
parent f528cb393a
commit 55fe80600c

View File

@@ -40,6 +40,11 @@ typedef struct {
static void nxt_controller_process_new_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static nxt_int_t nxt_controller_conf_default(void);
static void nxt_controller_conf_init_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static nxt_int_t nxt_controller_conf_send(nxt_task_t *task,
nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
@@ -64,8 +69,6 @@ static nxt_int_t nxt_controller_request_content_length(void *ctx,
static void nxt_controller_process_request(nxt_task_t *task,
nxt_controller_request_t *req);
static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
nxt_controller_request_t *req);
static void nxt_controller_conf_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_controller_response(nxt_task_t *task,
@@ -134,35 +137,36 @@ static void
nxt_controller_process_new_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg)
{
nxt_mp_t *mp;
nxt_int_t rc;
nxt_runtime_t *rt;
nxt_conf_value_t *conf;
static const nxt_str_t json
= nxt_string("{ \"listeners\": {}, \"applications\": {} }");
nxt_port_new_port_handler(task, msg);
if (nxt_controller_conf.root != NULL
|| msg->new_port->type != NXT_PROCESS_ROUTER)
{
if (msg->new_port->type != NXT_PROCESS_ROUTER) {
return;
}
mp = nxt_mp_create(1024, 128, 256, 32);
conf = nxt_controller_conf.root;
if (nxt_slow_path(mp == NULL)) {
nxt_abort();
if (conf != NULL) {
rc = nxt_controller_conf_send(task, conf,
nxt_controller_conf_init_handler, NULL);
if (nxt_fast_path(rc == NXT_OK)) {
return;
}
conf = nxt_conf_json_parse_str(mp, &json);
nxt_mp_destroy(nxt_controller_conf.pool);
if (nxt_slow_path(conf == NULL)) {
if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
nxt_abort();
}
}
nxt_controller_conf.root = conf;
nxt_controller_conf.pool = mp;
if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
nxt_abort();
}
rt = task->thread->runtime;
@@ -172,6 +176,91 @@ nxt_controller_process_new_port_handler(nxt_task_t *task,
}
static nxt_int_t
nxt_controller_conf_default(void)
{
nxt_mp_t *mp;
nxt_conf_value_t *conf;
static const nxt_str_t json
= nxt_string("{ \"listeners\": {}, \"applications\": {} }");
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
return NXT_ERROR;
}
conf = nxt_conf_json_parse_str(mp, &json);
if (nxt_slow_path(conf == NULL)) {
return NXT_ERROR;
}
nxt_controller_conf.root = conf;
nxt_controller_conf.pool = mp;
return NXT_OK;
}
static void
nxt_controller_conf_init_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
if (msg->port_msg.type != NXT_PORT_MSG_RPC_READY) {
nxt_mp_destroy(nxt_controller_conf.pool);
if (nxt_slow_path(nxt_controller_conf_default() != NXT_OK)) {
nxt_abort();
}
}
}
static nxt_int_t
nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
nxt_port_rpc_handler_t handler, void *data)
{
size_t size;
uint32_t stream;
nxt_int_t rc;
nxt_buf_t *b;
nxt_port_t *router_port, *controller_port;
nxt_runtime_t *rt;
rt = task->thread->runtime;
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
if (nxt_slow_path(router_port == NULL)) {
return NXT_DECLINED;
}
controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
size = nxt_conf_json_length(conf, NULL);
b = nxt_port_mmap_get_buf(task, router_port, size);
b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
stream = nxt_port_rpc_register_handler(task, controller_port,
handler, handler,
router_port->pid, data);
rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
stream, controller_port->id, b);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_port_rpc_cancel(task, controller_port, stream);
return NXT_ERROR;
}
return NXT_OK;
}
nxt_int_t
nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
{
@@ -679,10 +768,8 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
goto invalid_conf;
}
req->conf.root = value;
req->conf.pool = mp;
rc = nxt_controller_conf_apply(task, req);
rc = nxt_controller_conf_send(task, value,
nxt_controller_conf_handler, req);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_mp_destroy(mp);
@@ -695,6 +782,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
goto alloc_fail;
}
req->conf.root = value;
req->conf.pool = mp;
nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
return;
}
@@ -746,10 +838,8 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
goto invalid_conf;
}
req->conf.root = value;
req->conf.pool = mp;
rc = nxt_controller_conf_apply(task, req);
rc = nxt_controller_conf_send(task, value,
nxt_controller_conf_handler, req);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_mp_destroy(mp);
@@ -762,6 +852,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
goto alloc_fail;
}
req->conf.root = value;
req->conf.pool = mp;
nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
return;
}
@@ -810,51 +905,6 @@ no_router:
}
static nxt_int_t
nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
{
size_t size;
uint32_t stream;
nxt_int_t rc;
nxt_buf_t *b;
nxt_port_t *router_port, *controller_port;
nxt_runtime_t *rt;
rt = task->thread->runtime;
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
if (nxt_slow_path(router_port == NULL)) {
return NXT_DECLINED;
}
controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
size = nxt_conf_json_length(req->conf.root, NULL);
b = nxt_port_mmap_get_buf(task, router_port, size);
b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL);
stream = nxt_port_rpc_register_handler(task, controller_port,
nxt_controller_conf_handler,
nxt_controller_conf_handler,
router_port->pid, req);
rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
stream, controller_port->id, b);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_port_rpc_cancel(task, controller_port, stream);
return NXT_ERROR;
}
nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
return NXT_OK;
}
static void
nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)