Files
nginx-unit/src/nxt_port_socket.c
Max Romanov b8052b050e Fixing shm buffer leakage when sending over the port queue.
When the shm buffer is sent over the port queue, it needs to be completed,
just as it is when sent over the port socket.
2021-03-25 16:55:16 +03:00

1356 lines
34 KiB
C

/*
* Copyright (C) Igor Sysoev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
#include <nxt_port_queue.h>
#include <nxt_port_memory_int.h>
#define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
(int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
void *qbuf, nxt_buf_t *b);
static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg);
static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
nxt_port_send_msg_t *msg);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg);
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
/*
 * Creates the port's socketpair and tunes its kernel buffer sizes.
 * The send buffer is grown towards max_size and the receive buffer
 * towards four times the resulting send buffer size.  port->max_size
 * ends up as the effective per-message payload limit.
 */
nxt_int_t
nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
{
    nxt_int_t     snd_size, rcv_size, wanted_rcv;
    nxt_socket_t  snd, rcv;

    port->socket.task = task;

    port->pair[0] = -1;
    port->pair[1] = -1;

    if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
        return NXT_ERROR;
    }

    snd = port->pair[1];

    snd_size = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
    if (nxt_slow_path(snd_size < 0)) {
        goto fail;
    }

    rcv = port->pair[0];

    rcv_size = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
    if (nxt_slow_path(rcv_size < 0)) {
        goto fail;
    }

    if (max_size == 0) {
        max_size = 16 * 1024;
    }

    if ((size_t) snd_size < max_size) {
        /*
         * On Unix domain sockets
         * Linux uses 224K on both send and receive directions;
         * FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
         * on send direction and 4K buffer size on receive direction;
         * Solaris uses 16K on send direction and 5K on receive direction.
         */
        (void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
                                     max_size);

        /* Re-read: the kernel may clamp the requested size. */
        snd_size = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
        if (nxt_slow_path(snd_size < 0)) {
            goto fail;
        }

        wanted_rcv = snd_size * 4;

        if (rcv_size < wanted_rcv) {
            (void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
                                         wanted_rcv);

            rcv_size = nxt_socket_getsockopt(task, rcv, SOL_SOCKET,
                                             SO_RCVBUF);
            if (nxt_slow_path(rcv_size < 0)) {
                goto fail;
            }
        }
    }

    port->max_size = nxt_min(max_size, (size_t) snd_size);
    port->max_share = (64 * 1024);

    return NXT_OK;

fail:

    nxt_socket_close(task, port->pair[0]);
    nxt_socket_close(task, port->pair[1]);

    return NXT_ERROR;
}
/*
 * Releases the port's OS and memory resources: closes the active
 * socket fd and destroys the memory pool owning the port's buffers.
 */
void
nxt_port_destroy(nxt_port_t *port)
{
    nxt_task_t  *task;

    task = port->socket.task;

    nxt_socket_close(task, port->socket.fd);
    nxt_mp_destroy(port->mem_pool);
}
/*
 * Binds the writing end of the socketpair (pair[1]) to the caller's
 * event engine and installs the write/error handlers.  The socket is
 * marked write-ready so the first write is attempted immediately.
 */
void
nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
{
    port->engine = task->thread->engine;

    port->socket.fd = port->pair[1];
    port->socket.log = &nxt_main_log;
    port->socket.write_ready = 1;
    port->socket.write_work_queue = &port->engine->fast_work_queue;
    port->socket.write_handler = nxt_port_write_handler;
    port->socket.error_handler = nxt_port_error_handler;
}
/* Closes the sending end of the socketpair and marks it unused. */
void
nxt_port_write_close(nxt_port_t *port)
{
    nxt_socket_close(port->socket.task, port->pair[1]);

    port->pair[1] = -1;
}
/*
 * Frees a send message only if it was heap-allocated by
 * nxt_port_msg_alloc(); stack-resident messages are left alone.
 */
static void
nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
{
    if (!msg->allocated) {
        return;
    }

    nxt_free(msg);
}
/*
 * Sends a message (header plus optional buffer and up to two fds) to the
 * peer port.  When the port has a shared memory queue and the payload is
 * small enough (or is a port-mmap buffer), the message is placed directly
 * on the queue; otherwise it is written to the port socket, queueing the
 * message locally if the socket is busy.
 *
 * Returns NXT_OK, NXT_AGAIN (queue full), or NXT_ERROR.
 */
