Files
nginx-unit/src/nxt_conn_accept.c
Valentin Bartenev 2b53c7bbbd Fixed nxt_conn_accept_alloc() behavior in low memory conditions.
Earlier, if nxt_mp_create() failed to allocate memory while accepting a new
connection, the resulting NULL was subsequently passed to nxt_mp_destroy(),
crashing the process.

More, if nxt_mp_create() was successful but nxt_sockaddr_cache_alloc() failed,
the connection object wasn't destroyed properly, leaving the connection counter
in an inconsistent state.  Repeated, this condition lowered the connection
capacity of the process and could eventually prevent it from accepting
connections altogether.
2020-08-05 14:55:34 +03:00

382 lines
9.6 KiB
C

/*
* Copyright (C) Igor Sysoev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
/*
* A listen socket handler calls an event facility specific io_accept()
* method. The method accept()s a new connection and then calls
* nxt_event_conn_accept() to handle the new connection and to prepare
* for a next connection to avoid just dropping next accept()ed socket
* if no more connections allowed. If there are no available connections
* an idle connection would be closed. If there are no idle connections
* then new connections will not be accept()ed for 1 second.
*/
static nxt_conn_t *nxt_conn_accept_alloc(nxt_task_t *task,
nxt_listen_event_t *lev);
static void nxt_conn_listen_handler(nxt_task_t *task, void *obj,
void *data);
static nxt_conn_t *nxt_conn_accept_next(nxt_task_t *task,
nxt_listen_event_t *lev);
static void nxt_conn_accept_close_idle(nxt_task_t *task,
nxt_listen_event_t *lev);
static void nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_conn_listen_event_error(nxt_task_t *task, void *obj,
void *data);
static void nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj,
void *data);
nxt_listen_event_t *
nxt_listen_event(nxt_task_t *task, nxt_listen_socket_t *ls)
{
nxt_listen_event_t *lev;
nxt_event_engine_t *engine;
lev = nxt_zalloc(sizeof(nxt_listen_event_t));
if (nxt_fast_path(lev != NULL)) {
lev->socket.fd = ls->socket;
engine = task->thread->engine;
lev->batch = engine->batch;
lev->count = 1;
lev->socket.read_work_queue = &engine->accept_work_queue;
lev->socket.read_handler = nxt_conn_listen_handler;
lev->socket.error_handler = nxt_conn_listen_event_error;
lev->socket.log = &nxt_main_log;
lev->accept = engine->event.io->accept;
lev->listen = ls;
lev->work_queue = &engine->read_work_queue;
lev->timer.work_queue = &engine->fast_work_queue;
lev->timer.handler = nxt_conn_listen_timer_handler;
lev->timer.log = &nxt_main_log;
lev->task.thread = task->thread;
lev->task.log = &nxt_main_log;
lev->task.ident = nxt_task_next_ident();
lev->socket.task = &lev->task;
lev->timer.task = &lev->task;
if (nxt_conn_accept_alloc(task, lev) != NULL) {
nxt_fd_event_enable_accept(engine, &lev->socket);
nxt_queue_insert_tail(&engine->listen_connections, &lev->link);
}
return lev;
}
return NULL;
}
static nxt_conn_t *
nxt_conn_accept_alloc(nxt_task_t *task, nxt_listen_event_t *lev)
{
nxt_mp_t *mp;
nxt_conn_t *c;
nxt_event_engine_t *engine;
engine = task->thread->engine;
if (engine->connections < engine->max_connections) {
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_fast_path(mp != NULL)) {
c = nxt_conn_create(mp, lev->socket.task);
if (nxt_slow_path(c == NULL)) {
nxt_mp_destroy(mp);
return NULL;
}
c->socket.read_work_queue = lev->socket.read_work_queue;
c->socket.write_ready = 1;
c->remote = nxt_sockaddr_cache_alloc(engine, lev->listen);
if (nxt_fast_path(c->remote != NULL)) {
lev->next = c;
return c;
}
nxt_conn_free(task, c);
}
}
return NULL;
}
static void
nxt_conn_listen_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_listen_event_t *lev;
lev = obj;
lev->ready = lev->batch;
lev->accept(task, lev, data);
}
void
nxt_conn_io_accept(nxt_task_t *task, void *obj, void *data)
{
socklen_t socklen;
nxt_conn_t *c;
nxt_socket_t s;
struct sockaddr *sa;
nxt_listen_event_t *lev;
lev = obj;
c = lev->next;
lev->ready--;
lev->socket.read_ready = (lev->ready != 0);
sa = &c->remote->u.sockaddr;
socklen = c->remote->socklen;
/*
* The returned socklen is ignored here, because sockaddr_in and
* sockaddr_in6 socklens are not changed. As to unspecified sockaddr_un
* it is 3 byte length and already prepared, because old BSDs return zero
* socklen and do not update the sockaddr_un at all; Linux returns 2 byte
* socklen and updates only the sa_family part; other systems copy 3 bytes
* and truncate surplus zero part. Only bound sockaddr_un will be really
* truncated here.
*/
s = accept(lev->socket.fd, sa, &socklen);
if (s == -1) {
nxt_conn_accept_error(task, lev, "accept", nxt_socket_errno);
return;
}
c->socket.fd = s;
#if (NXT_LINUX)
/*
* Linux does not inherit non-blocking mode
* from listen socket for accept()ed socket.
*/
if (nxt_slow_path(nxt_socket_nonblocking(task, s) != NXT_OK)) {
nxt_socket_close(task, s);
}
#endif
nxt_debug(task, "accept(%d): %d", lev->socket.fd, s);
nxt_conn_accept(task, lev, c);
}
void
nxt_conn_accept(nxt_task_t *task, nxt_listen_event_t *lev, nxt_conn_t *c)
{
nxt_conn_t *next;
nxt_sockaddr_text(c->remote);
nxt_debug(task, "client: %*s",
(size_t) c->remote->address_length,
nxt_sockaddr_address(c->remote));
nxt_queue_insert_head(&task->thread->engine->idle_connections, &c->link);
c->listen = lev;
lev->count++;
lev->next = NULL;
c->socket.data = NULL;
c->read_work_queue = lev->work_queue;
c->write_work_queue = lev->work_queue;
if (lev->listen->read_after_accept) {
//c->socket.read_ready = 1;
// lev->listen->handler(task, c, lev);
nxt_work_queue_add(c->read_work_queue, lev->listen->handler,
&c->task, c, lev);
} else {
nxt_work_queue_add(c->write_work_queue, lev->listen->handler,
&c->task, c, lev);
}
next = nxt_conn_accept_next(task, lev);
if (next != NULL && lev->socket.read_ready) {
nxt_work_queue_add(lev->socket.read_work_queue,
lev->accept, task, lev, next);
}
}
static nxt_conn_t *
nxt_conn_accept_next(nxt_task_t *task, nxt_listen_event_t *lev)
{
nxt_conn_t *c;
c = lev->next;
if (c == NULL) {
c = nxt_conn_accept_alloc(task, lev);
if (nxt_slow_path(c == NULL)) {
nxt_conn_accept_close_idle(task, lev);
}
}
return c;
}
static void
nxt_conn_accept_close_idle(nxt_task_t *task, nxt_listen_event_t *lev)
{
nxt_event_engine_t *engine;
engine = task->thread->engine;
nxt_work_queue_add(&engine->close_work_queue,
nxt_conn_accept_close_idle_handler, task, NULL, NULL);
nxt_timer_add(engine, &lev->timer, 100);
nxt_fd_event_disable_read(engine, &lev->socket);
nxt_alert(task, "new connections are not accepted within 100ms");
}
static void
nxt_conn_accept_close_idle_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_uint_t times;
nxt_conn_t *c;
nxt_queue_t *idle;
nxt_queue_link_t *link, *next;
nxt_event_engine_t *engine;
static nxt_log_moderation_t nxt_idle_close_log_moderation = {
NXT_LOG_INFO, 2, "idle connections closed", NXT_LOG_MODERATION
};
times = 10;
engine = task->thread->engine;
idle = &engine->idle_connections;
for (link = nxt_queue_last(idle);
link != nxt_queue_head(idle);
link = next)
{
next = nxt_queue_next(link);
c = nxt_queue_link_data(link, nxt_conn_t, link);
nxt_debug(c->socket.task, "idle connection: %d rdy:%d",
c->socket.fd, c->socket.read_ready);
if (!c->socket.read_ready) {
nxt_log_moderate(&nxt_idle_close_log_moderation, NXT_LOG_INFO,
task->log, "no available connections, "
"close idle connection");
c->read_state->close_handler(c->socket.task, c, c->socket.data);
times--;
if (times == 0) {
break;
}
}
}
}
void
nxt_conn_accept_error(nxt_task_t *task, nxt_listen_event_t *lev,
const char *accept_syscall, nxt_err_t err)
{
static nxt_log_moderation_t nxt_accept_log_moderation = {
NXT_LOG_INFO, 2, "accept() failed", NXT_LOG_MODERATION
};
lev->socket.read_ready = 0;
switch (err) {
case NXT_EAGAIN:
nxt_debug(task, "%s(%d) %E", accept_syscall, lev->socket.fd, err);
return;
case ECONNABORTED:
nxt_log_moderate(&nxt_accept_log_moderation, NXT_LOG_WARN,
task->log, "%s(%d) failed %E",
accept_syscall, lev->socket.fd, err);
return;
case EMFILE:
case ENFILE:
case ENOBUFS:
case ENOMEM:
nxt_alert(task, "%s(%d) failed %E",
accept_syscall, lev->socket.fd, err);
nxt_conn_accept_close_idle(task, lev);
return;
default:
nxt_alert(task, "%s(%d) failed %E",
accept_syscall, lev->socket.fd, err);
return;
}
}
static void
nxt_conn_listen_timer_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
nxt_timer_t *timer;
nxt_listen_event_t *lev;
timer = obj;
lev = nxt_timer_data(timer, nxt_listen_event_t, timer);
c = nxt_conn_accept_next(task, lev);
if (c == NULL) {
return;
}
nxt_fd_event_enable_accept(task->thread->engine, &lev->socket);
lev->accept(task, lev, c);
}
static void
nxt_conn_listen_event_error(nxt_task_t *task, void *obj, void *data)
{
nxt_fd_event_t *ev;
ev = obj;
nxt_alert(task, "accept(%d) event error", ev->fd);
}