Router: processing application configuration.

This commit is contained in:
Igor Sysoev
2017-07-07 21:09:45 +03:00
parent fd0a4ff064
commit b54bcef53d
3 changed files with 274 additions and 42 deletions

View File

@@ -9,6 +9,14 @@
#define _NXT_APPLICATION_H_INCLUDED_
typedef enum {
NXT_APP_PYTHON = 0,
NXT_APP_PHP,
NXT_APP_RUBY,
NXT_APP_GO,
} nxt_app_type_t;
typedef struct {
nxt_str_t name;
nxt_str_t value;

View File

@@ -7,12 +7,16 @@
#include <nxt_router.h>
#include <nxt_conf.h>
#include <nxt_application.h>
typedef struct {
nxt_str_t application_type;
uint32_t application_workers;
nxt_str_t type;
uint32_t workers;
} nxt_router_app_conf_t;
typedef struct {
nxt_str_t application;
} nxt_router_listener_conf_t;
@@ -23,6 +27,9 @@ static void nxt_router_listen_sockets_sort(nxt_router_t *router,
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
static nxt_app_t *nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name);
static nxt_app_t *nxt_router_listener_application(nxt_router_temp_conf_t *tmcf,
nxt_str_t *name);
static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
@@ -48,6 +55,8 @@ static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine);
static void nxt_router_apps_sort(nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
@@ -100,6 +109,7 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
nxt_queue_init(&router->engines);
nxt_queue_init(&router->sockets);
nxt_queue_init(&router->apps);
nxt_router = router;
@@ -144,6 +154,8 @@ nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_t *router,
return ret;
}
nxt_router_apps_sort(router, tmcf);
nxt_router_engines_post(tmcf);
nxt_queue_add(&router->sockets, &tmcf->updating);
@@ -200,6 +212,8 @@ nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router)
nxt_queue_init(&tmcf->updating);
nxt_queue_init(&tmcf->pending);
nxt_queue_init(&tmcf->creating);
nxt_queue_init(&tmcf->apps);
nxt_queue_init(&tmcf->previous);
return tmcf;
@@ -217,7 +231,7 @@ fail:
static nxt_conf_map_t nxt_router_conf[] = {
{
nxt_string("threads"),
nxt_string("listeners_threads"),
NXT_CONF_MAP_INT32,
offsetof(nxt_router_conf_t, threads),
},
@@ -228,17 +242,30 @@ static nxt_conf_map_t nxt_router_conf[] = {
};
static nxt_conf_map_t nxt_router_listener_conf[] = {
static nxt_conf_map_t nxt_router_app_conf[] = {
{
nxt_string("_application_type"),
nxt_string("type"),
NXT_CONF_MAP_STR,
offsetof(nxt_router_listener_conf_t, application_type),
offsetof(nxt_router_app_conf_t, type),
},
{
nxt_string("_application_workers"),
nxt_string("workers"),
NXT_CONF_MAP_INT32,
offsetof(nxt_router_listener_conf_t, application_workers),
offsetof(nxt_router_app_conf_t, workers),
},
{
nxt_null_string, 0, 0,
},
};
static nxt_conf_map_t nxt_router_listener_conf[] = {
{
nxt_string("application"),
NXT_CONF_MAP_STR,
offsetof(nxt_router_listener_conf_t, application),
},
{
@@ -276,17 +303,25 @@ static nxt_int_t
nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
u_char *start, u_char *end)
{
u_char *p;
size_t size;
nxt_mp_t *mp;
uint32_t next;
nxt_int_t ret;
nxt_str_t name;
nxt_app_t *app, *prev;
nxt_app_type_t type;
nxt_sockaddr_t *sa;
nxt_conf_value_t *conf, *listeners, *router, *http, *listener;
nxt_queue_link_t *qlk, *nqlk;
nxt_conf_value_t *conf, *http;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
nxt_socket_conf_t *skcf;
nxt_router_app_conf_t apcf;
nxt_router_listener_conf_t lscf;
static nxt_str_t router_path = nxt_string("/router");
static nxt_str_t http_path = nxt_string("/http");
static nxt_str_t applications_path = nxt_string("/applications");
static nxt_str_t listeners_path = nxt_string("/listeners");
conf = nxt_conf_json_parse(tmcf->mem_pool, start, end);
@@ -295,16 +330,9 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
return NXT_ERROR;
}
router = nxt_conf_get_path(conf, &router_path);
if (router == NULL) {
nxt_log(task, NXT_LOG_CRIT, "no \"/router\" block");
return NXT_ERROR;
}
ret = nxt_conf_map_object(router, nxt_router_conf, tmcf->conf);
ret = nxt_conf_map_object(conf, nxt_router_conf, tmcf->conf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "router map error");
nxt_log(task, NXT_LOG_CRIT, "root map error");
return NXT_ERROR;
}
@@ -312,24 +340,109 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
tmcf->conf->threads = nxt_ncpu;
}
http = nxt_conf_get_path(conf, &http_path);
if (http == NULL) {
nxt_log(task, NXT_LOG_CRIT, "no \"/http\" block");
applications = nxt_conf_get_path(conf, &applications_path);
if (applications == NULL) {
nxt_log(task, NXT_LOG_CRIT, "no \"applications\" block");
return NXT_ERROR;
}
listeners = nxt_conf_get_path(conf, &listeners_path);
if (listeners == NULL) {
nxt_log(task, NXT_LOG_CRIT, "no \"/listeners\" block");
return NXT_ERROR;
}
mp = tmcf->conf->mem_pool;
next = 0;
for ( ;; ) {
application = nxt_conf_next_object_member(applications, &name, &next);
if (application == NULL) {
break;
}
nxt_debug(task, "application \"%V\"", &name);
app = nxt_zalloc(sizeof(nxt_app_t));
if (app == NULL) {
goto fail;
}
size = nxt_conf_json_length(application, NULL);
app->conf.start = nxt_malloc(size);
if (app->conf.start == NULL) {
nxt_free(app);
goto fail;
}
p = nxt_conf_json_print(app->conf.start, application, NULL);
app->conf.length = p - app->conf.start;
nxt_debug(task, "application conf \"%V\"", &app->conf);
prev = nxt_router_app_find(&tmcf->conf->router->apps, &name);
if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
nxt_free(app->conf.start);
nxt_free(app);
nxt_queue_remove(&prev->link);
nxt_queue_insert_tail(&tmcf->previous, &prev->link);
continue;
}
ret = nxt_conf_map_object(application, nxt_router_app_conf, &apcf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "application map error");
goto app_fail;
}
nxt_debug(task, "application type: %V", &apcf.type);
nxt_debug(task, "application workers: %D", apcf.workers);
if (nxt_str_eq(&apcf.type, "python", 6)) {
type = NXT_APP_PYTHON;
} else if (nxt_str_eq(&apcf.type, "php", 3)) {
type = NXT_APP_PHP;
} else if (nxt_str_eq(&apcf.type, "ruby", 4)) {
type = NXT_APP_RUBY;
} else if (nxt_str_eq(&apcf.type, "go", 2)) {
type = NXT_APP_GO;
} else {
nxt_log(task, NXT_LOG_CRIT, "unsupported application type: \"%V\"",
&apcf.type);
goto app_fail;
}
ret = nxt_thread_mutex_create(&app->mutex);
if (ret != NXT_OK) {
goto app_fail;
}
app->name = name;
app->type = type;
app->max_workers = apcf.workers;
app->live = 1;
nxt_queue_insert_tail(&tmcf->apps, &app->link);
}
http = nxt_conf_get_path(conf, &http_path);
#if 0
if (http == NULL) {
nxt_log(task, NXT_LOG_CRIT, "no \"http\" block");
return NXT_ERROR;
}
#endif
listeners = nxt_conf_get_path(conf, &listeners_path);
if (listeners == NULL) {
nxt_log(task, NXT_LOG_CRIT, "no \"listeners\" block");
return NXT_ERROR;
}
next = 0;
mp = tmcf->conf->mem_pool;
for ( ;; ) {
listener = nxt_conf_next_object_member(listeners, &name, &next);
if (listener == NULL) {
@@ -339,7 +452,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
sa = nxt_sockaddr_parse(mp, &name);
if (sa == NULL) {
nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name);
return NXT_ERROR;
goto fail;
}
sa->type = SOCK_STREAM;
@@ -349,31 +462,95 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf = nxt_router_socket_conf(task, mp, sa);
if (skcf == NULL) {
return NXT_ERROR;
goto fail;
}
ret = nxt_conf_map_object(listener, nxt_router_listener_conf, &lscf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "listener map error");
return NXT_ERROR;
goto fail;
}
nxt_debug(task, "router type: %V", &lscf.application_type);
nxt_debug(task, "router workers: %D", lscf.application_workers);
nxt_debug(task, "application: %V", &lscf.application);
// STUB, default values if http block is not defined.
skcf->header_buffer_size = 2048;
skcf->large_header_buffer_size = 8192;
skcf->header_read_timeout = 5000;
if (http != NULL) {
ret = nxt_conf_map_object(http, nxt_router_http_conf, skcf);
if (ret != NXT_OK) {
nxt_log(task, NXT_LOG_CRIT, "http map error");
return NXT_ERROR;
goto fail;
}
}
skcf->listen.handler = nxt_router_conn_init;
skcf->router_conf = tmcf->conf;
skcf->application = nxt_router_listener_application(tmcf,
&lscf.application);
nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
}
return NXT_OK;
app_fail:
nxt_free(app->conf.start);
nxt_free(app);
fail:
for (qlk = nxt_queue_first(&tmcf->apps);
qlk != nxt_queue_tail(&tmcf->apps);
qlk = nqlk)
{
nqlk = nxt_queue_next(qlk);
app = nxt_queue_link_data(qlk, nxt_app_t, link);
nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app);
}
return NXT_ERROR;
}
static nxt_app_t *
nxt_router_app_find(nxt_queue_t *queue, nxt_str_t *name)
{
nxt_app_t *app;
nxt_queue_link_t *qlk;
for (qlk = nxt_queue_first(queue);
qlk != nxt_queue_tail(queue);
qlk = nxt_queue_next(qlk))
{
app = nxt_queue_link_data(qlk, nxt_app_t, link);
if (nxt_strstr_eq(name, &app->name)) {
return app;
}
}
return NULL;
}
static nxt_app_t *
nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
{
nxt_app_t *app;
app = nxt_router_app_find(&tmcf->apps, name);
if (app == NULL) {
app = nxt_router_app_find(&tmcf->conf->router->apps, name);
}
return app;
}
@@ -840,6 +1017,29 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
}
static void
nxt_router_apps_sort(nxt_router_t *router, nxt_router_temp_conf_t *tmcf)
{
nxt_app_t *app;
nxt_queue_link_t *qlk, *nqlk;
for (qlk = nxt_queue_first(&router->apps);
qlk != nxt_queue_tail(&router->apps);
qlk = nqlk)
{
nqlk = nxt_queue_next(qlk);
app = nxt_queue_link_data(qlk, nxt_app_t, link);
nxt_queue_remove(&app->link);
// RELEASE APP
}
nxt_queue_add(&router->apps, &tmcf->previous);
nxt_queue_add(&router->apps, &tmcf->apps);
}
static void
nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
{

View File

@@ -11,6 +11,7 @@
#include <nxt_main.h>
#include <nxt_runtime.h>
#include <nxt_master_process.h>
#include <nxt_application.h>
typedef struct {
@@ -18,6 +19,7 @@ typedef struct {
nxt_queue_t engines;
nxt_queue_t sockets; /* of nxt_socket_conf_t */
nxt_queue_t apps; /* of nxt_app_t */
} nxt_router_t;
@@ -45,6 +47,9 @@ typedef struct {
nxt_queue_t keeping; /* of nxt_socket_conf_t */
nxt_queue_t deleting; /* of nxt_socket_conf_t */
nxt_queue_t apps; /* of nxt_app_t */
nxt_queue_t previous; /* of nxt_app_t */
uint32_t new_threads;
nxt_array_t *engines;
@@ -53,6 +58,23 @@ typedef struct {
} nxt_router_temp_conf_t;
typedef struct {
nxt_thread_mutex_t mutex;
nxt_queue_t ports;
nxt_str_t name;
uint32_t workers;
uint32_t max_workers;
nxt_app_type_t type:8;
uint8_t live; /* 1 bit */
nxt_queue_link_t link;
nxt_str_t conf;
} nxt_app_t;
typedef struct {
uint32_t count;
nxt_socket_t fd;
@@ -66,6 +88,8 @@ typedef struct {
nxt_router_conf_t *router_conf;
nxt_sockaddr_t *sockaddr;
nxt_app_t *application;
nxt_listen_socket_t listen;
size_t header_buffer_size;