nxt_int_t
nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
    nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
    nxt_buf_t *b)
{
    int                  notify;
    uint8_t              qmsg_size;
    nxt_int_t            res;
    nxt_port_send_msg_t  msg;

    /* Queue message: fixed header followed by the inlined payload. */
    struct {
        nxt_port_msg_t  pm;
        uint8_t         buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
    } qmsg;

    msg.link.next = NULL;
    msg.link.prev = NULL;

    msg.buf = b;
    msg.share = 0;
    msg.fd[0] = fd;
    msg.fd[1] = fd2;
    msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
    msg.allocated = 0;

    msg.port_msg.stream = stream;
    msg.port_msg.pid = nxt_pid;
    msg.port_msg.reply_port = reply_port;
    msg.port_msg.type = type & NXT_PORT_MSG_MASK;
    msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
    msg.port_msg.mmap = 0;
    msg.port_msg.nf = 0;
    msg.port_msg.mf = 0;

    if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {

        if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
            qmsg.pm = msg.port_msg;

            qmsg_size = sizeof(qmsg.pm);

            if (b != NULL) {
                /* Copy payload inline or encode an mmap descriptor. */
                qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
            }

            res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);

            nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) qmsg_size, notify, res);

            if (b != NULL && nxt_fast_path(res == NXT_OK)) {
                /*
                 * The buffer was fully consumed by the queue send, so it
                 * must be completed here; otherwise a shm buffer would
                 * leak.  If it went as an mmap descriptor, the receiver
                 * now owns the shared memory bucket.
                 */
                if (qmsg.pm.mmap) {
                    b->is_port_mmap_sent = 1;
                }

                b->mem.pos = b->mem.free;

                nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                                   b->completion_handler, task, b, b->parent);
            }

            if (notify == 0) {
                return res;
            }

            /* Peer must be woken via the socket; send a wake-up message. */
            msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
            msg.buf = NULL;

        } else {
            /*
             * Message cannot go through the queue (fd attached or buffer
             * too large/chained): tell the reader to drain the socket so
             * queue and socket messages stay ordered.
             */
            qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;

            res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);

            nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      notify, res);

            if (nxt_slow_path(res == NXT_AGAIN)) {
                return NXT_AGAIN;
            }
        }
    }

    /* Either write directly or append to the port's pending message list. */
    res = nxt_port_msg_chk_insert(task, port, &msg);
    if (nxt_fast_path(res == NXT_DECLINED)) {
        nxt_port_write_handler(task, &port->socket, &msg);
        res = NXT_OK;
    }

    return res;
}
/*
 * Decides whether a buffer may travel via the shared memory queue:
 * it must be a single (unchained) buffer that either fits inline in a
 * queue message or is a port-mmap buffer (sent as a small descriptor).
 */
static nxt_bool_t
nxt_port_can_enqueue_buf(nxt_buf_t *b)
{
    if (b == NULL) {
        return 1;
    }

    if (b->next != NULL) {
        return 0;
    }

    if (nxt_buf_is_port_mmap(b)) {
        return 1;
    }

    return nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE;
}
/*
 * Serializes a buffer into the queue message body at 'qbuf'.
 * Small payloads are copied verbatim; larger port-mmap buffers are
 * encoded as an nxt_port_mmap_msg_t descriptor and pm->mmap is set.
 * Returns the number of bytes written to 'qbuf'.
 */
static uint8_t
nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
    nxt_buf_t *b)
{
    ssize_t                  used;
    nxt_port_mmap_msg_t      *mm;
    nxt_port_mmap_header_t   *hdr;
    nxt_port_mmap_handler_t  *mmap_handler;

    used = nxt_buf_mem_used_size(&b->mem);

    if (used <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
        nxt_memcpy(qbuf, b->mem.pos, used);

        return used;
    }

    /* Too big to inline: only possible for a port-mmap buffer. */
    mmap_handler = b->parent;
    hdr = mmap_handler->hdr;
    mm = qbuf;

    mm->mmap_id = hdr->id;
    mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
    mm->size = used;

    pm->mmap = 1;

    nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
              mm->size);

    return sizeof(nxt_port_mmap_msg_t);
}
/*
 * Decides, under the write mutex, whether the message can be written
 * immediately (NXT_DECLINED) or must be appended to the pending list
 * (NXT_OK; the port's use count is incremented for the queued copy).
 * Returns NXT_ERROR if the heap copy of the message cannot be made.
 */
static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
    nxt_port_send_msg_t *msg)
{
    nxt_int_t  res;

    nxt_thread_mutex_lock(&port->write_mutex);

    if (nxt_fast_path(port->socket.write_ready
                      && nxt_queue_is_empty(&port->messages)))
    {
        /* Socket writable and nothing queued ahead: caller writes now. */
        res = NXT_DECLINED;
        goto unlock;
    }

    msg = nxt_port_msg_alloc(msg);

    if (nxt_slow_path(msg == NULL)) {
        res = NXT_ERROR;
        goto unlock;
    }

    nxt_queue_insert_tail(&port->messages, &msg->link);
    nxt_port_use(task, port, 1);

    res = NXT_OK;

unlock:

    nxt_thread_mutex_unlock(&port->write_mutex);

    return res;
}
/*
 * Promotes a stack-resident send message to the heap so it can be
 * linked into the port's pending list and outlive the caller's frame.
 * Returns the heap copy, or NULL on allocation failure.
 */
