1086 lines
27 KiB
C
1086 lines
27 KiB
C
|
|
/*
|
|
* Copyright (C) Igor Sysoev
|
|
* Copyright (C) Valentin V. Bartenev
|
|
* Copyright (C) NGINX, Inc.
|
|
*/
|
|
|
|
#include <nxt_router.h>
|
|
|
|
|
|
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
|
|
nxt_router_t *router);
|
|
static void nxt_router_listen_sockets_sort(nxt_router_t *router,
|
|
nxt_router_temp_conf_t *tmcf);
|
|
|
|
static nxt_int_t nxt_router_stub_conf(nxt_task_t *task,
|
|
nxt_router_temp_conf_t *tmcf);
|
|
static nxt_int_t nxt_router_listen_sockets_stub_create(nxt_task_t *task,
|
|
nxt_router_temp_conf_t *tmcf);
|
|
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
|
|
nxt_mem_pool_t *mp, nxt_sockaddr_t *sa);
|
|
static nxt_sockaddr_t *nxt_router_listen_sockaddr_stub(nxt_task_t *task,
|
|
nxt_mem_pool_t *mp, uint32_t port);
|
|
|
|
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
|
|
nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
|
|
const nxt_event_interface_t *interface);
|
|
static nxt_int_t nxt_router_engine_conf_create(nxt_task_t *task,
|
|
nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf,
|
|
nxt_router_engine_conf_t *recf);
|
|
static nxt_int_t nxt_router_engine_conf_update(nxt_task_t *task,
|
|
nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf,
|
|
nxt_router_engine_conf_t *recf);
|
|
static nxt_int_t nxt_router_engine_conf_delete(nxt_task_t *task,
|
|
nxt_mem_pool_t *mp, nxt_router_temp_conf_t *tmcf,
|
|
nxt_router_engine_conf_t *recf);
|
|
static nxt_int_t nxt_router_engine_joints_create(nxt_task_t *task,
|
|
nxt_mem_pool_t *mp, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
|
|
nxt_array_t *array, nxt_work_handler_t handler);
|
|
static nxt_int_t nxt_router_engine_joints_delete(nxt_task_t *task,
|
|
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array);
|
|
|
|
static nxt_int_t nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
|
|
nxt_router_temp_conf_t *tmcf);
|
|
static nxt_int_t nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
|
|
nxt_event_engine_t *engine);
|
|
|
|
static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
|
|
static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
|
|
|
|
static void nxt_router_thread_start(void *data);
|
|
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_router_listen_socket_delete(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_router_listen_socket_release(nxt_task_t *task,
|
|
nxt_socket_conf_joint_t *joint);
|
|
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_router_conf_release(nxt_task_t *task,
|
|
nxt_socket_conf_joint_t *joint);
|
|
|
|
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
|
|
static nxt_msec_t nxt_router_conn_timeout_value(nxt_event_conn_t *c,
|
|
uintptr_t data);
|
|
|
|
|
|
nxt_int_t
|
|
nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
|
|
{
|
|
nxt_int_t ret;
|
|
nxt_router_t *router;
|
|
nxt_router_temp_conf_t *tmcf;
|
|
const nxt_event_interface_t *interface;
|
|
|
|
router = nxt_zalloc(sizeof(nxt_router_t));
|
|
if (nxt_slow_path(router == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
nxt_queue_init(&router->engines);
|
|
nxt_queue_init(&router->sockets);
|
|
|
|
/**/
|
|
|
|
tmcf = nxt_router_temp_conf(task, router);
|
|
if (nxt_slow_path(tmcf == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
ret = nxt_router_stub_conf(task, tmcf);
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
|
|
nxt_router_listen_sockets_sort(router, tmcf);
|
|
|
|
ret = nxt_router_listen_sockets_stub_create(task, tmcf);
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
|
|
interface = nxt_service_get(rt->services, "engine", NULL);
|
|
|
|
ret = nxt_router_engines_create(task, router, tmcf, interface);
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
|
|
ret = nxt_router_threads_create(task, rt, tmcf);
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
|
|
nxt_router_engines_post(tmcf);
|
|
|
|
nxt_queue_add(&router->sockets, &tmcf->updating);
|
|
nxt_queue_add(&router->sockets, &tmcf->creating);
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
static nxt_router_temp_conf_t *
|
|
nxt_router_temp_conf(nxt_task_t *task, nxt_router_t *router)
|
|
{
|
|
nxt_mem_pool_t *mp, *tmp;
|
|
nxt_router_conf_t *rtcf;
|
|
nxt_router_temp_conf_t *tmcf;
|
|
|
|
mp = nxt_mem_pool_create(1024);
|
|
if (nxt_slow_path(mp == NULL)) {
|
|
return NULL;
|
|
}
|
|
|
|
rtcf = nxt_mem_zalloc(mp, sizeof(nxt_router_conf_t));
|
|
if (nxt_slow_path(rtcf == NULL)) {
|
|
goto fail;
|
|
}
|
|
|
|
rtcf->mem_pool = mp;
|
|
rtcf->router = router;
|
|
rtcf->count = 1;
|
|
|
|
tmp = nxt_mem_pool_create(1024);
|
|
if (nxt_slow_path(tmp == NULL)) {
|
|
goto fail;
|
|
}
|
|
|
|
tmcf = nxt_mem_zalloc(tmp, sizeof(nxt_router_temp_conf_t));
|
|
if (nxt_slow_path(tmcf == NULL)) {
|
|
goto temp_fail;
|
|
}
|
|
|
|
tmcf->mem_pool = tmp;
|
|
tmcf->conf = rtcf;
|
|
|
|
tmcf->engines = nxt_array_create(tmcf->mem_pool, 4,
|
|
sizeof(nxt_router_engine_conf_t));
|
|
if (nxt_slow_path(tmcf->engines == NULL)) {
|
|
goto temp_fail;
|
|
}
|
|
|
|
nxt_queue_init(&tmcf->deleting);
|
|
nxt_queue_init(&tmcf->keeping);
|
|
nxt_queue_init(&tmcf->updating);
|
|
nxt_queue_init(&tmcf->pending);
|
|
nxt_queue_init(&tmcf->creating);
|
|
|
|
return tmcf;
|
|
|
|
temp_fail:
|
|
|
|
nxt_mem_pool_destroy(tmp);
|
|
|
|
fail:
|
|
|
|
nxt_mem_pool_destroy(mp);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_router_stub_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
|
|
{
|
|
nxt_sockaddr_t *sa;
|
|
nxt_mem_pool_t *mp;
|
|
nxt_socket_conf_t *skcf;
|
|
|
|
tmcf->conf->threads = 1;
|
|
|
|
mp = tmcf->conf->mem_pool;
|
|
|
|
sa = nxt_router_listen_sockaddr_stub(task, mp, 8000);
|
|
skcf = nxt_router_socket_conf(task, mp, sa);
|
|
|
|
skcf->listen.handler = nxt_router_conn_init;
|
|
skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
|
|
+ sizeof(nxt_event_conn_proxy_t)
|
|
+ sizeof(nxt_event_conn_t)
|
|
+ 4 * sizeof(nxt_buf_t);
|
|
|
|
skcf->header_buffer_size = 2048;
|
|
skcf->large_header_buffer_size = 8192;
|
|
skcf->header_read_timeout = 5000;
|
|
|
|
nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
|
|
|
|
sa = nxt_router_listen_sockaddr_stub(task, mp, 8001);
|
|
skcf = nxt_router_socket_conf(task, mp, sa);
|
|
|
|
skcf->listen.handler = nxt_stream_connection_init;
|
|
skcf->listen.mem_pool_size = nxt_listen_socket_pool_min_size(&skcf->listen)
|
|
+ sizeof(nxt_event_conn_proxy_t)
|
|
+ sizeof(nxt_event_conn_t)
|
|
+ 4 * sizeof(nxt_buf_t);
|
|
|
|
skcf->header_read_timeout = 5000;
|
|
|
|
nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
static nxt_socket_conf_t *
|
|
nxt_router_socket_conf(nxt_task_t *task, nxt_mem_pool_t *mp, nxt_sockaddr_t *sa)
|
|
{
|
|
nxt_socket_conf_t *conf;
|
|
|
|
conf = nxt_mem_zalloc(mp, sizeof(nxt_socket_conf_t));
|
|
if (nxt_slow_path(conf == NULL)) {
|
|
return NULL;
|
|
}
|
|
|
|
conf->listen.sockaddr = sa;
|
|
|
|
conf->listen.socket = -1;
|
|
conf->listen.backlog = NXT_LISTEN_BACKLOG;
|
|
conf->listen.flags = NXT_NONBLOCK;
|
|
conf->listen.read_after_accept = 1;
|
|
|
|
return conf;
|
|
}
|
|
|
|
|
|
static nxt_sockaddr_t *
|
|
nxt_router_listen_sockaddr_stub(nxt_task_t *task, nxt_mem_pool_t *mp,
|
|
uint32_t port)
|
|
{
|
|
nxt_sockaddr_t *sa;
|
|
struct sockaddr_in sin;
|
|
|
|
nxt_memzero(&sin, sizeof(struct sockaddr_in));
|
|
|
|
sin.sin_family = AF_INET;
|
|
sin.sin_port = htons(port);
|
|
|
|
sa = nxt_sockaddr_create(mp, (struct sockaddr *) &sin,
|
|
sizeof(struct sockaddr_in), NXT_INET_ADDR_STR_LEN);
|
|
if (nxt_slow_path(sa == NULL)) {
|
|
return NULL;
|
|
}
|
|
|
|
sa->type = SOCK_STREAM;
|
|
|
|
nxt_sockaddr_text(sa);
|
|
|
|
return sa;
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_listen_sockets_sort(nxt_router_t *router,
|
|
nxt_router_temp_conf_t *tmcf)
|
|
{
|
|
nxt_queue_link_t *nqlk, *oqlk, *next;
|
|
nxt_socket_conf_t *nskcf, *oskcf;
|
|
|
|
for (nqlk = nxt_queue_first(&tmcf->pending);
|
|
nqlk != nxt_queue_tail(&tmcf->pending);
|
|
nqlk = next)
|
|
{
|
|
next = nxt_queue_next(nqlk);
|
|
nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
|
|
|
|
for (oqlk = nxt_queue_first(&router->sockets);
|
|
oqlk != nxt_queue_tail(&router->sockets);
|
|
oqlk = nxt_queue_next(oqlk))
|
|
{
|
|
oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
|
|
|
|
if (nxt_sockaddr_cmp(nskcf->listen.sockaddr,
|
|
oskcf->listen.sockaddr))
|
|
{
|
|
nxt_queue_remove(oqlk);
|
|
nxt_queue_insert_tail(&tmcf->keeping, oqlk);
|
|
|
|
nxt_queue_remove(nqlk);
|
|
nxt_queue_insert_tail(&tmcf->updating, nqlk);
|
|
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
nxt_queue_add(&tmcf->deleting, &router->sockets);
|
|
}
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_router_listen_sockets_stub_create(nxt_task_t *task,
|
|
nxt_router_temp_conf_t *tmcf)
|
|
{
|
|
nxt_queue_link_t *qlk, *nqlk;
|
|
nxt_socket_conf_t *skcf;
|
|
|
|
for (qlk = nxt_queue_first(&tmcf->pending);
|
|
qlk != nxt_queue_tail(&tmcf->pending);
|
|
qlk = nqlk)
|
|
{
|
|
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
|
|
|
|
if (nxt_listen_socket_create(task, &skcf->listen, 0) != NXT_OK) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
nqlk = nxt_queue_next(qlk);
|
|
nxt_queue_remove(qlk);
|
|
nxt_queue_insert_tail(&tmcf->creating, qlk);
|
|
}
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_router_engines_create(nxt_task_t *task, nxt_router_t *router,
|
|
nxt_router_temp_conf_t *tmcf, const nxt_event_interface_t *interface)
|
|
{
|
|
nxt_int_t ret;
|
|
nxt_uint_t n, threads;
|
|
nxt_mem_pool_t *mp;
|
|
nxt_queue_link_t *qlk;
|
|
nxt_router_engine_conf_t *recf;
|
|
|
|
mp = tmcf->conf->mem_pool;
|
|
threads = tmcf->conf->threads;
|
|
|
|
tmcf->engines = nxt_array_create(tmcf->mem_pool, threads,
|
|
sizeof(nxt_router_engine_conf_t));
|
|
if (nxt_slow_path(tmcf->engines == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
n = 0;
|
|
|
|
for (qlk = nxt_queue_first(&router->engines);
|
|
qlk != nxt_queue_tail(&router->engines);
|
|
qlk = nxt_queue_next(qlk))
|
|
{
|
|
recf = nxt_array_zero_add(tmcf->engines);
|
|
if (nxt_slow_path(recf == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
recf->engine = nxt_queue_link_data(qlk, nxt_event_engine_t, link);
|
|
// STUB
|
|
recf->task = recf->engine->task;
|
|
|
|
if (n < threads) {
|
|
ret = nxt_router_engine_conf_update(task, mp, tmcf, recf);
|
|
|
|
} else {
|
|
ret = nxt_router_engine_conf_delete(task, mp, tmcf, recf);
|
|
}
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
|
|
n++;
|
|
}
|
|
|
|
tmcf->new_threads = n;
|
|
|
|
while (n < threads) {
|
|
recf = nxt_array_zero_add(tmcf->engines);
|
|
if (nxt_slow_path(recf == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
recf->engine = nxt_event_engine_create(task, interface, NULL, 0, 0);
|
|
if (nxt_slow_path(recf->engine == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
// STUB
|
|
recf->task = recf->engine->task;
|
|
|
|
ret = nxt_router_engine_conf_create(task, mp, tmcf, recf);
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
|
|
n++;
|
|
}
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_router_engine_conf_create(nxt_task_t *task, nxt_mem_pool_t *mp,
|
|
nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
|
|
{
|
|
nxt_int_t ret;
|
|
|
|
recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
|
|
if (nxt_slow_path(recf->creating == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
|
|
recf->creating, nxt_router_listen_socket_create);
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
|
|
return nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
|
|
recf->creating, nxt_router_listen_socket_create);
|
|
}
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_router_engine_conf_update(nxt_task_t *task, nxt_mem_pool_t *mp,
|
|
nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
|
|
{
|
|
nxt_int_t ret;
|
|
|
|
recf->creating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
|
|
if (nxt_slow_path(recf->creating == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->creating,
|
|
recf->creating, nxt_router_listen_socket_create);
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
|
|
recf->updating = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
|
|
if (nxt_slow_path(recf->updating == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
ret = nxt_router_engine_joints_create(task, mp, recf, &tmcf->updating,
|
|
recf->updating, nxt_router_listen_socket_update);
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
|
|
recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
|
|
if (nxt_slow_path(recf->deleting == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
|
|
recf->deleting);
|
|
}
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_router_engine_conf_delete(nxt_task_t *task, nxt_mem_pool_t *mp,
|
|
nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf)
|
|
{
|
|
nxt_int_t ret;
|
|
|
|
recf->deleting = nxt_array_create(tmcf->mem_pool, 4, sizeof(nxt_work_t));
|
|
if (nxt_slow_path(recf->deleting == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
ret = nxt_router_engine_joints_delete(task, recf, &tmcf->updating,
|
|
recf->deleting);
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
|
|
return nxt_router_engine_joints_delete(task, recf, &tmcf->deleting,
|
|
recf->deleting);
|
|
}
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_router_engine_joints_create(nxt_task_t *task, nxt_mem_pool_t *mp,
|
|
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array,
|
|
nxt_work_handler_t handler)
|
|
{
|
|
nxt_work_t *work;
|
|
nxt_queue_link_t *qlk;
|
|
nxt_socket_conf_joint_t *joint;
|
|
|
|
for (qlk = nxt_queue_first(sockets);
|
|
qlk != nxt_queue_tail(sockets);
|
|
qlk = nxt_queue_next(qlk))
|
|
{
|
|
work = nxt_array_add(array);
|
|
if (nxt_slow_path(work == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
work->next = NULL;
|
|
work->handler = handler;
|
|
work->task = &recf->task;
|
|
work->obj = recf->engine;
|
|
|
|
joint = nxt_mem_alloc(mp, sizeof(nxt_socket_conf_joint_t));
|
|
if (nxt_slow_path(joint == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
work->data = joint;
|
|
|
|
joint->count = 1;
|
|
joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
|
|
}
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_router_engine_joints_delete(nxt_task_t *task,
|
|
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array)
|
|
{
|
|
nxt_work_t *work;
|
|
nxt_queue_link_t *qlk;
|
|
|
|
for (qlk = nxt_queue_first(sockets);
|
|
qlk != nxt_queue_tail(sockets);
|
|
qlk = nxt_queue_next(qlk))
|
|
{
|
|
work = nxt_array_add(array);
|
|
if (nxt_slow_path(work == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
work->next = NULL;
|
|
work->handler = nxt_router_listen_socket_delete;
|
|
work->task = &recf->task;
|
|
work->obj = recf->engine;
|
|
work->data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
|
|
}
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_router_threads_create(nxt_task_t *task, nxt_runtime_t *rt,
|
|
nxt_router_temp_conf_t *tmcf)
|
|
{
|
|
nxt_int_t ret;
|
|
nxt_uint_t i, threads;
|
|
nxt_router_engine_conf_t *recf;
|
|
|
|
recf = tmcf->engines->elts;
|
|
threads = tmcf->conf->threads;
|
|
|
|
for (i = tmcf->new_threads; i < threads; i++) {
|
|
ret = nxt_router_thread_create(task, rt, recf[i].engine);
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
|
|
nxt_event_engine_t *engine)
|
|
{
|
|
nxt_int_t ret;
|
|
nxt_thread_link_t *link;
|
|
nxt_thread_handle_t handle;
|
|
|
|
link = nxt_zalloc(sizeof(nxt_thread_link_t));
|
|
|
|
if (nxt_slow_path(link == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
link->start = nxt_router_thread_start;
|
|
link->engine = engine;
|
|
link->work.handler = nxt_router_thread_exit_handler;
|
|
link->work.task = task;
|
|
link->work.data = link;
|
|
|
|
nxt_queue_insert_tail(&rt->engines, &engine->link);
|
|
|
|
ret = nxt_thread_create(&handle, link);
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
nxt_queue_remove(&engine->link);
|
|
}
|
|
|
|
return ret;
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
|
|
{
|
|
nxt_uint_t n;
|
|
nxt_router_engine_conf_t *recf;
|
|
|
|
recf = tmcf->engines->elts;
|
|
|
|
for (n = tmcf->engines->nelts; n != 0; n--) {
|
|
nxt_router_engine_post(recf);
|
|
recf++;
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_engine_post(nxt_router_engine_conf_t *recf)
|
|
{
|
|
nxt_uint_t n;
|
|
nxt_work_t *work;
|
|
|
|
work = recf->creating->elts;
|
|
|
|
for (n = recf->creating->nelts; n != 0; n--) {
|
|
nxt_event_engine_post(recf->engine, work);
|
|
work++;
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_thread_start(void *data)
|
|
{
|
|
nxt_thread_t *thread;
|
|
nxt_thread_link_t *link;
|
|
nxt_event_engine_t *engine;
|
|
|
|
link = data;
|
|
engine = link->engine;
|
|
|
|
thread = nxt_thread();
|
|
|
|
/* STUB */
|
|
thread->runtime = engine->task.thread->runtime;
|
|
|
|
engine->task.thread = thread;
|
|
engine->task.log = thread->log;
|
|
thread->engine = engine;
|
|
thread->fiber = &engine->fibers->fiber;
|
|
|
|
engine->mem_pool = nxt_mem_cache_pool_create(4096, 1024, 1024, 64);
|
|
|
|
nxt_event_engine_start(engine);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_listen_event_t *listen;
|
|
nxt_listen_socket_t *ls;
|
|
nxt_socket_conf_joint_t *joint;
|
|
|
|
joint = data;
|
|
|
|
ls = &joint->socket_conf->listen;
|
|
|
|
listen = nxt_listen_event(task, ls);
|
|
if (nxt_slow_path(listen == NULL)) {
|
|
nxt_router_listen_socket_release(task, joint);
|
|
return;
|
|
}
|
|
|
|
listen->socket.data = joint;
|
|
}
|
|
|
|
|
|
nxt_inline nxt_listen_event_t *
|
|
nxt_router_listen_event(nxt_queue_t *listen_connections,
|
|
nxt_socket_conf_t *skcf)
|
|
{
|
|
nxt_socket_t socket;
|
|
nxt_queue_link_t *link;
|
|
nxt_listen_event_t *listen;
|
|
|
|
socket = skcf->listen.socket;
|
|
|
|
for (link = nxt_queue_first(listen_connections);
|
|
link != nxt_queue_tail(listen_connections);
|
|
link = nxt_queue_next(link))
|
|
{
|
|
listen = nxt_queue_link_data(link, nxt_listen_event_t, link);
|
|
|
|
if (socket == listen->socket.fd) {
|
|
return listen;
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_event_engine_t *engine;
|
|
nxt_listen_event_t *listen;
|
|
nxt_socket_conf_joint_t *joint, *old;
|
|
|
|
engine = obj;
|
|
joint = data;
|
|
|
|
listen = nxt_router_listen_event(&engine->listen_connections,
|
|
joint->socket_conf);
|
|
|
|
old = listen->socket.data;
|
|
listen->socket.data = joint;
|
|
|
|
nxt_router_conf_release(task, old);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_socket_conf_t *skcf;
|
|
nxt_listen_event_t *listen;
|
|
nxt_event_engine_t *engine;
|
|
|
|
engine = obj;
|
|
skcf = data;
|
|
|
|
listen = nxt_router_listen_event(&engine->listen_connections, skcf);
|
|
|
|
nxt_fd_event_delete(engine, &listen->socket);
|
|
|
|
listen->timer.handler = nxt_router_listen_socket_close;
|
|
listen->timer.work_queue = &engine->fast_work_queue;
|
|
|
|
nxt_timer_add(engine, &listen->timer, 0);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_timer_t *timer;
|
|
nxt_listen_event_t *listen;
|
|
nxt_socket_conf_joint_t *joint;
|
|
|
|
timer = obj;
|
|
listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
|
|
joint = listen->socket.data;
|
|
|
|
nxt_queue_remove(&listen->link);
|
|
nxt_free(listen);
|
|
|
|
nxt_router_listen_socket_release(task, joint);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_listen_socket_release(nxt_task_t *task,
|
|
nxt_socket_conf_joint_t *joint)
|
|
{
|
|
nxt_socket_t s;
|
|
nxt_listen_socket_t *ls;
|
|
nxt_thread_spinlock_t *lock;
|
|
|
|
s = -1;
|
|
ls = &joint->socket_conf->listen;
|
|
lock = &joint->socket_conf->router_conf->router->lock;
|
|
|
|
nxt_thread_spin_lock(lock);
|
|
|
|
if (--ls->count == 0) {
|
|
s = ls->socket;
|
|
ls->socket = -1;
|
|
}
|
|
|
|
nxt_thread_spin_unlock(lock);
|
|
|
|
if (s != -1) {
|
|
nxt_socket_close(task, s);
|
|
}
|
|
|
|
nxt_router_conf_release(task, joint);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
|
|
{
|
|
nxt_socket_conf_t *skcf;
|
|
nxt_router_conf_t *rtcf;
|
|
nxt_thread_spinlock_t *lock;
|
|
|
|
nxt_debug(task, "conf joint count: %D", joint->count);
|
|
|
|
if (--joint->count != 0) {
|
|
return;
|
|
}
|
|
|
|
nxt_queue_remove(&joint->link);
|
|
|
|
skcf = joint->socket_conf;
|
|
rtcf = skcf->router_conf;
|
|
lock = &rtcf->router->lock;
|
|
|
|
nxt_thread_spin_lock(lock);
|
|
|
|
if (--skcf->count != 0) {
|
|
rtcf = NULL;
|
|
|
|
} else {
|
|
nxt_queue_remove(&skcf->link);
|
|
|
|
if (--rtcf->count != 0) {
|
|
rtcf = NULL;
|
|
}
|
|
}
|
|
|
|
nxt_thread_spin_unlock(lock);
|
|
|
|
if (rtcf != NULL) {
|
|
nxt_mem_pool_destroy(rtcf->mem_pool);
|
|
}
|
|
|
|
if (nxt_queue_is_empty(&joint->engine->joints)) {
|
|
nxt_thread_exit(task->thread);
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_thread_link_t *link;
|
|
nxt_event_engine_t *engine;
|
|
nxt_thread_handle_t handle;
|
|
|
|
handle = (nxt_thread_handle_t) obj;
|
|
link = data;
|
|
|
|
nxt_thread_wait(handle);
|
|
|
|
engine = link->engine;
|
|
|
|
nxt_queue_remove(&engine->link);
|
|
|
|
nxt_mem_cache_pool_destroy(engine->mem_pool);
|
|
|
|
nxt_event_engine_free(engine);
|
|
|
|
nxt_free(link);
|
|
|
|
// TODO: free port
|
|
}
|
|
|
|
|
|
static const nxt_event_conn_state_t nxt_router_conn_read_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_router_conn_http_header_parse,
|
|
.close_handler = nxt_router_conn_close,
|
|
.error_handler = nxt_router_conn_error,
|
|
|
|
.timer_handler = nxt_router_conn_timeout,
|
|
.timer_value = nxt_router_conn_timeout_value,
|
|
.timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
size_t size;
|
|
nxt_event_conn_t *c;
|
|
nxt_event_engine_t *engine;
|
|
nxt_socket_conf_joint_t *joint;
|
|
|
|
c = obj;
|
|
joint = data;
|
|
|
|
nxt_debug(task, "router conn init");
|
|
|
|
joint->count++;
|
|
|
|
size = joint->socket_conf->header_buffer_size;
|
|
c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
|
|
|
|
c->socket.data = NULL;
|
|
|
|
engine = task->thread->engine;
|
|
c->read_work_queue = &engine->fast_work_queue;
|
|
c->write_work_queue = &engine->fast_work_queue;
|
|
|
|
c->read_state = &nxt_router_conn_read_state;
|
|
|
|
nxt_event_conn_read(engine, c);
|
|
}
|
|
|
|
|
|
static const nxt_event_conn_state_t nxt_router_conn_write_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_router_conn_close,
|
|
.close_handler = nxt_router_conn_close,
|
|
.error_handler = nxt_router_conn_error,
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
size_t size;
|
|
nxt_int_t ret;
|
|
nxt_buf_t *b;
|
|
nxt_event_conn_t *c;
|
|
nxt_socket_conf_joint_t *joint;
|
|
nxt_http_request_parse_t *rp;
|
|
|
|
c = obj;
|
|
rp = data;
|
|
|
|
nxt_debug(task, "router conn http header parse");
|
|
|
|
if (rp == NULL) {
|
|
rp = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_http_request_parse_t));
|
|
if (nxt_slow_path(rp == NULL)) {
|
|
nxt_router_conn_close(task, c, data);
|
|
return;
|
|
}
|
|
|
|
c->socket.data = rp;
|
|
}
|
|
|
|
ret = nxt_http_parse_request(rp, &c->read->mem);
|
|
|
|
nxt_debug(task, "http parse request: %d", ret);
|
|
|
|
switch (nxt_expect(NXT_DONE, ret)) {
|
|
|
|
case NXT_DONE:
|
|
break;
|
|
|
|
case NXT_ERROR:
|
|
nxt_router_conn_close(task, c, data);
|
|
return;
|
|
|
|
default: /* NXT_AGAIN */
|
|
|
|
if (c->read->mem.free == c->read->mem.end) {
|
|
joint = c->listen->socket.data;
|
|
size = joint->socket_conf->large_header_buffer_size,
|
|
|
|
b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
|
|
if (nxt_slow_path(b == NULL)) {
|
|
nxt_router_conn_close(task, c, data);
|
|
return;
|
|
}
|
|
|
|
size = c->read->mem.free - c->read->mem.pos;
|
|
nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
|
|
|
|
b->mem.free += size;
|
|
c->read = b;
|
|
}
|
|
|
|
nxt_event_conn_read(task->thread->engine, c);
|
|
return;
|
|
}
|
|
|
|
c->write = c->read;
|
|
c->write->mem.pos = c->write->mem.start;
|
|
c->write_state = &nxt_router_conn_write_state;
|
|
|
|
nxt_event_conn_write(task->thread->engine, c);
|
|
}
|
|
|
|
|
|
static const nxt_event_conn_state_t nxt_router_conn_close_state
|
|
nxt_aligned(64) =
|
|
{
|
|
.ready_handler = nxt_router_conn_free,
|
|
};
|
|
|
|
|
|
static void
|
|
nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_event_conn_t *c;
|
|
|
|
c = obj;
|
|
|
|
nxt_debug(task, "router conn close");
|
|
|
|
c->write_state = &nxt_router_conn_close_state;
|
|
|
|
nxt_event_conn_close(task->thread->engine, c);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_event_conn_t *c;
|
|
nxt_socket_conf_joint_t *joint;
|
|
|
|
c = obj;
|
|
|
|
nxt_debug(task, "router conn close done");
|
|
|
|
joint = c->listen->socket.data;
|
|
nxt_router_conf_release(task, joint);
|
|
|
|
nxt_mem_pool_destroy(c->mem_pool);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_event_conn_t *c;
|
|
|
|
c = obj;
|
|
|
|
nxt_debug(task, "router conn error");
|
|
|
|
c->write_state = &nxt_router_conn_close_state;
|
|
|
|
nxt_event_conn_close(task->thread->engine, c);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_timer_t *timer;
|
|
nxt_event_conn_t *c;
|
|
|
|
timer = obj;
|
|
|
|
nxt_debug(task, "router conn timeout");
|
|
|
|
c = nxt_event_read_timer_conn(timer);
|
|
|
|
c->write_state = &nxt_router_conn_close_state;
|
|
|
|
nxt_event_conn_close(task->thread->engine, c);
|
|
}
|
|
|
|
|
|
static nxt_msec_t
|
|
nxt_router_conn_timeout_value(nxt_event_conn_t *c, uintptr_t data)
|
|
{
|
|
nxt_socket_conf_joint_t *joint;
|
|
|
|
joint = c->listen->socket.data;
|
|
|
|
return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
|
|
}
|