|
|
|
|
@@ -6,37 +6,43 @@
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
#include <nxt_router.h>
|
|
|
|
|
#include <nxt_conf.h>
|
|
|
|
|
#include <nxt_application.h>
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
nxt_str_t application_type;
|
|
|
|
|
uint32_t application_workers;
|
|
|
|
|
} nxt_router_listener_conf_t;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
|
|
|
|
|
nxt_router_t *router);
|
|
|
|
|
static void nxt_router_listen_sockets_sort(nxt_router_t *router,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf);
|
|
|
|
|
|
|
|
|
|
static nxt_int_t nxt_router_stub_conf(nxt_task_t *task,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf);
|
|
|
|
|
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
|
|
|
|
|
static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf);
|
|
|
|
|
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
nxt_sockaddr_t *sa);
|
|
|
|
|
static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task,
|
|
|
|
|
nxt_mp_t *mp, uint32_t port);
|
|
|
|
|
|
|
|
|
|
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
|
|
|
|
|
nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
|
|
|
|
|
const nxt_event_interface_t *interface);
|
|
|
|
|
static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
|
|
|
|
|
static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
|
|
|
|
|
static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf);
|
|
|
|
|
static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
static nxt_int_t nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
|
|
|
|
|
nxt_router_engine_conf_t *recf);
|
|
|
|
|
static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
|
|
|
|
|
nxt_router_engine_conf_t *recf);
|
|
|
|
|
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
|
|
|
|
|
nxt_router_engine_conf_t *recf);
|
|
|
|
|
static void nxt_router_engine_socket_count(nxt_queue_t *sockets);
|
|
|
|
|
static nxt_int_t nxt_router_engine_joints_create(nxt_mp_t *mp,
|
|
|
|
|
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
|
|
|
|
|
nxt_work_handler_t handler);
|
|
|
|
|
static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task,
|
|
|
|
|
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array);
|
|
|
|
|
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf,
|
|
|
|
|
nxt_queue_t *sockets);
|
|
|
|
|
|
|
|
|
|
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf);
|
|
|
|
|
@@ -78,10 +84,8 @@ static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
|
|
|
|
|
nxt_int_t
|
|
|
|
|
nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
|
|
|
|
|
{
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
nxt_router_t *router;
|
|
|
|
|
nxt_router_temp_conf_t *tmcf;
|
|
|
|
|
const nxt_event_interface_t *interface;
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
nxt_router_t *router;
|
|
|
|
|
|
|
|
|
|
ret = nxt_app_http_init(task, rt);
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
@@ -96,14 +100,24 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
|
|
|
|
|
nxt_queue_init(&router->engines);
|
|
|
|
|
nxt_queue_init(&router->sockets);
|
|
|
|
|
|
|
|
|
|
/**/
|
|
|
|
|
return NXT_OK;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
nxt_int_t
|
|
|
|
|
nxt_router_new_conf(nxt_task_t *task, nxt_runtime_t *rt, nxt_router_t *router,
|
|
|
|
|
u_char *start, u_char *end)
|
|
|
|
|
{
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
nxt_router_temp_conf_t *tmcf;
|
|
|
|
|
const nxt_event_interface_t *interface;
|
|
|
|
|
|
|
|
|
|
tmcf = nxt_router_temp_conf(task, router);
|
|
|
|
|
if (nxt_slow_path(tmcf == NULL)) {
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret = nxt_router_stub_conf(task, tmcf);
|
|
|
|
|
ret = nxt_router_conf_create(task, tmcf, start, end);
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
@@ -132,6 +146,8 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
|
|
|
|
|
nxt_queue_add(&router->sockets, &tmcf->updating);
|
|
|
|
|
nxt_queue_add(&router->sockets, &tmcf->creating);
|
|
|
|
|
|
|
|
|
|
// nxt_mp_destroy(tmcf->mem_pool);
|
|
|
|
|
|
|
|
|
|
return NXT_OK;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -196,34 +212,159 @@ fail:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_int_t
|
|
|
|
|
nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
|
|
|
|
|
{
|
|
|
|
|
nxt_mp_t *mp;
|
|
|
|
|
nxt_sockaddr_t *sa;
|
|
|
|
|
nxt_socket_conf_t *skcf;
|
|
|
|
|
static nxt_conf_map_t nxt_router_conf[] = {
|
|
|
|
|
{
|
|
|
|
|
nxt_string("threads"),
|
|
|
|
|
NXT_CONF_MAP_INT32,
|
|
|
|
|
offsetof(nxt_router_conf_t, threads),
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
tmcf->conf->threads = 1;
|
|
|
|
|
{
|
|
|
|
|
nxt_null_string, 0, 0,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_conf_map_t nxt_router_listener_conf[] = {
|
|
|
|
|
{
|
|
|
|
|
nxt_string("_application_type"),
|
|
|
|
|
NXT_CONF_MAP_STR,
|
|
|
|
|
offsetof(nxt_router_listener_conf_t, application_type),
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
nxt_string("_application_workers"),
|
|
|
|
|
NXT_CONF_MAP_INT32,
|
|
|
|
|
offsetof(nxt_router_listener_conf_t, application_workers),
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
nxt_null_string, 0, 0,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_conf_map_t nxt_router_http_conf[] = {
|
|
|
|
|
{
|
|
|
|
|
nxt_string("header_buffer_size"),
|
|
|
|
|
NXT_CONF_MAP_SIZE,
|
|
|
|
|
offsetof(nxt_socket_conf_t, header_buffer_size),
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
nxt_string("large_header_buffer_size"),
|
|
|
|
|
NXT_CONF_MAP_SIZE,
|
|
|
|
|
offsetof(nxt_socket_conf_t, large_header_buffer_size),
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
nxt_string("header_read_timeout"),
|
|
|
|
|
NXT_CONF_MAP_MSEC,
|
|
|
|
|
offsetof(nxt_socket_conf_t, header_read_timeout),
|
|
|
|
|
},
|
|
|
|
|
|
|
|
|
|
{
|
|
|
|
|
nxt_null_string, 0, 0,
|
|
|
|
|
},
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_int_t
|
|
|
|
|
nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
|
|
|
|
u_char *start, u_char *end)
|
|
|
|
|
{
|
|
|
|
|
nxt_mp_t *mp;
|
|
|
|
|
uint32_t next;
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
nxt_str_t name;
|
|
|
|
|
nxt_sockaddr_t *sa;
|
|
|
|
|
nxt_conf_value_t *conf, *listeners, *router, *http, *listener;
|
|
|
|
|
nxt_socket_conf_t *skcf;
|
|
|
|
|
nxt_router_listener_conf_t lscf;
|
|
|
|
|
|
|
|
|
|
static nxt_str_t router_path = nxt_string("/router");
|
|
|
|
|
static nxt_str_t http_path = nxt_string("/http");
|
|
|
|
|
static nxt_str_t listeners_path = nxt_string("/listeners");
|
|
|
|
|
|
|
|
|
|
conf = nxt_conf_json_parse(tmcf->mem_pool, start, end);
|
|
|
|
|
if (conf == NULL) {
|
|
|
|
|
nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
router = nxt_conf_get_path(conf, &router_path);
|
|
|
|
|
|
|
|
|
|
if (router == NULL) {
|
|
|
|
|
nxt_log(task, NXT_LOG_CRIT, "no \"/router\" block");
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret = nxt_conf_map_object(router, nxt_router_conf, tmcf->conf);
|
|
|
|
|
if (ret != NXT_OK) {
|
|
|
|
|
nxt_log(task, NXT_LOG_CRIT, "router map error");
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
http = nxt_conf_get_path(conf, &http_path);
|
|
|
|
|
|
|
|
|
|
if (http == NULL) {
|
|
|
|
|
nxt_log(task, NXT_LOG_CRIT, "no \"/http\" block");
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
listeners = nxt_conf_get_path(conf, &listeners_path);
|
|
|
|
|
|
|
|
|
|
if (listeners == NULL) {
|
|
|
|
|
nxt_log(task, NXT_LOG_CRIT, "no \"/listeners\" block");
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
mp = tmcf->conf->mem_pool;
|
|
|
|
|
|
|
|
|
|
sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
|
|
|
|
|
skcf = nxt_router_socket_conf(task, mp, sa);
|
|
|
|
|
next = 0;
|
|
|
|
|
|
|
|
|
|
skcf->listen.handler = nxt_router_conn_init;
|
|
|
|
|
skcf->header_buffer_size = 2048;
|
|
|
|
|
skcf->large_header_buffer_size = 8192;
|
|
|
|
|
skcf->header_read_timeout = 5000;
|
|
|
|
|
for ( ;; ) {
|
|
|
|
|
listener = nxt_conf_next_object_member(listeners, &name, &next);
|
|
|
|
|
if (listener == NULL) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
|
|
|
|
|
sa = nxt_sockaddr_parse(mp, &name);
|
|
|
|
|
if (sa == NULL) {
|
|
|
|
|
nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name);
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
|
|
|
|
|
skcf = nxt_router_socket_conf(task, mp, sa);
|
|
|
|
|
sa->type = SOCK_STREAM;
|
|
|
|
|
|
|
|
|
|
skcf->listen.handler = nxt_stream_connection_init;
|
|
|
|
|
skcf->header_read_timeout = 5000;
|
|
|
|
|
nxt_debug(task, "router listener: \"%*s\"",
|
|
|
|
|
sa->length, nxt_sockaddr_start(sa));
|
|
|
|
|
|
|
|
|
|
nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
|
|
|
|
|
skcf = nxt_router_socket_conf(task, mp, sa);
|
|
|
|
|
if (skcf == NULL) {
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret = nxt_conf_map_object(listener, nxt_router_listener_conf, &lscf);
|
|
|
|
|
if (ret != NXT_OK) {
|
|
|
|
|
nxt_log(task, NXT_LOG_CRIT, "listener map error");
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nxt_debug(task, "router type: %V", &lscf.application_type);
|
|
|
|
|
nxt_debug(task, "router workers: %D", lscf.application_workers);
|
|
|
|
|
|
|
|
|
|
ret = nxt_conf_map_object(http, nxt_router_http_conf, skcf);
|
|
|
|
|
if (ret != NXT_OK) {
|
|
|
|
|
nxt_log(task, NXT_LOG_CRIT, "http map error");
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
skcf->listen.handler = nxt_router_conn_init;
|
|
|
|
|
skcf->router_conf = tmcf->conf;
|
|
|
|
|
|
|
|
|
|
nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return NXT_OK;
|
|
|
|
|
}
|
|
|
|
|
@@ -239,6 +380,8 @@ nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
conf->sockaddr = sa;
|
|
|
|
|
|
|
|
|
|
conf->listen.sockaddr = sa;
|
|
|
|
|
conf->listen.socklen = sa->socklen;
|
|
|
|
|
conf->listen.address_length = sa->length;
|
|
|
|
|
@@ -252,31 +395,6 @@ nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_sockaddr_t *
|
|
|
|
|
nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mp_t *mp, uint32_t port)
|
|
|
|
|
{
|
|
|
|
|
nxt_sockaddr_t *sa;
|
|
|
|
|
struct sockaddr_in sin;
|
|
|
|
|
|
|
|
|
|
nxt_memzero(&sin, sizeof(struct sockaddr_in));
|
|
|
|
|
|
|
|
|
|
sin.sin_family = AF_INET;
|
|
|
|
|
sin.sin_port = htons(port);
|
|
|
|
|
|
|
|
|
|
sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin,
|
|
|
|
|
sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN);
|
|
|
|
|
if (nxt_slow_path(sa == NULL)) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sa->type = SOCK_STREAM;
|
|
|
|
|
|
|
|
|
|
nxt_sockaddr_text(sa);
|
|
|
|
|
|
|
|
|
|
return sa;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
nxt_router_listen_sockets_sort(nxt_router_t *router,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf)
|
|
|
|
|
@@ -297,9 +415,10 @@ nxt_router_listen_sockets_sort(nxt_router_t *router,
|
|
|
|
|
{
|
|
|
|
|
oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
|
|
|
|
|
|
|
|
|
|
if (nxt_sockaddr_cmp(nskcf->listen.sockaddr,
|
|
|
|
|
oskcf->listen.sockaddr))
|
|
|
|
|
{
|
|
|
|
|
if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) {
|
|
|
|
|
nskcf->socket = oskcf->socket;
|
|
|
|
|
nskcf->listen.socket = oskcf->listen.socket;
|
|
|
|
|
|
|
|
|
|
nxt_queue_remove(oqlk);
|
|
|
|
|
nxt_queue_insert_tail(&tmcf->keeping, oqlk);
|
|
|
|
|
|
|
|
|
|
@@ -312,6 +431,7 @@ nxt_router_listen_sockets_sort(nxt_router_t *router,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nxt_queue_add(&tmcf->deleting, &router->sockets);
|
|
|
|
|
nxt_queue_init(&router->sockets);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -319,19 +439,40 @@ static nxt_int_t
|
|
|
|
|
nxt_router_listen_sockets_stub_create(nxt_task_t *task,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf)
|
|
|
|
|
{
|
|
|
|
|
nxt_queue_link_t *qlk, *nqlk;
|
|
|
|
|
nxt_socket_conf_t *skcf;
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
nxt_socket_t s;
|
|
|
|
|
nxt_queue_link_t *qlk, *nqlk;
|
|
|
|
|
nxt_socket_conf_t *skcf;
|
|
|
|
|
nxt_router_socket_t *rtsk;
|
|
|
|
|
|
|
|
|
|
for (qlk = nxt_queue_first(&tmcf->pending);
|
|
|
|
|
qlk != nxt_queue_tail(&tmcf->pending);
|
|
|
|
|
qlk = nqlk)
|
|
|
|
|
{
|
|
|
|
|
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
|
|
|
|
|
|
|
|
|
|
if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) {
|
|
|
|
|
rtsk = nxt_malloc(sizeof(nxt_router_socket_t));
|
|
|
|
|
if (nxt_slow_path(rtsk == NULL)) {
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
rtsk->count = 0;
|
|
|
|
|
|
|
|
|
|
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
|
|
|
|
|
skcf->socket = rtsk;
|
|
|
|
|
|
|
|
|
|
s = nxt_listen_socket_create0(task, skcf->sockaddr, NXT_NONBLOCK);
|
|
|
|
|
if (nxt_slow_path(s == -1)) {
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
skcf->listen.socket = s;
|
|
|
|
|
|
|
|
|
|
rtsk->fd = s;
|
|
|
|
|
|
|
|
|
|
nqlk = nxt_queue_next(qlk);
|
|
|
|
|
nxt_queue_remove(qlk);
|
|
|
|
|
nxt_queue_insert_tail(&tmcf->creating, qlk);
|
|
|
|
|
@@ -345,13 +486,11 @@ static nxt_int_t
|
|
|
|
|
nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
|
|
|
|
|
{
|
|
|
|
|
nxt_mp_t *mp;
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
nxt_uint_t n, threads;
|
|
|
|
|
nxt_queue_link_t *qlk;
|
|
|
|
|
nxt_router_engine_conf_t *recf;
|
|
|
|
|
|
|
|
|
|
mp = tmcf->conf->mem_pool;
|
|
|
|
|
threads = tmcf->conf->threads;
|
|
|
|
|
|
|
|
|
|
tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
|
|
|
|
|
@@ -371,15 +510,15 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link);
|
|
|
|
|
recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link0);
|
|
|
|
|
// STUB
|
|
|
|
|
recf->task = recf->engine->task;
|
|
|
|
|
|
|
|
|
|
if (n < threads) {
|
|
|
|
|
ret = nxt_router_engine_conf_update(task, mp, tmcf, recf);
|
|
|
|
|
ret = nxt_router_engine_conf_update(tmcf, recf);
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf);
|
|
|
|
|
ret = nxt_router_engine_conf_delete(tmcf, recf);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
@@ -404,11 +543,13 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
|
|
|
|
|
// STUB
|
|
|
|
|
recf->task = recf->engine->task;
|
|
|
|
|
|
|
|
|
|
ret = nxt_router_engine_conf_create(task, mp, tmcf, recf);
|
|
|
|
|
ret = nxt_router_engine_conf_create(tmcf, recf);
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nxt_queue_insert_tail(&router->engines, &recf->engine->link0);
|
|
|
|
|
|
|
|
|
|
n++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -417,39 +558,61 @@ nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_int_t
|
|
|
|
|
nxt_router_engine_conf_create(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
|
|
|
|
|
nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
|
|
|
|
|
nxt_router_engine_conf_t *recf)
|
|
|
|
|
{
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
nxt_mp_t *mp;
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
nxt_thread_spinlock_t *lock;
|
|
|
|
|
|
|
|
|
|
recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
|
|
|
|
|
if (nxt_slow_path(recf->creating == NULL)) {
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
|
|
|
|
|
mp = tmcf->conf->mem_pool;
|
|
|
|
|
|
|
|
|
|
ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating,
|
|
|
|
|
recf->creating, nxt_router_listen_socket_create);
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
|
|
|
|
|
ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating,
|
|
|
|
|
recf->creating, nxt_router_listen_socket_create);
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lock = &tmcf->conf->router->lock;
|
|
|
|
|
|
|
|
|
|
nxt_thread_spin_lock(lock);
|
|
|
|
|
|
|
|
|
|
nxt_router_engine_socket_count(&tmcf->creating);
|
|
|
|
|
nxt_router_engine_socket_count(&tmcf->updating);
|
|
|
|
|
|
|
|
|
|
nxt_thread_spin_unlock(lock);
|
|
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_int_t
|
|
|
|
|
nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
|
|
|
|
|
nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
|
|
|
|
|
nxt_router_engine_conf_t *recf)
|
|
|
|
|
{
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
nxt_mp_t *mp;
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
nxt_thread_spinlock_t *lock;
|
|
|
|
|
|
|
|
|
|
recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
|
|
|
|
|
if (nxt_slow_path(recf->creating == NULL)) {
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
|
|
|
|
|
mp = tmcf->conf->mem_pool;
|
|
|
|
|
|
|
|
|
|
ret = nxt_router_engine_joints_create(mp, recf, &tmcf->creating,
|
|
|
|
|
recf->creating, nxt_router_listen_socket_create);
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
return ret;
|
|
|
|
|
@@ -460,7 +623,7 @@ nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
|
|
|
|
|
ret = nxt_router_engine_joints_create(mp, recf, &tmcf->updating,
|
|
|
|
|
recf->updating, nxt_router_listen_socket_update);
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
return ret;
|
|
|
|
|
@@ -471,14 +634,26 @@ nxt_router_engine_conf_update(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
|
|
|
|
|
recf->deleting);
|
|
|
|
|
ret = nxt_router_engine_joints_delete(recf, &tmcf->deleting);
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
lock = &tmcf->conf->router->lock;
|
|
|
|
|
|
|
|
|
|
nxt_thread_spin_lock(lock);
|
|
|
|
|
|
|
|
|
|
nxt_router_engine_socket_count(&tmcf->creating);
|
|
|
|
|
|
|
|
|
|
nxt_thread_spin_unlock(lock);
|
|
|
|
|
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_int_t
|
|
|
|
|
nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
|
|
|
|
|
nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
|
|
|
|
|
nxt_router_engine_conf_t *recf)
|
|
|
|
|
{
|
|
|
|
|
nxt_int_t ret;
|
|
|
|
|
|
|
|
|
|
@@ -487,20 +662,18 @@ nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating,
|
|
|
|
|
recf->deleting);
|
|
|
|
|
ret = nxt_router_engine_joints_delete(recf, &tmcf->updating);
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
|
|
|
|
|
recf->deleting);
|
|
|
|
|
return nxt_router_engine_joints_delete(recf, &tmcf->deleting);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_int_t
|
|
|
|
|
nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
|
|
|
|
|
nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_engine_conf_t *recf,
|
|
|
|
|
nxt_queue_t *sockets, nxt_array_t *array,
|
|
|
|
|
nxt_work_handler_t handler)
|
|
|
|
|
{
|
|
|
|
|
nxt_work_t *work;
|
|
|
|
|
@@ -531,15 +704,33 @@ nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
|
|
|
|
|
joint->count = 1;
|
|
|
|
|
joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
|
|
|
|
|
joint->engine = recf->engine;
|
|
|
|
|
|
|
|
|
|
nxt_queue_insert_tail(&joint->engine->joints, &joint->link);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return NXT_OK;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void
|
|
|
|
|
nxt_router_engine_socket_count(nxt_queue_t *sockets)
|
|
|
|
|
{
|
|
|
|
|
nxt_queue_link_t *qlk;
|
|
|
|
|
nxt_socket_conf_t *skcf;
|
|
|
|
|
|
|
|
|
|
for (qlk = nxt_queue_first(sockets);
|
|
|
|
|
qlk != nxt_queue_tail(sockets);
|
|
|
|
|
qlk = nxt_queue_next(qlk))
|
|
|
|
|
{
|
|
|
|
|
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
|
|
|
|
|
skcf->socket->count++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static nxt_int_t
|
|
|
|
|
nxt_router_engine_joints_delete(nxt_task_t *task,
|
|
|
|
|
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array)
|
|
|
|
|
nxt_router_engine_joints_delete(nxt_router_engine_conf_t *recf,
|
|
|
|
|
nxt_queue_t *sockets)
|
|
|
|
|
{
|
|
|
|
|
nxt_work_t *work;
|
|
|
|
|
nxt_queue_link_t *qlk;
|
|
|
|
|
@@ -548,7 +739,7 @@ nxt_router_engine_joints_delete(nxt_task_t *task,
|
|
|
|
|
qlk != nxt_queue_tail(sockets);
|
|
|
|
|
qlk = nxt_queue_next(qlk))
|
|
|
|
|
{
|
|
|
|
|
work = nxt_array_add(array);
|
|
|
|
|
work = nxt_array_add(recf->deleting);
|
|
|
|
|
if (nxt_slow_path(work == NULL)) {
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
}
|
|
|
|
|
@@ -611,7 +802,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
|
|
|
|
|
|
|
|
|
|
nxt_queue_insert_tail(&rt->engines, &engine->link);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
process = nxt_runtime_process_find(rt, nxt_pid);
|
|
|
|
|
if (nxt_slow_path(process == NULL)) {
|
|
|
|
|
return NXT_ERROR;
|
|
|
|
|
@@ -640,7 +830,6 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
|
|
|
|
|
|
|
|
|
|
nxt_runtime_port_add(rt, port);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ret = nxt_thread_create(&handle, link);
|
|
|
|
|
|
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
|
|
|
@@ -672,11 +861,31 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf)
|
|
|
|
|
nxt_uint_t n;
|
|
|
|
|
nxt_work_t *work;
|
|
|
|
|
|
|
|
|
|
work = recf->creating->elts;
|
|
|
|
|
if (recf->creating != NULL) {
|
|
|
|
|
work = recf->creating->elts;
|
|
|
|
|
|
|
|
|
|
for (n = recf->creating->nelts; n != 0; n--) {
|
|
|
|
|
nxt_event_engine_post(recf->engine, work);
|
|
|
|
|
work++;
|
|
|
|
|
for (n = recf->creating->nelts; n != 0; n--) {
|
|
|
|
|
nxt_event_engine_post(recf->engine, work);
|
|
|
|
|
work++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (recf->updating != NULL) {
|
|
|
|
|
work = recf->updating->elts;
|
|
|
|
|
|
|
|
|
|
for (n = recf->updating->nelts; n != 0; n--) {
|
|
|
|
|
nxt_event_engine_post(recf->engine, work);
|
|
|
|
|
work++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (recf->deleting != NULL) {
|
|
|
|
|
work = recf->deleting->elts;
|
|
|
|
|
|
|
|
|
|
for (n = recf->deleting->nelts; n != 0; n--) {
|
|
|
|
|
nxt_event_engine_post(recf->engine, work);
|
|
|
|
|
work++;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -750,19 +959,19 @@ nxt_inline nxt_listen_event_t *
|
|
|
|
|
nxt_router_listen_event(nxt_queue_t *listen_connections,
|
|
|
|
|
nxt_socket_conf_t *skcf)
|
|
|
|
|
{
|
|
|
|
|
nxt_socket_t socket;
|
|
|
|
|
nxt_queue_link_t *link;
|
|
|
|
|
nxt_socket_t fd;
|
|
|
|
|
nxt_queue_link_t *qlk;
|
|
|
|
|
nxt_listen_event_t *listen;
|
|
|
|
|
|
|
|
|
|
socket = skcf->listen.socket;
|
|
|
|
|
fd = skcf->socket->fd;
|
|
|
|
|
|
|
|
|
|
for (link = nxt_queue_first(listen_connections);
|
|
|
|
|
link != nxt_queue_tail(listen_connections);
|
|
|
|
|
link = nxt_queue_next(link))
|
|
|
|
|
for (qlk = nxt_queue_first(listen_connections);
|
|
|
|
|
qlk != nxt_queue_tail(listen_connections);
|
|
|
|
|
qlk = nxt_queue_next(qlk))
|
|
|
|
|
{
|
|
|
|
|
listen = nxt_queue_link_data(link, nxt_listen_event_t, link);
|
|
|
|
|
listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
|
|
|
|
|
|
|
|
|
|
if (socket == listen->socket.fd) {
|
|
|
|
|
if (fd == listen->socket.fd) {
|
|
|
|
|
return listen;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -834,25 +1043,23 @@ static void
|
|
|
|
|
nxt_router_listen_socket_release(nxt_task_t *task,
|
|
|
|
|
nxt_socket_conf_joint_t *joint)
|
|
|
|
|
{
|
|
|
|
|
nxt_socket_t s;
|
|
|
|
|
nxt_listen_socket_t *ls;
|
|
|
|
|
nxt_router_socket_t *rtsk;
|
|
|
|
|
nxt_thread_spinlock_t *lock;
|
|
|
|
|
|
|
|
|
|
s = -1;
|
|
|
|
|
ls = &joint->socket_conf->listen;
|
|
|
|
|
rtsk = joint->socket_conf->socket;
|
|
|
|
|
lock = &joint->socket_conf->router_conf->router->lock;
|
|
|
|
|
|
|
|
|
|
nxt_thread_spin_lock(lock);
|
|
|
|
|
|
|
|
|
|
if (--ls->count == 0) {
|
|
|
|
|
s = ls->socket;
|
|
|
|
|
ls->socket = -1;
|
|
|
|
|
if (--rtsk->count != 0) {
|
|
|
|
|
rtsk = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nxt_thread_spin_unlock(lock);
|
|
|
|
|
|
|
|
|
|
if (s != -1) {
|
|
|
|
|
nxt_socket_close(task, s);
|
|
|
|
|
if (rtsk != NULL) {
|
|
|
|
|
nxt_socket_close(task, rtsk->fd);
|
|
|
|
|
nxt_free(rtsk);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
nxt_router_conf_release(task, joint);
|
|
|
|
|
@@ -894,6 +1101,7 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
|
|
|
|
|
nxt_thread_spin_unlock(lock);
|
|
|
|
|
|
|
|
|
|
if (rtcf != NULL) {
|
|
|
|
|
nxt_debug(task, "old router conf is destroyed");
|
|
|
|
|
nxt_mp_destroy(rtcf->mem_pool);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|