static nxt_port_send_msg_t *
nxt_port_msg_alloc(nxt_port_send_msg_t *m)
{
    nxt_port_send_msg_t  *copy;

    copy = nxt_malloc(sizeof(nxt_port_send_msg_t));
    if (nxt_slow_path(copy == NULL)) {
        return NULL;
    }

    *copy = *m;
    copy->allocated = 1;

    return copy;
}
/* Posted to the port's engine thread: stop polling for writability. */
static void
nxt_port_fd_block_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_block_write(task->thread->engine, &port->socket);
}
/* Posted to the port's engine thread: resume polling for writability. */
static void
nxt_port_fd_enable_write(nxt_task_t *task, nxt_port_t *port, void *data)
{
    nxt_fd_event_enable_write(task->thread->engine, &port->socket);
}
/*
 * Drains the port's outgoing messages to the socket while it stays
 * writable.  'data', when non-NULL, is a single not-yet-queued message
 * supplied by nxt_port_socket_write2(); otherwise messages are taken
 * from the pending list.  Large buffer chains are written as a
 * sequence of fragments; fragments of one stream are interleaved with
 * other messages once 'max_share' bytes have been sent, to keep one
 * big message from starving the rest.
 *
 * 'use_delta' accumulates port use-count changes (queued copies hold a
 * reference) and is applied once at the end.
 */
static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
    int                     use_delta;
    size_t                  plain_size;
    ssize_t                 n;
    uint32_t                mmsg_buf[3 * NXT_IOBUF_MAX * 10];
    nxt_bool_t              block_write, enable_write;
    nxt_port_t              *port;
    struct iovec            iov[NXT_IOBUF_MAX * 10];
    nxt_work_queue_t        *wq;
    nxt_port_method_t       m;
    nxt_port_send_msg_t     *msg;
    nxt_sendbuf_coalesce_t  sb;

    port = nxt_container_of(obj, nxt_port_t, socket);

    block_write = 0;
    enable_write = 0;
    use_delta = 0;

    wq = &task->thread->engine->fast_work_queue;

    do {
        if (data) {
            msg = data;

        } else {
            msg = nxt_port_msg_first(port);

            if (msg == NULL) {
                /* Nothing left to send: stop write event polling. */
                block_write = 1;
                goto cleanup;
            }
        }

    next_fragment:

        /* iov[0] always carries the fixed message header. */
        iov[0].iov_base = &msg->port_msg;
        iov[0].iov_len = sizeof(nxt_port_msg_t);

        sb.buf = msg->buf;
        sb.iobuf = &iov[1];
        sb.nmax = NXT_IOBUF_MAX - 1;
        sb.sync = 0;
        sb.last = 0;
        sb.size = 0;
        sb.limit = port->max_size;
        sb.limit_reached = 0;
        sb.nmax_reached = 0;

        m = nxt_port_mmap_get_method(task, port, msg->buf);

        if (m == NXT_PORT_METHOD_MMAP) {
            /*
             * mmap transfer sends small descriptors, so the byte limit
             * is effectively unbounded and more iovec slots are allowed.
             */
            sb.limit = (1ULL << 31) - 1;
            sb.nmax = nxt_min(NXT_IOBUF_MAX * 10 - 1,
                              port->max_size / PORT_MMAP_MIN_SIZE);
        }

        sb.limit -= iov[0].iov_len;

        nxt_sendbuf_mem_coalesce(task, &sb);

        plain_size = sb.size;

        /*
         * Send through mmap enabled only when payload
         * is bigger than PORT_MMAP_MIN_SIZE.
         */
        if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
            nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);

        } else {
            m = NXT_PORT_METHOD_PLAIN;
        }

        msg->port_msg.last |= sb.last;
        /* 'mf' (more fragments) is set if the chain did not fit. */
        msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;

        n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);

        if (n > 0) {
            if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
                nxt_alert(task, "port %d: short write: %z instead of %uz",
                          port->socket.fd, n, sb.size + iov[0].iov_len);
                goto fail;
            }

            /* fds travel once, with the first write; close if requested. */
            if (msg->close_fd) {
                if (msg->fd[0] != -1) {
                    nxt_fd_close(msg->fd[0]);

                    msg->fd[0] = -1;
                }

                if (msg->fd[1] != -1) {
                    nxt_fd_close(msg->fd[1]);

                    msg->fd[1] = -1;
                }
            }

            /* Complete fully-sent buffers; returns the unsent remainder. */
            msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
                                               m == NXT_PORT_METHOD_MMAP);

            if (msg->buf != NULL) {
                nxt_debug(task, "port %d: frag stream #%uD", port->socket.fd,
                          msg->port_msg.stream);

                /*
                 * A file descriptor is sent only
                 * in the first message of a stream.
                 */
                msg->fd[0] = -1;
                msg->fd[1] = -1;

                msg->share += n;
                msg->port_msg.nf = 1;

                if (msg->share >= port->max_share) {
                    /* Yield to other messages: rotate to the tail. */
                    msg->share = 0;

                    if (msg->link.next != NULL) {
                        nxt_thread_mutex_lock(&port->write_mutex);

                        nxt_queue_remove(&msg->link);
                        nxt_queue_insert_tail(&port->messages, &msg->link);

                        nxt_thread_mutex_unlock(&port->write_mutex);

                    } else {
                        msg = nxt_port_msg_insert_tail(port, msg);
                        if (nxt_slow_path(msg == NULL)) {
                            goto fail;
                        }

                        use_delta++;
                    }

                } else {
                    goto next_fragment;
                }

            } else {
                /* Message fully sent: unlink (if queued) and release. */
                if (msg->link.next != NULL) {
                    nxt_thread_mutex_lock(&port->write_mutex);

                    nxt_queue_remove(&msg->link);
                    msg->link.next = NULL;

                    nxt_thread_mutex_unlock(&port->write_mutex);

                    use_delta--;
                }

                nxt_port_release_send_msg(msg);
            }

            if (data != NULL) {
                /* Single direct message handled; do not drain the list. */
                goto cleanup;
            }

        } else {
            if (nxt_slow_path(n == NXT_ERROR)) {
                goto fail;
            }

            /* NXT_AGAIN: keep the message for a later write event. */
            if (msg->link.next == NULL) {
                msg = nxt_port_msg_insert_tail(port, msg);
                if (nxt_slow_path(msg == NULL)) {
                    goto fail;
                }

                use_delta++;
            }
        }

    } while (port->socket.write_ready);

    if (nxt_fd_event_is_disabled(port->socket.write)) {
        enable_write = 1;
    }

    goto cleanup;

