Router: fixed segfault after configuration change.

This commit is contained in:
Igor Sysoev
2017-10-18 18:05:47 +03:00
parent 09ef66d39c
commit 67c066b026
7 changed files with 258 additions and 193 deletions

View File

@@ -155,7 +155,9 @@ struct nxt_conn_s {
nxt_task_t task;
nxt_log_t log;
nxt_listen_event_t *listen;
/* STUB: socket.data should be used later. */
void *joint;
nxt_sockaddr_t *remote;
nxt_sockaddr_t *local;
const char *action;

View File

@@ -101,7 +101,6 @@ nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
lev->next = c;
c->socket.read_work_queue = lev->socket.read_work_queue;
c->socket.write_ready = 1;
c->listen = lev;
c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen);
if (nxt_fast_path(c->remote != NULL)) {

View File

@@ -369,7 +369,7 @@ nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
ls->sockaddr->type = sa->type;
nxt_sockaddr_text(ls->sockaddr);
nxt_listen_socket_remote_size(ls, sa);
nxt_listen_socket_remote_size(ls);
ls->socket = -1;
ls->backlog = NXT_LISTEN_BACKLOG;

View File

@@ -177,9 +177,9 @@ fail:
void
nxt_listen_socket_remote_size(nxt_listen_socket_t *ls, nxt_sockaddr_t *sa)
nxt_listen_socket_remote_size(nxt_listen_socket_t *ls)
{
switch (sa->u.sockaddr.sa_family) {
switch (ls->sockaddr->u.sockaddr.sa_family) {
#if (NXT_INET6)

View File

@@ -18,6 +18,8 @@ typedef struct {
nxt_sockaddr_t *sockaddr;
uint32_t count;
uint8_t flags;
uint8_t read_after_accept; /* 1 bit */
@@ -56,8 +58,7 @@ NXT_EXPORT nxt_int_t nxt_listen_socket_create(nxt_task_t *task,
nxt_listen_socket_t *ls, nxt_bool_t bind_test);
NXT_EXPORT nxt_int_t nxt_listen_socket_update(nxt_task_t *task,
nxt_listen_socket_t *ls, nxt_listen_socket_t *prev);
NXT_EXPORT void nxt_listen_socket_remote_size(nxt_listen_socket_t *ls,
nxt_sockaddr_t *sa);
NXT_EXPORT void nxt_listen_socket_remote_size(nxt_listen_socket_t *ls);
NXT_EXPORT size_t nxt_listen_socket_pool_min_size(nxt_listen_socket_t *ls);

View File

@@ -75,8 +75,6 @@ static void nxt_router_conf_error(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf);
static void nxt_router_conf_send(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_port_msg_type_t type);
static void nxt_router_listen_sockets_sort(nxt_router_t *router,
nxt_router_temp_conf_t *tmcf);
static nxt_int_t nxt_router_conf_create(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, u_char *start, u_char *end);
@@ -89,8 +87,10 @@ static void nxt_router_listen_socket_ready(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_listen_socket_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp,
nxt_sockaddr_t *sa);
static nxt_socket_conf_t *nxt_router_socket_conf(nxt_task_t *task,
nxt_router_temp_conf_t *tmcf, nxt_str_t *name);
static nxt_int_t nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa);
static nxt_int_t nxt_router_engines_create(nxt_task_t *task,
nxt_router_t *router, nxt_router_temp_conf_t *tmcf,
@@ -101,7 +101,6 @@ static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf);
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf);
static void nxt_router_engine_socket_count(nxt_queue_t *sockets);
static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
nxt_work_handler_t handler);
@@ -133,10 +132,10 @@ static void nxt_router_worker_thread_quit(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_listen_socket_close(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint);
static void nxt_router_thread_exit_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_listen_socket_release(nxt_task_t *task,
nxt_socket_conf_t *skcf);
static void nxt_router_conf_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint);
@@ -152,6 +151,7 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
void *data);
static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c);
static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_process_http_request(nxt_task_t *task,
@@ -820,13 +820,13 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
qlk = nxt_queue_next(qlk))
{
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
s = skcf->listen.socket;
s = skcf->listen->socket;
if (s != -1) {
nxt_socket_close(task, s);
}
nxt_free(skcf->socket);
nxt_free(skcf->listen);
}
router = tmcf->conf->router;
@@ -960,7 +960,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_int_t ret;
nxt_str_t name;
nxt_app_t *app, *prev;
nxt_sockaddr_t *sa;
nxt_router_t *router;
nxt_conf_value_t *conf, *http;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
@@ -998,6 +998,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
return NXT_ERROR;
}
router = tmcf->conf->router;
next = 0;
for ( ;; ) {
@@ -1027,7 +1029,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_debug(task, "application conf \"%V\"", &app->conf);
prev = nxt_router_app_find(&tmcf->conf->router->apps, &name);
prev = nxt_router_app_find(&router->apps, &name);
if (prev != NULL && nxt_strstr_eq(&app->conf, &prev->conf)) {
nxt_free(app);
@@ -1126,18 +1128,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
break;
}
sa = nxt_sockaddr_parse(mp, &name);
if (sa == NULL) {
nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", &name);
goto fail;
}
sa->type = SOCK_STREAM;
nxt_debug(task, "router listener: \"%*s\"",
sa->length, nxt_sockaddr_start(sa));
skcf = nxt_router_socket_conf(task, mp, sa);
skcf = nxt_router_socket_conf(task, tmcf, &name);
if (skcf == NULL) {
goto fail;
}
@@ -1169,16 +1160,15 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
}
skcf->listen.handler = nxt_router_conn_init;
skcf->listen->handler = nxt_router_conn_init;
skcf->router_conf = tmcf->conf;
skcf->router_conf->count++;
skcf->application = nxt_router_listener_application(tmcf,
&lscf.application);
nxt_queue_insert_tail(&tmcf->pending, &skcf->link);
}
nxt_router_listen_sockets_sort(tmcf->conf->router, tmcf);
nxt_queue_add(&tmcf->deleting, &router->sockets);
nxt_queue_init(&router->sockets);
return NXT_OK;
@@ -1233,67 +1223,117 @@ nxt_router_listener_application(nxt_router_temp_conf_t *tmcf, nxt_str_t *name)
static nxt_socket_conf_t *
nxt_router_socket_conf(nxt_task_t *task, nxt_mp_t *mp, nxt_sockaddr_t *sa)
nxt_router_socket_conf(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_str_t *name)
{
nxt_socket_conf_t *skcf;
size_t size;
nxt_int_t ret;
nxt_bool_t wildcard;
nxt_sockaddr_t *sa;
nxt_socket_conf_t *skcf;
nxt_listen_socket_t *ls;
skcf = nxt_mp_zget(mp, sizeof(nxt_socket_conf_t));
sa = nxt_sockaddr_parse(tmcf->mem_pool, name);
if (nxt_slow_path(sa == NULL)) {
nxt_log(task, NXT_LOG_CRIT, "invalid listener \"%V\"", name);
return NULL;
}
sa->type = SOCK_STREAM;
nxt_debug(task, "router listener: \"%*s\"",
sa->length, nxt_sockaddr_start(sa));
skcf = nxt_mp_zget(tmcf->conf->mem_pool, sizeof(nxt_socket_conf_t));
if (nxt_slow_path(skcf == NULL)) {
return NULL;
}
skcf->sockaddr = sa;
size = nxt_sockaddr_size(sa);
skcf->listen.sockaddr = sa;
ret = nxt_router_listen_socket_find(tmcf, skcf, sa);
nxt_listen_socket_remote_size(&skcf->listen, sa);
if (ret != NXT_OK) {
skcf->listen.socket = -1;
skcf->listen.backlog = NXT_LISTEN_BACKLOG;
skcf->listen.flags = NXT_NONBLOCK;
skcf->listen.read_after_accept = 1;
ls = nxt_zalloc(sizeof(nxt_listen_socket_t) + size);
if (nxt_slow_path(ls == NULL)) {
return NULL;
}
skcf->listen = ls;
ls->sockaddr = nxt_pointer_to(ls, sizeof(nxt_listen_socket_t));
nxt_memcpy(ls->sockaddr, sa, size);
nxt_listen_socket_remote_size(ls);
ls->socket = -1;
ls->backlog = NXT_LISTEN_BACKLOG;
ls->flags = NXT_NONBLOCK;
ls->read_after_accept = 1;
}
switch (sa->u.sockaddr.sa_family) {
#if (NXT_HAVE_UNIX_DOMAIN)
case AF_UNIX:
wildcard = 0;
break;
#endif
#if (NXT_INET6)
case AF_INET6:
wildcard = IN6_IS_ADDR_UNSPECIFIED(&sa->u.sockaddr_in6.sin6_addr);
break;
#endif
case AF_INET:
default:
wildcard = (sa->u.sockaddr_in.sin_addr.s_addr == INADDR_ANY);
break;
}
if (!wildcard) {
skcf->sockaddr = nxt_mp_zget(tmcf->conf->mem_pool, size);
if (nxt_slow_path(skcf->sockaddr == NULL)) {
return NULL;
}
nxt_memcpy(skcf->sockaddr, sa, size);
}
return skcf;
}
static void
nxt_router_listen_sockets_sort(nxt_router_t *router,
nxt_router_temp_conf_t *tmcf)
static nxt_int_t
nxt_router_listen_socket_find(nxt_router_temp_conf_t *tmcf,
nxt_socket_conf_t *nskcf, nxt_sockaddr_t *sa)
{
nxt_queue_link_t *nqlk, *oqlk, *next;
nxt_socket_conf_t *nskcf, *oskcf;
nxt_router_t *router;
nxt_queue_link_t *qlk;
nxt_socket_conf_t *skcf;
for (nqlk = nxt_queue_first(&tmcf->pending);
nqlk != nxt_queue_tail(&tmcf->pending);
nqlk = next)
router = tmcf->conf->router;
for (qlk = nxt_queue_first(&router->sockets);
qlk != nxt_queue_tail(&router->sockets);
qlk = nxt_queue_next(qlk))
{
next = nxt_queue_next(nqlk);
nskcf = nxt_queue_link_data(nqlk, nxt_socket_conf_t, link);
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
for (oqlk = nxt_queue_first(&router->sockets);
oqlk != nxt_queue_tail(&router->sockets);
oqlk = nxt_queue_next(oqlk))
{
oskcf = nxt_queue_link_data(oqlk, nxt_socket_conf_t, link);
if (nxt_sockaddr_cmp(skcf->listen->sockaddr, sa)) {
nskcf->listen = skcf->listen;
if (nxt_sockaddr_cmp(nskcf->sockaddr, oskcf->sockaddr)) {
nskcf->socket = oskcf->socket;
nskcf->listen.socket = oskcf->listen.socket;
nxt_queue_remove(qlk);
nxt_queue_insert_tail(&tmcf->keeping, qlk);
nxt_queue_remove(oqlk);
nxt_queue_insert_tail(&tmcf->keeping, oqlk);
nxt_queue_insert_tail(&tmcf->updating, &nskcf->link);
nxt_queue_remove(nqlk);
nxt_queue_insert_tail(&tmcf->updating, nqlk);
break;
}
return NXT_OK;
}
}
nxt_queue_add(&tmcf->deleting, &router->sockets);
nxt_queue_init(&router->sockets);
nxt_queue_insert_tail(&tmcf->pending, &nskcf->link);
return NXT_DECLINED;
}
@@ -1316,14 +1356,14 @@ nxt_router_listen_socket_rpc_create(nxt_task_t *task,
rpc->socket_conf = skcf;
rpc->temp_conf = tmcf;
size = nxt_sockaddr_size(skcf->sockaddr);
size = nxt_sockaddr_size(skcf->listen->sockaddr);
b = nxt_buf_mem_alloc(tmcf->mem_pool, size, 0);
if (b == NULL) {
goto fail;
}
b->mem.free = nxt_cpymem(b->mem.free, skcf->sockaddr, size);
b->mem.free = nxt_cpymem(b->mem.free, skcf->listen->sockaddr, size);
rt = task->thread->runtime;
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
@@ -1352,10 +1392,9 @@ static void
nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_int_t ret;
nxt_socket_t s;
nxt_socket_rpc_t *rpc;
nxt_router_socket_t *rtsk;
nxt_int_t ret;
nxt_socket_t s;
nxt_socket_rpc_t *rpc;
rpc = data;
@@ -1366,23 +1405,14 @@ nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
goto fail;
}
nxt_socket_defer_accept(task, s, rpc->socket_conf->sockaddr);
nxt_socket_defer_accept(task, s, rpc->socket_conf->listen->sockaddr);
ret = nxt_listen_socket(task, s, NXT_LISTEN_BACKLOG);
if (nxt_slow_path(ret != NXT_OK)) {
goto fail;
}
rtsk = nxt_malloc(sizeof(nxt_router_socket_t));
if (nxt_slow_path(rtsk == NULL)) {
goto fail;
}
rtsk->count = 0;
rtsk->fd = s;
rpc->socket_conf->listen.socket = s;
rpc->socket_conf->socket = rtsk;
rpc->socket_conf->listen->socket = s;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_conf_apply, task, rpc->temp_conf, NULL);
@@ -1420,7 +1450,7 @@ nxt_router_listen_socket_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
};
rpc = data;
sa = rpc->socket_conf->sockaddr;
sa = rpc->socket_conf->listen->sockaddr;
tmcf = rpc->temp_conf;
in = nxt_buf_chk_make_plain(tmcf->mem_pool, msg->buf, msg->size);
@@ -1529,8 +1559,7 @@ static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf)
{
nxt_int_t ret;
nxt_thread_spinlock_t *lock;
nxt_int_t ret;
ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
nxt_router_listen_socket_create);
@@ -1544,15 +1573,6 @@ nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
return ret;
}
lock = &tmcf->conf->router->lock;
nxt_thread_spin_lock(lock);
nxt_router_engine_socket_count(&tmcf->creating);
nxt_router_engine_socket_count(&tmcf->updating);
nxt_thread_spin_unlock(lock);
return ret;
}
@@ -1561,8 +1581,7 @@ static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf)
{
nxt_int_t ret;
nxt_thread_spinlock_t *lock;
nxt_int_t ret;
ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
nxt_router_listen_socket_create);
@@ -1581,14 +1600,6 @@ nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
return ret;
}
lock = &tmcf->conf->router->lock;
nxt_thread_spin_lock(lock);
nxt_router_engine_socket_count(&tmcf->creating);
nxt_thread_spin_unlock(lock);
return ret;
}
@@ -1664,22 +1675,6 @@ nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
}
static void
nxt_router_engine_socket_count(nxt_queue_t *sockets)
{
nxt_queue_link_t *qlk;
nxt_socket_conf_t *skcf;
for (qlk = nxt_queue_first(sockets);
qlk != nxt_queue_tail(sockets);
qlk = nxt_queue_next(qlk))
{
skcf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
skcf->socket->count++;
}
}
static nxt_int_t
nxt_router_engine_quit(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf)
@@ -1941,24 +1936,33 @@ static void
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
{
nxt_joint_job_t *job;
nxt_listen_event_t *listen;
nxt_socket_conf_t *skcf;
nxt_listen_event_t *lev;
nxt_listen_socket_t *ls;
nxt_thread_spinlock_t *lock;
nxt_socket_conf_joint_t *joint;
job = obj;
joint = data;
ls = &joint->socket_conf->listen;
nxt_queue_insert_tail(&task->thread->engine->joints, &joint->link);
listen = nxt_listen_event(task, ls);
if (nxt_slow_path(listen == NULL)) {
nxt_router_listen_socket_release(task, joint);
skcf = joint->socket_conf;
ls = skcf->listen;
lev = nxt_listen_event(task, ls);
if (nxt_slow_path(lev == NULL)) {
nxt_router_listen_socket_release(task, skcf);
return;
}
listen->socket.data = joint;
lev->socket.data = joint;
lock = &skcf->router_conf->router->lock;
nxt_thread_spin_lock(lock);
ls->count++;
nxt_thread_spin_unlock(lock);
job->work.next = NULL;
job->work.handler = nxt_router_conf_wait;
@@ -1973,18 +1977,18 @@ nxt_router_listen_event(nxt_queue_t *listen_connections,
{
nxt_socket_t fd;
nxt_queue_link_t *qlk;
nxt_listen_event_t *listen;
nxt_listen_event_t *lev;
fd = skcf->socket->fd;
fd = skcf->listen->socket;
for (qlk = nxt_queue_first(listen_connections);
qlk != nxt_queue_tail(listen_connections);
qlk = nxt_queue_next(qlk))
{
listen = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
lev = nxt_queue_link_data(qlk, nxt_listen_event_t, link);
if (fd == listen->socket.fd) {
return listen;
if (fd == lev->socket.fd) {
return lev;
}
}
@@ -1997,7 +2001,7 @@ nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
{
nxt_joint_job_t *job;
nxt_event_engine_t *engine;
nxt_listen_event_t *listen;
nxt_listen_event_t *lev;
nxt_socket_conf_joint_t *joint, *old;
job = obj;
@@ -2007,12 +2011,12 @@ nxt_router_listen_socket_update(nxt_task_t *task, void *obj, void *data)
nxt_queue_insert_tail(&engine->joints, &joint->link);
listen = nxt_router_listen_event(&engine->listen_connections,
joint->socket_conf);
lev = nxt_router_listen_event(&engine->listen_connections,
joint->socket_conf);
old = listen->socket.data;
listen->socket.data = joint;
listen->listen = &joint->socket_conf->listen;
old = lev->socket.data;
lev->socket.data = joint;
lev->listen = joint->socket_conf->listen;
job->work.next = NULL;
job->work.handler = nxt_router_conf_wait;
@@ -2033,7 +2037,7 @@ nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
{
nxt_joint_job_t *job;
nxt_socket_conf_t *skcf;
nxt_listen_event_t *listen;
nxt_listen_event_t *lev;
nxt_event_engine_t *engine;
job = obj;
@@ -2041,17 +2045,17 @@ nxt_router_listen_socket_delete(nxt_task_t *task, void *obj, void *data)
engine = task->thread->engine;
listen = nxt_router_listen_event(&engine->listen_connections, skcf);
lev = nxt_router_listen_event(&engine->listen_connections, skcf);
nxt_fd_event_delete(engine, &listen->socket);
nxt_fd_event_delete(engine, &lev->socket);
nxt_debug(task, "engine %p: listen socket delete: %d", engine,
listen->socket.fd);
lev->socket.fd);
listen->timer.handler = nxt_router_listen_socket_close;
listen->timer.work_queue = &engine->fast_work_queue;
lev->timer.handler = nxt_router_listen_socket_close;
lev->timer.work_queue = &engine->fast_work_queue;
nxt_timer_add(engine, &listen->timer, 0);
nxt_timer_add(engine, &lev->timer, 0);
job->work.next = NULL;
job->work.handler = nxt_router_conf_wait;
@@ -2081,57 +2085,53 @@ static void
nxt_router_listen_socket_close(nxt_task_t *task, void *obj, void *data)
{
nxt_timer_t *timer;
nxt_listen_event_t *listen;
nxt_listen_event_t *lev;
nxt_socket_conf_joint_t *joint;
timer = obj;
listen = nxt_timer_data(timer, nxt_listen_event_t, timer);
joint = listen->socket.data;
lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
joint = lev->socket.data;
nxt_debug(task, "engine %p: listen socket close: %d", task->thread->engine,
listen->socket.fd);
lev->socket.fd);
nxt_queue_remove(&listen->link);
nxt_queue_remove(&lev->link);
/* 'task' refers to listen->task and we cannot use after nxt_free() */
/* 'task' refers to lev->task and we cannot use after nxt_free() */
task = &task->thread->engine->task;
nxt_free(listen);
nxt_router_listen_socket_release(task, joint->socket_conf);
nxt_router_listen_socket_release(task, joint);
nxt_free(lev);
nxt_router_conf_release(task, joint);
}
static void
nxt_router_listen_socket_release(nxt_task_t *task,
nxt_socket_conf_joint_t *joint)
nxt_router_listen_socket_release(nxt_task_t *task, nxt_socket_conf_t *skcf)
{
nxt_socket_conf_t *skcf;
nxt_router_socket_t *rtsk;
nxt_listen_socket_t *ls;
nxt_thread_spinlock_t *lock;
skcf = joint->socket_conf;
rtsk = skcf->socket;
ls = skcf->listen;
lock = &skcf->router_conf->router->lock;
nxt_thread_spin_lock(lock);
nxt_debug(task, "engine %p: listen socket release: rtsk->count %D",
task->thread->engine, rtsk->count);
nxt_debug(task, "engine %p: listen socket release: ls->count %D",
task->thread->engine, ls->count);
if (--rtsk->count != 0) {
rtsk = NULL;
if (--ls->count != 0) {
ls = NULL;
}
nxt_thread_spin_unlock(lock);
if (rtsk != NULL) {
nxt_socket_close(task, rtsk->fd);
nxt_free(rtsk);
skcf->socket = NULL;
if (ls != NULL) {
nxt_socket_close(task, ls->socket);
nxt_free(ls);
}
nxt_router_conf_release(task, joint);
}
@@ -2261,6 +2261,7 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
{
size_t size;
nxt_conn_t *c;
nxt_socket_conf_t *skcf;
nxt_event_engine_t *engine;
nxt_socket_conf_joint_t *joint;
@@ -2269,9 +2270,13 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "router conn init");
c->joint = joint;
joint->count++;
size = joint->socket_conf->header_buffer_size;
skcf = joint->socket_conf;
c->local = skcf->sockaddr;
size = skcf->header_buffer_size;
c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
c->socket.data = NULL;
@@ -2785,7 +2790,7 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
use_delta = 1;
c = ra->rc->conn;
joint = c->listen->socket.data;
joint = c->joint;
app = joint->socket_conf->application;
if (app == NULL) {
@@ -2886,7 +2891,7 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
c = obj;
ap = data;
buf = c->read;
joint = c->listen->socket.data;
joint = c->joint;
nxt_debug(task, "router conn http header parse");
@@ -2903,9 +2908,16 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
ap->r.remote.start = nxt_sockaddr_address(c->remote);
ap->r.remote.length = c->remote->address_length;
local = joint->socket_conf->sockaddr;
ap->r.local.start = nxt_sockaddr_address(local);
ap->r.local.length = local->address_length;
/*
* TODO: need an application flag to get local address
* required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go.
*/
local = nxt_router_local_addr(task, c);
if (nxt_fast_path(local != NULL)) {
ap->r.local.start = nxt_sockaddr_address(local);
ap->r.local.length = local->address_length;
}
ap->r.header.buf = buf;
}
@@ -3000,6 +3012,58 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
}
static nxt_sockaddr_t *
nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c)
{
int ret;
size_t size;
socklen_t socklen;
nxt_sockaddr_t *sa;
if (c->local != NULL) {
return c->local;
}
/* AF_UNIX should not get in here. */
switch (c->remote->u.sockaddr.sa_family) {
#if (NXT_INET6)
case AF_INET6:
socklen = sizeof(struct sockaddr_in6);
size = offsetof(nxt_sockaddr_t, u) + socklen + NXT_INET6_ADDR_STR_LEN;
break;
#endif
case AF_INET:
default:
socklen = sizeof(struct sockaddr_in6);
size = offsetof(nxt_sockaddr_t, u) + socklen + NXT_INET_ADDR_STR_LEN;
break;
}
sa = nxt_mp_get(c->mem_pool, size);
if (nxt_slow_path(sa == NULL)) {
return NULL;
}
ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen);
if (nxt_slow_path(ret != 0)) {
nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd);
return NULL;
}
c->local = sa;
nxt_sockaddr_text(sa);
/*
* TODO: here we can adjust the end of non-freeable block
* in c->mem_pool to the end of actual sockaddr length.
*/
return sa;
}
static void
nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
{
@@ -3040,7 +3104,7 @@ nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
default: /* NXT_AGAIN */
if (nxt_buf_mem_free_size(&buf->mem) == 0) {
joint = c->listen->socket.data;
joint = c->joint;
b->preread_size += nxt_buf_mem_used_size(&buf->mem);
@@ -3585,7 +3649,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
nxt_sockaddr_cache_free(engine, c);
joint = c->listen->socket.data;
joint = c->joint;
nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup,
&engine->task, joint, NULL);
@@ -3653,7 +3717,7 @@ nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
{
nxt_socket_conf_joint_t *joint;
joint = c->listen->socket.data;
joint = c->joint;
return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
}

View File

@@ -103,22 +103,21 @@ struct nxt_app_s {
};
typedef struct {
uint32_t count;
nxt_socket_t fd;
} nxt_router_socket_t;
typedef struct {
uint32_t count;
nxt_queue_link_t link;
nxt_router_socket_t *socket;
nxt_router_conf_t *router_conf;
nxt_sockaddr_t *sockaddr;
nxt_app_t *application;
nxt_listen_socket_t listen;
/*
* A listen socket time can be shorter than socket configuration life
* time, so a copy of the non-wildcard socket sockaddr is stored here
* to be used as a local sockaddr in connections.
*/
nxt_sockaddr_t *sockaddr;
nxt_listen_socket_t *listen;
size_t header_buffer_size;
size_t large_header_buffer_size;