Controller: sending JSON configuration to router.

This commit is contained in:
Valentin Bartenev
2017-07-06 22:52:05 +03:00
parent 22d73057d9
commit c9fbd832ab
4 changed files with 288 additions and 33 deletions

View File

@@ -55,6 +55,8 @@ static nxt_int_t nxt_controller_request_content_length(void *ctx,
static void nxt_controller_process_request(nxt_task_t *task,
nxt_conn_t *c, nxt_controller_request_t *r);
static nxt_int_t nxt_controller_conf_apply(nxt_task_t *task,
nxt_conf_value_t *conf);
static nxt_int_t nxt_controller_response(nxt_task_t *task, nxt_conn_t *c,
nxt_controller_response_t *resp);
static nxt_buf_t *nxt_controller_response_body(nxt_controller_response_t *resp,
@@ -562,38 +564,6 @@ nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c,
nxt_memzero(&resp, sizeof(nxt_controller_response_t));
if (nxt_str_eq(&req->parser.method, "POST", 4)) {
nxt_port_t *port;
nxt_runtime_t *rt;
rt = task->thread->runtime;
nxt_runtime_port_each(rt, port) {
if (nxt_pid == port->pid) {
continue;
}
if (port->type == NXT_PROCESS_ROUTER) {
nxt_buf_t *b, *src;
src = c->read;
b = nxt_port_mmap_get_buf(task, port,
nxt_buf_mem_used_size(&src->mem));
nxt_memcpy(b->mem.pos, src->mem.pos,
nxt_buf_mem_used_size(&src->mem));
b->mem.free += nxt_buf_mem_used_size(&src->mem);
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA,
-1, 0, 0, b);
break;
}
} nxt_runtime_port_loop;
}
if (nxt_str_eq(&req->parser.method, "GET", 3)) {
value = nxt_conf_get_path(nxt_controller_conf.root, &path);
@@ -654,12 +624,19 @@ nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c,
}
if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) {
nxt_mp_destroy(mp);
status = 400;
nxt_str_set(&resp.json,
"{ \"error\": \"Invalid configuration.\" }");
goto done;
}
if (nxt_slow_path(nxt_controller_conf_apply(task, value) != NXT_OK)) {
nxt_mp_destroy(mp);
status = 500;
goto done;
}
nxt_mp_destroy(nxt_controller_conf.pool);
nxt_controller_conf.root = value;
@@ -715,12 +692,19 @@ nxt_controller_process_request(nxt_task_t *task, nxt_conn_t *c,
}
if (nxt_slow_path(nxt_conf_validate(value) != NXT_OK)) {
nxt_mp_destroy(mp);
status = 400;
nxt_str_set(&resp.json,
"{ \"error\": \"Invalid configuration.\" }");
goto done;
}
if (nxt_slow_path(nxt_controller_conf_apply(task, value) != NXT_OK)) {
nxt_mp_destroy(mp);
status = 500;
goto done;
}
nxt_mp_destroy(nxt_controller_conf.pool);
nxt_controller_conf.root = value;
@@ -768,6 +752,200 @@ done:
}
static nxt_int_t
nxt_controller_conf_apply(nxt_task_t *task, nxt_conf_value_t *conf)
{
size_t size;
uint32_t next, n;
nxt_mp_t *mp;
nxt_buf_t *b;
nxt_int_t rc;
nxt_str_t ls_name, name;
nxt_port_t *port;
nxt_uint_t lss_count;
nxt_runtime_t *rt;
nxt_conf_value_t *lss, *ls, *rtr, *rtr_lss, *rtr_ls, *app_name, *apps,
*app, *value;
static nxt_str_t lss_str = nxt_string("listeners");
static nxt_str_t app_str = nxt_string("application");
static nxt_str_t apps_str = nxt_string("applications");
static nxt_str_t type_str = nxt_string("type");
static nxt_str_t app_type_str = nxt_string("_application_type");
static nxt_str_t workers_str = nxt_string("workers");
static nxt_str_t app_workers_str = nxt_string("_application_workers");
static nxt_str_t rtr_str = nxt_string("router");
static nxt_str_t http_str = nxt_string("http");
static nxt_str_t rtr_json = nxt_string("{ \"threads\": 0 }");
static nxt_str_t http_json = nxt_string(
"{ \"header_buffer_size\": 2096,"
" \"large_header_buffer_size\": 16384, "
" \"header_read_timeout\": 60000 }"
);
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
return NXT_ERROR;
}
lss_count = 0;
lss = nxt_conf_get_object_member(conf, &lss_str, NULL);
if (lss != NULL) {
lss_count = nxt_conf_object_members_count(lss);
}
apps = nxt_conf_get_object_member(conf, &apps_str, NULL);
if (nxt_slow_path(apps == NULL)) {
goto error;
}
rtr_lss = nxt_conf_create_object(mp, lss_count);
if (nxt_slow_path(rtr_lss == NULL)) {
goto error;
}
n = 0;
next = 0;
for ( ;; ) {
ls = nxt_conf_next_object_member(lss, &ls_name, &next);
if (ls == NULL) {
break;
}
rtr_ls = nxt_conf_create_object(mp, 3);
if (nxt_slow_path(rtr_ls == NULL)) {
goto error;
}
app_name = nxt_conf_get_object_member(ls, &app_str, NULL);
if (nxt_slow_path(app_name == NULL)) {
goto error;
}
rc = nxt_conf_set_object_member(mp, rtr_ls, &app_str, app_name, 0);
if (nxt_slow_path(rc != NXT_OK)) {
goto error;
}
nxt_conf_get_string(app_name, &name);
app = nxt_conf_get_object_member(apps, &name, NULL);
if (nxt_slow_path(app == NULL)) {
goto error;
}
value = nxt_conf_get_object_member(app, &type_str, NULL);
if (nxt_slow_path(value == NULL)) {
goto error;
}
rc = nxt_conf_set_object_member(mp, rtr_ls, &app_type_str, value, 1);
if (nxt_slow_path(rc != NXT_OK)) {
goto error;
}
value = nxt_conf_get_object_member(app, &workers_str, NULL);
if (nxt_slow_path(value == NULL)) {
goto error;
}
rc = nxt_conf_set_object_member(mp, rtr_ls, &app_workers_str, value, 2);
if (nxt_slow_path(rc != NXT_OK)) {
goto error;
}
rc = nxt_conf_set_object_member(mp, rtr_lss, &ls_name, rtr_ls, n);
if (nxt_slow_path(rc != NXT_OK)) {
goto error;
}
n++;
}
rtr = nxt_conf_create_object(mp, 3);
if (nxt_slow_path(rtr == NULL)) {
goto error;
}
rc = nxt_conf_set_object_member(mp, rtr, &lss_str, rtr_lss, 0);
if (nxt_slow_path(rc != NXT_OK)) {
goto error;
}
value = nxt_conf_json_parse_str(mp, &rtr_json);
if (nxt_slow_path(value == NULL)) {
goto error;
}
rc = nxt_conf_set_object_member(mp, rtr, &rtr_str, value, 1);
if (nxt_slow_path(rc != NXT_OK)) {
goto error;
}
value = nxt_conf_json_parse_str(mp, &http_json);
if (nxt_slow_path(value == NULL)) {
goto error;
}
rc = nxt_conf_set_object_member(mp, rtr, &http_str, value, 2);
if (nxt_slow_path(rc != NXT_OK)) {
goto error;
}
rt = task->thread->runtime;
nxt_runtime_port_each(rt, port) {
if (port->type == NXT_PROCESS_ROUTER) {
break;
}
} nxt_runtime_port_loop;
size = nxt_conf_json_length(rtr, NULL);
b = nxt_port_mmap_get_buf(task, port, size);
b->mem.free = nxt_conf_json_print(b->mem.free, rtr, NULL);
nxt_mp_destroy(mp);
nxt_debug(task, "conf for router: \"%*s\"",
b->mem.free - b->mem.pos, b->mem.pos);
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, 0, b);
return NXT_OK;
error:
nxt_mp_destroy(mp);
return NXT_ERROR;
}
static nxt_int_t
nxt_controller_response(nxt_task_t *task, nxt_conn_t *c,
nxt_controller_response_t *resp)