fail:

    /* Keep the port alive until the posted error handler runs. */
    use_delta++;

    nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
                       &port->socket);

cleanup:

    if (block_write && nxt_fd_event_is_active(port->socket.write)) {
        nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
    }

    if (enable_write) {
        nxt_port_post(task, port, nxt_port_fd_enable_write, NULL);
    }

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}
/*
 * Returns the head of the pending message list, or NULL when empty.
 * The list is shared between threads, so access is mutex-protected.
 */
static nxt_port_send_msg_t *
nxt_port_msg_first(nxt_port_t *port)
{
    nxt_queue_link_t     *lnk;
    nxt_port_send_msg_t  *msg;

    msg = NULL;

    nxt_thread_mutex_lock(&port->write_mutex);

    lnk = nxt_queue_first(&port->messages);

    if (lnk != nxt_queue_tail(&port->messages)) {
        msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
    }

    nxt_thread_mutex_unlock(&port->write_mutex);

    return msg;
}
/*
 * Walks the buffer chain accounting for 'sent' bytes written to the
 * socket.  Fully-sent buffers are scheduled for completion on 'wq' and
 * unlinked; a partially-sent buffer has its position advanced and the
 * walk stops.  Returns the first buffer with unsent data (the remainder
 * of the chain), or NULL when everything was sent.
 *
 * 'mmap_mode' is true when the chain went out as mmap descriptors: the
 * receiver then becomes responsible for releasing port-mmap buckets.
 */
static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
    size_t sent, nxt_bool_t mmap_mode)
{
    size_t     size;
    nxt_buf_t  *next;

    while (b != NULL) {
        nxt_prefetch(b->next);

        /* Sync buffers carry no payload; complete them unconditionally. */
        if (!nxt_buf_is_sync(b)) {

            size = nxt_buf_used_size(b);

            if (size != 0) {

                if (sent == 0) {
                    break;
                }

                if (nxt_buf_is_port_mmap(b) && mmap_mode) {
                    /*
                     * buffer has been sent to other side which is now
                     * responsible for shared memory bucket release
                     */
                    b->is_port_mmap_sent = 1;
                }

                if (sent < size) {
                    /* Partial send: advance cursors and keep the buffer. */

                    if (nxt_buf_is_mem(b)) {
                        b->mem.pos += sent;
                    }

                    if (nxt_buf_is_file(b)) {
                        b->file_pos += sent;
                    }

                    break;
                }

                /* b->mem.free is NULL in file-only buffer. */
                b->mem.pos = b->mem.free;

                if (nxt_buf_is_file(b)) {
                    b->file_pos = b->file_end;
                }

                sent -= size;
            }
        }

        nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);

        next = b->next;
        b->next = NULL;
        b = next;
    }

    return b;
}
/*
 * Appends a message to the pending list, first promoting stack-resident
 * messages to the heap since the list links them by reference.
 * Returns the (possibly reallocated) message, or NULL on failure.
 */
static nxt_port_send_msg_t *
nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
{
    if (!msg->allocated) {
        msg = nxt_port_msg_alloc(msg);

        if (nxt_slow_path(msg == NULL)) {
            return NULL;
        }
    }

    nxt_thread_mutex_lock(&port->write_mutex);

    nxt_queue_insert_tail(&port->messages, &msg->link);

    nxt_thread_mutex_unlock(&port->write_mutex);

    return msg;
}
/*
 * Binds the reading end of the socketpair (pair[0]) to the caller's
 * event engine and starts read event polling.  Ports backed by a
 * shared memory queue get the queue-aware read handler.
 */
void
nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
{
    port->engine = task->thread->engine;

    port->socket.fd = port->pair[0];
    port->socket.log = &nxt_main_log;
    port->socket.read_work_queue = &port->engine->fast_work_queue;

    if (port->queue != NULL) {
        port->socket.read_handler = nxt_port_queue_read_handler;

    } else {
        port->socket.read_handler = nxt_port_read_handler;
    }

    port->socket.error_handler = nxt_port_error_handler;

    nxt_fd_event_enable_read(port->engine, &port->socket);
}
/*
 * Closes the receiving end of the socketpair; the event state is
 * cleared first so no further reads are attempted on the dead fd.
 */
