Improved reconfiguration requests serialization.

Previously, only applying of updated configuration was serialized,
while the changes themselves could be done in parallel on the same
configuration.  That resulted in inconsistent behaviour.
This commit is contained in:
Valentin Bartenev
2017-08-28 10:20:40 +03:00
parent 3ab49d14c0
commit 1a5ec7fd08

View File

@@ -63,9 +63,8 @@ static void nxt_controller_process_request(nxt_task_t *task,
nxt_controller_request_t *req);
static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
nxt_controller_request_t *req);
static void nxt_controller_process_waiting(nxt_task_t *task);
static nxt_int_t nxt_controller_conf_pass(nxt_task_t *task,
nxt_conf_value_t *conf);
static void nxt_controller_conf_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_controller_response(nxt_task_t *task,
nxt_controller_request_t *req, nxt_controller_response_t *resp);
static u_char *nxt_controller_date(u_char *buf, nxt_realtime_t *now,
@@ -81,10 +80,8 @@ static nxt_http_fields_hash_entry_t nxt_controller_request_fields[] = {
static nxt_http_fields_hash_t *nxt_controller_fields_hash;
static nxt_controller_conf_t nxt_controller_conf;
static nxt_queue_t nxt_controller_waiting_requests;
static nxt_controller_request_t *nxt_controller_current_request;
static nxt_controller_conf_t nxt_controller_conf;
static nxt_queue_t nxt_controller_waiting_requests;
static const nxt_event_conn_state_t nxt_controller_conn_read_state;
@@ -584,6 +581,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
if (nxt_str_eq(&req->parser.method, "PUT", 3)) {
if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
return;
}
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
@@ -654,6 +656,11 @@ nxt_controller_process_request(nxt_task_t *task, nxt_controller_request_t *req)
if (nxt_str_eq(&req->parser.method, "DELETE", 6)) {
if (!nxt_queue_is_empty(&nxt_controller_waiting_requests)) {
nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
return;
}
if (path.length == 1) {
mp = nxt_mp_create(1024, 128, 256, 32);
@@ -745,20 +752,38 @@ invalid_conf:
static nxt_int_t
nxt_controller_conf_apply(nxt_task_t *task, nxt_controller_request_t *req)
{
nxt_int_t rc;
size_t size;
uint32_t stream;
nxt_int_t rc;
nxt_buf_t *b;
nxt_port_t *router_port, *controller_port;
nxt_runtime_t *rt;
if (nxt_controller_current_request != NULL) {
nxt_queue_insert_tail(&nxt_controller_waiting_requests, &req->link);
return NXT_OK;
}
rt = task->thread->runtime;
rc = nxt_controller_conf_pass(task, req->conf.root);
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
size = nxt_conf_json_length(req->conf.root, NULL);
b = nxt_port_mmap_get_buf(task, router_port, size);
b->mem.free = nxt_conf_json_print(b->mem.free, req->conf.root, NULL);
stream = nxt_port_rpc_register_handler(task, controller_port,
nxt_controller_conf_handler,
nxt_controller_conf_handler,
router_port->pid, req);
rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
stream, controller_port->id, b);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_port_rpc_cancel(task, controller_port, stream);
return NXT_ERROR;
}
nxt_controller_current_request = req;
nxt_queue_insert_head(&nxt_controller_waiting_requests, &req->link);
return NXT_OK;
}
@@ -768,16 +793,18 @@ static void
nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_queue_t queue;
nxt_controller_request_t *req;
nxt_controller_response_t resp;
req = data;
nxt_debug(task, "controller conf ready: %*s",
nxt_buf_mem_used_size(&msg->buf->mem), msg->buf->mem.pos);
nxt_memzero(&resp, sizeof(nxt_controller_response_t));
nxt_queue_remove(&req->link);
req = nxt_controller_current_request;
nxt_controller_current_request = NULL;
nxt_memzero(&resp, sizeof(nxt_controller_response_t));
if (msg->port_msg.type == NXT_PORT_MSG_RPC_READY) {
nxt_mp_destroy(nxt_controller_conf.pool);
@@ -797,76 +824,17 @@ nxt_controller_conf_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_controller_response(task, req, &resp);
nxt_controller_process_waiting(task);
}
nxt_queue_init(&queue);
nxt_queue_add(&queue, &nxt_controller_waiting_requests);
nxt_queue_init(&nxt_controller_waiting_requests);
static void
nxt_controller_process_waiting(nxt_task_t *task)
{
nxt_controller_request_t *req;
nxt_controller_response_t resp;
nxt_queue_each(req, &nxt_controller_waiting_requests,
nxt_controller_request_t, link)
{
nxt_queue_remove(&req->link);
if (nxt_fast_path(nxt_controller_conf_apply(task, req) == NXT_OK)) {
return;
}
nxt_mp_destroy(req->conf.pool);
nxt_memzero(&resp, sizeof(nxt_controller_response_t));
resp.status = 500;
resp.title = (u_char *) "Memory allocation failed.";
resp.offset = -1;
nxt_controller_response(task, req, &resp);
nxt_queue_each(req, &queue, nxt_controller_request_t, link) {
nxt_controller_process_request(task, req);
} nxt_queue_loop;
}
static nxt_int_t
nxt_controller_conf_pass(nxt_task_t *task, nxt_conf_value_t *conf)
{
size_t size;
uint32_t stream;
nxt_int_t rc;
nxt_buf_t *b;
nxt_port_t *router_port, *controller_port;
nxt_runtime_t *rt;
rt = task->thread->runtime;
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
controller_port = rt->port_by_type[NXT_PROCESS_CONTROLLER];
size = nxt_conf_json_length(conf, NULL);
b = nxt_port_mmap_get_buf(task, router_port, size);
b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);
stream = nxt_port_rpc_register_handler(task, controller_port,
nxt_controller_conf_handler,
nxt_controller_conf_handler,
router_port->pid, NULL);
rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1,
stream, controller_port->id, b);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_port_rpc_cancel(task, controller_port, stream);
}
return rc;
}
static void
nxt_controller_response(nxt_task_t *task, nxt_controller_request_t *req,
nxt_controller_response_t *resp)