void
nxt_port_read_close(nxt_port_t *port)
{
    port->socket.read_ready = 0;
    port->socket.read = NXT_EVENT_INACTIVE;

    nxt_socket_close(port->socket.task, port->pair[0]);

    port->pair[0] = -1;
}
/*
 * Read handler for ports without a shared memory queue: receives
 * messages (fixed header + payload into a port buffer) from the socket
 * and dispatches each to nxt_port_read_msg_process() until the socket
 * would block.
 *
 * Fix: the original tested 'b == NULL' after allocation but fell
 * through and dereferenced b->mem.pos anyway, crashing on allocation
 * failure.  Now the handler re-arms the read event and returns, so the
 * read is retried once memory is available.
 */
static void
nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
{
    ssize_t              n;
    nxt_buf_t            *b;
    nxt_port_t           *port;
    struct iovec         iov[2];
    nxt_port_recv_msg_t  msg;

    port = msg.port = nxt_container_of(obj, nxt_port_t, socket);

    nxt_assert(port->engine == task->thread->engine);

    for ( ;; ) {
        b = nxt_port_buf_alloc(port);

        if (nxt_slow_path(b == NULL)) {
            /* TODO: disable event for some time */

            /*
             * Allocation failed: bail out instead of dereferencing a
             * NULL buffer; the read event stays armed so the message
             * is picked up on a later invocation.
             */
            nxt_fd_event_enable_read(task->thread->engine, &port->socket);
            return;
        }

        iov[0].iov_base = &msg.port_msg;
        iov[0].iov_len = sizeof(nxt_port_msg_t);

        iov[1].iov_base = b->mem.pos;
        iov[1].iov_len = port->max_size;

        n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);

        if (n > 0) {
            msg.buf = b;
            msg.size = n;

            nxt_port_read_msg_process(task, port, &msg);

            /*
             * To disable instant completion or buffer re-usage,
             * handler should reset 'msg.buf'.
             */
            if (msg.buf == b) {
                nxt_port_buf_free(port, b);
            }

            if (port->socket.read_ready) {
                continue;
            }

            return;
        }

        if (n == NXT_AGAIN) {
            nxt_port_buf_free(port, b);

            nxt_fd_event_enable_read(task->thread->engine, &port->socket);
            return;
        }

        /* n == 0 || n == NXT_ERROR */
        nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                           nxt_port_error_handler, task, &port->socket, NULL);
        return;
    }
}
/*
 * Read handler for ports with a shared memory queue.  Messages usually
 * arrive via the queue; two control messages coordinate with the
 * socket: _NXT_PORT_MSG_READ_SOCKET tells this reader that the next
 * message is on the socket (to preserve ordering), and
 * _NXT_PORT_MSG_READ_QUEUE received on the socket is just a wake-up.
 * A message read from the socket before its READ_SOCKET marker has
 * been dequeued is parked in port->socket_msg ("suspended") and
 * replayed once the marker arrives.  'queue->nitems' tracks active
 * readers/items so the writer knows when a notification is needed.
 *
 * Fix: the original tested 'b == NULL' after nxt_port_buf_alloc() but
 * fell through and dereferenced 'b' (b->mem.pos) anyway, crashing on
 * allocation failure.  Now the handler balances 'queue->nitems' and
 * returns; note that a queue message already dequeued in this
 * iteration is dropped (TODO: back-off and retry instead).
 */
static void
nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
{
    ssize_t              n;
    nxt_buf_t            *b;
    nxt_port_t           *port;
    struct iovec         iov[2];
    nxt_port_queue_t     *queue;
    nxt_port_recv_msg_t  msg, *smsg;
    uint8_t              qmsg[NXT_PORT_QUEUE_MSG_SIZE];

    port = nxt_container_of(obj, nxt_port_t, socket);
    msg.port = port;

    nxt_assert(port->engine == task->thread->engine);

    queue = port->queue;
    nxt_atomic_fetch_add(&queue->nitems, 1);

    for ( ;; ) {

        if (port->from_socket == 0) {
            n = nxt_port_queue_recv(queue, qmsg);

            if (n < 0 && !port->socket.read_ready) {
                /*
                 * Queue looked empty: drop our reader count, then
                 * re-check to close the race with a concurrent writer.
                 */
                nxt_atomic_fetch_add(&queue->nitems, -1);

                n = nxt_port_queue_recv(queue, qmsg);
                if (n < 0) {
                    return;
                }

                nxt_atomic_fetch_add(&queue->nitems, 1);
            }

            if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
                /* Next ordered message is on the socket. */
                port->from_socket++;

                nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          port->from_socket);

                continue;
            }

            nxt_debug(task, "port{%d,%d} %d: dequeue %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) n);

        } else {
            /* Replay a previously suspended socket message, if any. */
            if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
                msg.port_msg = smsg->port_msg;
                b = smsg->buf;
                n = smsg->size;
                msg.fd[0] = smsg->fd[0];
                msg.fd[1] = smsg->fd[1];

                smsg->size = 0;

                port->from_socket--;

                nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          (int) n);

                goto process;
            }

            n = -1;
        }

        if (n < 0 && !port->socket.read_ready) {
            nxt_atomic_fetch_add(&queue->nitems, -1);
            return;
        }

        b = nxt_port_buf_alloc(port);

        if (nxt_slow_path(b == NULL)) {
            /* TODO: disable event for some time */

            /*
             * Allocation failed: bail out instead of dereferencing a
             * NULL buffer below, keeping 'nitems' balanced.  A queue
             * message dequeued in this iteration is lost.
             */
            nxt_atomic_fetch_add(&queue->nitems, -1);
            return;
        }

        if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
            /* Complete message came from the queue: copy it out. */
            nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));

            if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
                nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
                           n - sizeof(nxt_port_msg_t));
            }

        } else {
            iov[0].iov_base = &msg.port_msg;
            iov[0].iov_len = sizeof(nxt_port_msg_t);

            iov[1].iov_base = b->mem.pos;
            iov[1].iov_len = port->max_size;

            n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);

            if (n == (ssize_t) sizeof(nxt_port_msg_t)
                && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
            {
                /* Pure wake-up message: nothing to process. */
                nxt_port_buf_free(port, b);

                nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
                          (int) port->pid, (int) port->id, port->socket.fd,
                          (int) n);

                continue;
            }

            nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
                      (int) port->pid, (int) port->id, port->socket.fd,
                      (int) n);

            if (n > 0) {

                if (port->from_socket == 0) {
                    /*
                     * Socket message arrived ahead of its READ_SOCKET
                     * marker: park it until the marker is dequeued.
                     */
                    nxt_debug(task, "port{%d,%d} %d: suspend message %d",
                              (int) port->pid, (int) port->id,
                              port->socket.fd, (int) n);

                    smsg = port->socket_msg;

                    if (nxt_slow_path(smsg == NULL)) {
                        smsg = nxt_mp_alloc(port->mem_pool,
                                            sizeof(nxt_port_recv_msg_t));

                        if (nxt_slow_path(smsg == NULL)) {
                            nxt_alert(task, "port{%d,%d} %d: suspend message "
                                      "failed",
                                      (int) port->pid, (int) port->id,
                                      port->socket.fd);

                            return;
                        }

                        port->socket_msg = smsg;

                    } else {
                        if (nxt_slow_path(smsg->size != 0)) {
                            nxt_alert(task, "port{%d,%d} %d: too many suspend "
                                      "messages",
                                      (int) port->pid, (int) port->id,
                                      port->socket.fd);

                            return;
                        }
                    }

                    smsg->port_msg = msg.port_msg;
                    smsg->buf = b;
                    smsg->size = n;
                    smsg->fd[0] = msg.fd[0];
                    smsg->fd[1] = msg.fd[1];

                    continue;
                }

                port->from_socket--;
            }
        }

    process:

        if (n > 0) {
            msg.buf = b;
            msg.size = n;

            nxt_port_read_msg_process(task, port, &msg);

            /*
             * To disable instant completion or buffer re-usage,
             * handler should reset 'msg.buf'.
             */
            if (msg.buf == b) {
                nxt_port_buf_free(port, b);
            }

            continue;
        }

        if (n == NXT_AGAIN) {
            nxt_port_buf_free(port, b);

            nxt_fd_event_enable_read(task->thread->engine, &port->socket);

            continue;
        }

        /* n == 0 || n == NXT_ERROR */
        nxt_work_queue_add(&task->thread->engine->fast_work_queue,
                           nxt_port_error_handler, task, &port->socket, NULL);
        return;
    }
}
/* Hash key identifying a fragmented message stream: stream id + sender pid. */
typedef struct {
    uint32_t  stream;
    uint32_t  pid;
} nxt_port_frag_key_t;
/*
 * lvlhsh key comparison for the fragment table: a stored fragment
 * message matches when its (stream, pid) pair equals the query key.
 */
static nxt_int_t
nxt_port_lvlhsh_frag_test(nxt_lvlhsh_query_t *lhq, void *data)
{
    nxt_port_recv_msg_t  *fmsg;
    nxt_port_frag_key_t  *key;

    fmsg = data;
    key = (nxt_port_frag_key_t *) lhq->key.start;

    if (lhq->key.length != sizeof(nxt_port_frag_key_t)) {
        return NXT_DECLINED;
    }

    if (key->stream != fmsg->port_msg.stream) {
        return NXT_DECLINED;
    }

    if (key->pid != (uint32_t) fmsg->port_msg.pid) {
        return NXT_DECLINED;
    }

    return NXT_OK;
}
/* lvlhsh allocator: aligned allocation from the port's memory pool. */
static void *
nxt_port_lvlhsh_frag_alloc(void *ctx, size_t size)
{
    return nxt_mp_align(ctx, size, size);
}
/* lvlhsh deallocator: returns memory to the port's memory pool. */
static void
nxt_port_lvlhsh_frag_free(void *ctx, void *p)
{
    nxt_mp_free(ctx, p);
}
/* lvlhsh protocol for the per-port fragment table (port->frags). */
static const nxt_lvlhsh_proto_t  lvlhsh_frag_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_port_lvlhsh_frag_test,
    nxt_port_lvlhsh_frag_alloc,
    nxt_port_lvlhsh_frag_free,
};
/*
 * Registers the first fragment of a multi-part message: copies the
 * received message into the pool and inserts it into the fragment
 * table keyed by (stream, pid).  Returns the stored copy, or NULL on
 * allocation failure or duplicate/failed insertion (copy is freed).
 */
static nxt_port_recv_msg_t *
nxt_port_frag_start(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_int_t            res;
    nxt_lvlhsh_query_t   lhq;
    nxt_port_recv_msg_t  *fmsg;
    nxt_port_frag_key_t  frag_key;

    nxt_debug(task, "start frag stream #%uD", msg->port_msg.stream);

    fmsg = nxt_mp_alloc(port->mem_pool, sizeof(nxt_port_recv_msg_t));
    if (nxt_slow_path(fmsg == NULL)) {
        return NULL;
    }

    *fmsg = *msg;

    frag_key.stream = fmsg->port_msg.stream;
    frag_key.pid = fmsg->port_msg.pid;

    lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
    lhq.key.length = sizeof(nxt_port_frag_key_t);
    lhq.key.start = (u_char *) &frag_key;
    lhq.proto = &lvlhsh_frag_proto;
    lhq.replace = 0;
    lhq.value = fmsg;
    lhq.pool = port->mem_pool;

    res = nxt_lvlhsh_insert(&port->frags, &lhq);

    if (nxt_fast_path(res == NXT_OK)) {
        return fmsg;
    }

    if (res == NXT_DECLINED) {
        nxt_log(task, NXT_LOG_WARN, "duplicate frag stream #%uD",
                fmsg->port_msg.stream);

    } else {
        nxt_log(task, NXT_LOG_WARN, "failed to add frag stream #%uD",
                fmsg->port_msg.stream);
    }

    nxt_mp_free(port->mem_pool, fmsg);

    return NULL;
}
/*
 * Looks up the accumulated fragment state for a message's (stream,
 * pid).  The last fragment (mf == 0) removes the entry from the table;
 * intermediate fragments leave it in place.  Returns the stored
 * message, or NULL if no matching stream exists.
 */
static nxt_port_recv_msg_t *
nxt_port_frag_find(nxt_task_t *task, nxt_port_t *port, nxt_port_recv_msg_t *msg)
{
    nxt_int_t            res;
    nxt_bool_t           last;
    nxt_lvlhsh_query_t   lhq;
    nxt_port_frag_key_t  frag_key;

    last = (msg->port_msg.mf == 0);

    nxt_debug(task, "%s frag stream #%uD", last ? "last" : "next",
              msg->port_msg.stream);

    frag_key.stream = msg->port_msg.stream;
    frag_key.pid = msg->port_msg.pid;

    lhq.key_hash = nxt_murmur_hash2(&frag_key, sizeof(nxt_port_frag_key_t));
    lhq.key.length = sizeof(nxt_port_frag_key_t);
    lhq.key.start = (u_char *) &frag_key;
    lhq.proto = &lvlhsh_frag_proto;
    lhq.pool = port->mem_pool;

    if (last) {
        res = nxt_lvlhsh_delete(&port->frags, &lhq);

    } else {
        res = nxt_lvlhsh_find(&port->frags, &lhq);
    }

    if (nxt_fast_path(res == NXT_OK)) {
        return lhq.value;
    }

    nxt_log(task, NXT_LOG_INFO, "frag stream #%uD not found",
            frag_key.stream);

    return NULL;
}
/*
 * Dispatches one received message.  Single-part messages go straight
 * to port->handler; the first fragment of a stream (mf set, nf clear)
 * is stashed via nxt_port_frag_start(), later fragments (nf set) are
 * chained onto it, and the completed chain is delivered when the last
 * fragment (mf clear) arrives.  mmap-encoded payloads are expanded by
 * nxt_port_mmap_read() before delivery.  After the handler runs,
 * mmap buffers that were not claimed (msg->buf left untouched) are
 * completed here so shared memory chunks are released.
 */
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
    nxt_port_recv_msg_t *msg)
{
    nxt_buf_t            *b, *orig_b, *next;
    nxt_port_recv_msg_t  *fmsg;

    if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
        nxt_alert(task, "port %d: too small message:%uz",
                  port->socket.fd, msg->size);

        /* Malformed message: release any fds that came with it. */
        if (msg->fd[0] != -1) {
            nxt_fd_close(msg->fd[0]);
        }

        if (msg->fd[1] != -1) {
            nxt_fd_close(msg->fd[1]);
        }

        return;
    }

    /* adjust size to actual buffer used size */
    msg->size -= sizeof(nxt_port_msg_t);

    b = orig_b = msg->buf;
    b->mem.free += msg->size;

    msg->cancelled = 0;

    if (nxt_slow_path(msg->port_msg.nf != 0)) {
        /* Continuation fragment: append to the accumulated stream. */

        fmsg = nxt_port_frag_find(task, port, msg);
        if (nxt_slow_path(fmsg == NULL)) {
            goto fmsg_failed;
        }

        if (nxt_fast_path(fmsg->cancelled == 0)) {

            if (msg->port_msg.mmap) {
                nxt_port_mmap_read(task, msg);
            }

            nxt_buf_chain_add(&fmsg->buf, msg->buf);

            fmsg->size += msg->size;
            msg->buf = NULL;
            b = NULL;

            if (nxt_fast_path(msg->port_msg.mf == 0)) {
                /* Last fragment: deliver the assembled message. */

                b = fmsg->buf;

                port->handler(task, fmsg);

                msg->buf = fmsg->buf;
                msg->fd[0] = fmsg->fd[0];
                msg->fd[1] = fmsg->fd[1];

                /*
                 * To disable instant completion or buffer re-usage,
                 * handler should reset 'msg.buf'.
                 */
                if (!msg->port_msg.mmap && msg->buf == b) {
                    nxt_port_buf_free(port, b);
                }
            }
        }

        if (nxt_fast_path(msg->port_msg.mf == 0)) {
            nxt_mp_free(port->mem_pool, fmsg);
        }

    } else {
        if (nxt_slow_path(msg->port_msg.mf != 0)) {
            /* First fragment of a stream: stash it for later parts. */

            if (msg->port_msg.mmap && msg->cancelled == 0) {
                nxt_port_mmap_read(task, msg);
                b = msg->buf;
            }

            fmsg = nxt_port_frag_start(task, port, msg);
            if (nxt_slow_path(fmsg == NULL)) {
                goto fmsg_failed;
            }

            fmsg->port_msg.nf = 0;
            fmsg->port_msg.mf = 0;

            if (nxt_fast_path(msg->cancelled == 0)) {
                /* Ownership of buf and fds moved to the stored fmsg. */
                msg->buf = NULL;
                msg->fd[0] = -1;
                msg->fd[1] = -1;
                b = NULL;

            } else {
                if (msg->fd[0] != -1) {
                    nxt_fd_close(msg->fd[0]);
                }

                if (msg->fd[1] != -1) {
                    nxt_fd_close(msg->fd[1]);
                }
            }

        } else {
            /* Complete single-part message. */

            if (nxt_fast_path(msg->cancelled == 0)) {

                if (msg->port_msg.mmap) {
                    nxt_port_mmap_read(task, msg);
                    b = msg->buf;
                }

                port->handler(task, msg);
            }
        }
    }

fmsg_failed:

    if (msg->port_msg.mmap && orig_b != b) {

        /*
         * To disable instant buffer completion,
         * handler should reset 'msg->buf'.
         */
        if (msg->buf == b) {
            /* complete mmap buffers */
            while (b != NULL) {
                nxt_debug(task, "complete buffer %p", b);

                nxt_work_queue_add(port->socket.read_work_queue,
                                   b->completion_handler, task, b, b->parent);

                next = b->next;
                b->next = NULL;
                b = next;
            }
        }

        /* restore original buf */
        msg->buf = orig_b;
    }
}
/*
 * Returns a receive buffer of port->max_size bytes, recycling one from
 * the port's free list when available.  Returns NULL on allocation
 * failure.
 */
static nxt_buf_t *
nxt_port_buf_alloc(nxt_port_t *port)
{
    nxt_buf_t  *b;

    b = port->free_bufs;

    if (b == NULL) {
        return nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
    }

    /* Pop the head of the free list and reset its cursors. */
    port->free_bufs = b->next;

    b->mem.pos = b->mem.start;
    b->mem.free = b->mem.start;
    b->next = NULL;

    return b;
}
/* Returns a buffer (chain) to the port's free list for reuse. */
static void
nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
{
    nxt_buf_chain_add(&b, port->free_bufs);
    port->free_bufs = b;
}
/*
 * Error handler for the port socket: drops every pending outgoing
 * message, closing attached fds (when requested), completing all
 * non-sync buffers, and releasing heap-allocated message copies.
 * Each removed message gives back one port reference; when invoked
 * from the write handler's fail path (obj == data) an extra reference
 * taken there is released as well.
 */
static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
    int                  use_delta;
    nxt_buf_t            *b, *next;
    nxt_port_t           *port;
    nxt_work_queue_t     *wq;
    nxt_port_send_msg_t  *msg;

    nxt_debug(task, "port error handler %p", obj);
    /* TODO */

    port = nxt_container_of(obj, nxt_port_t, socket);

    use_delta = 0;

    if (obj == data) {
        use_delta--;
    }

    wq = &task->thread->engine->fast_work_queue;

    nxt_thread_mutex_lock(&port->write_mutex);

    nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {

        if (msg->close_fd) {
            if (msg->fd[0] != -1) {
                nxt_fd_close(msg->fd[0]);

                msg->fd[0] = -1;
            }

            if (msg->fd[1] != -1) {
                nxt_fd_close(msg->fd[1]);

                msg->fd[1] = -1;
            }
        }

        /* Complete the unsent buffer chain so owners can reclaim it. */
        for (b = msg->buf; b != NULL; b = next) {
            next = b->next;
            b->next = NULL;

            if (nxt_buf_is_sync(b)) {
                continue;
            }

            nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
        }

        nxt_queue_remove(&msg->link);
        use_delta--;

        nxt_port_release_send_msg(msg);

    } nxt_queue_loop;

    nxt_thread_mutex_unlock(&port->write_mutex);

    if (use_delta != 0) {
        nxt_port_use(task, port, use_delta);
    }
}