483 lines
12 KiB
C
483 lines
12 KiB
C
|
|
/*
|
|
* Copyright (C) Igor Sysoev
|
|
* Copyright (C) NGINX, Inc.
|
|
*/
|
|
|
|
#include <nxt_main.h>
|
|
|
|
|
|
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
|
|
nxt_port_recv_msg_t *msg);
|
|
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
|
|
static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
|
|
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
|
|
|
|
|
|
nxt_int_t
|
|
nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
|
|
{
|
|
nxt_mp_t *mp;
|
|
nxt_int_t sndbuf, rcvbuf, size;
|
|
nxt_socket_t snd, rcv;
|
|
|
|
port->socket.task = task;
|
|
|
|
port->pair[0] = -1;
|
|
port->pair[1] = -1;
|
|
|
|
nxt_queue_init(&port->messages);
|
|
|
|
mp = nxt_mp_create(1024, 128, 256, 32);
|
|
if (nxt_slow_path(mp == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
port->mem_pool = mp;
|
|
|
|
if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
|
|
goto socketpair_fail;
|
|
}
|
|
|
|
snd = port->pair[1];
|
|
|
|
sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
|
|
if (nxt_slow_path(sndbuf < 0)) {
|
|
goto getsockopt_fail;
|
|
}
|
|
|
|
rcv = port->pair[0];
|
|
|
|
rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
|
|
if (nxt_slow_path(rcvbuf < 0)) {
|
|
goto getsockopt_fail;
|
|
}
|
|
|
|
if (max_size == 0) {
|
|
max_size = 16 * 1024;
|
|
}
|
|
|
|
if ((size_t) sndbuf < max_size) {
|
|
/*
|
|
* On Unix domain sockets
|
|
* Linux uses 224K on both send and receive directions;
|
|
* FreeBSD, MacOSX, NetBSD, and OpenBSD use 2K buffer size
|
|
* on send direction and 4K buffer size on receive direction;
|
|
* Solaris uses 16K on send direction and 5K on receive direction.
|
|
*/
|
|
(void) nxt_socket_setsockopt(task, snd, SOL_SOCKET, SO_SNDBUF,
|
|
max_size);
|
|
|
|
sndbuf = nxt_socket_getsockopt(task, snd, SOL_SOCKET, SO_SNDBUF);
|
|
if (nxt_slow_path(sndbuf < 0)) {
|
|
goto getsockopt_fail;
|
|
}
|
|
|
|
size = sndbuf * 4;
|
|
|
|
if (rcvbuf < size) {
|
|
(void) nxt_socket_setsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF,
|
|
size);
|
|
|
|
rcvbuf = nxt_socket_getsockopt(task, rcv, SOL_SOCKET, SO_RCVBUF);
|
|
if (nxt_slow_path(rcvbuf < 0)) {
|
|
goto getsockopt_fail;
|
|
}
|
|
}
|
|
}
|
|
|
|
port->max_size = nxt_min(max_size, (size_t) sndbuf);
|
|
port->max_share = (64 * 1024);
|
|
|
|
return NXT_OK;
|
|
|
|
getsockopt_fail:
|
|
|
|
nxt_socket_close(task, port->pair[0]);
|
|
nxt_socket_close(task, port->pair[1]);
|
|
|
|
socketpair_fail:
|
|
|
|
nxt_mp_destroy(port->mem_pool);
|
|
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_destroy(nxt_port_t *port)
|
|
{
|
|
nxt_socket_close(port->socket.task, port->socket.fd);
|
|
nxt_mp_destroy(port->mem_pool);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
|
|
{
|
|
port->socket.fd = port->pair[1];
|
|
port->socket.log = &nxt_main_log;
|
|
port->socket.write_ready = 1;
|
|
|
|
port->socket.write_work_queue = &task->thread->engine->fast_work_queue;
|
|
port->socket.write_handler = nxt_port_write_handler;
|
|
port->socket.error_handler = nxt_port_error_handler;
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_write_close(nxt_port_t *port)
|
|
{
|
|
nxt_socket_close(port->socket.task, port->pair[1]);
|
|
port->pair[1] = -1;
|
|
}
|
|
|
|
|
|
nxt_int_t
|
|
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
|
|
nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
|
|
{
|
|
nxt_queue_link_t *link;
|
|
nxt_port_send_msg_t *msg;
|
|
|
|
for (link = nxt_queue_first(&port->messages);
|
|
link != nxt_queue_tail(&port->messages);
|
|
link = nxt_queue_next(link))
|
|
{
|
|
msg = (nxt_port_send_msg_t *) link;
|
|
|
|
if (msg->port_msg.stream == stream) {
|
|
/*
|
|
* An fd is ignored since a file descriptor
|
|
* must be sent only in the first message of a stream.
|
|
*/
|
|
nxt_buf_chain_add(&msg->buf, b);
|
|
|
|
return NXT_OK;
|
|
}
|
|
}
|
|
|
|
msg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_send_msg_t));
|
|
if (nxt_slow_path(msg == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
msg->buf = b;
|
|
msg->fd = fd;
|
|
msg->share = 0;
|
|
msg->mem_pool = port->mem_pool;
|
|
|
|
msg->port_msg.stream = stream;
|
|
msg->port_msg.pid = nxt_pid;
|
|
msg->port_msg.reply_port = reply_port;
|
|
msg->port_msg.type = type;
|
|
msg->port_msg.last = 0;
|
|
msg->port_msg.mmap = 0;
|
|
|
|
nxt_queue_insert_tail(&port->messages, &msg->link);
|
|
|
|
if (port->socket.write_ready) {
|
|
nxt_port_write_handler(task, port, NULL);
|
|
}
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
ssize_t n;
|
|
nxt_port_t *port;
|
|
struct iovec iov[NXT_IOBUF_MAX];
|
|
nxt_queue_link_t *link;
|
|
nxt_port_send_msg_t *msg;
|
|
nxt_sendbuf_coalesce_t sb;
|
|
nxt_port_method_t m;
|
|
|
|
size_t plain_size;
|
|
nxt_buf_t *plain_buf;
|
|
|
|
port = obj;
|
|
|
|
do {
|
|
link = nxt_queue_first(&port->messages);
|
|
|
|
if (link == nxt_queue_tail(&port->messages)) {
|
|
nxt_fd_event_block_write(task->thread->engine, &port->socket);
|
|
return;
|
|
}
|
|
|
|
msg = (nxt_port_send_msg_t *) link;
|
|
|
|
iov[0].iov_base = &msg->port_msg;
|
|
iov[0].iov_len = sizeof(nxt_port_msg_t);
|
|
|
|
sb.buf = msg->buf;
|
|
sb.iobuf = &iov[1];
|
|
sb.nmax = NXT_IOBUF_MAX - 1;
|
|
sb.sync = 0;
|
|
sb.last = 0;
|
|
sb.size = 0;
|
|
sb.limit = port->max_size;
|
|
|
|
m = nxt_port_mmap_get_method(task, port, msg->buf);
|
|
|
|
if (m == NXT_PORT_METHOD_MMAP) {
|
|
sb.limit = (1ULL << 31) - 1;
|
|
}
|
|
|
|
nxt_sendbuf_mem_coalesce(task, &sb);
|
|
|
|
plain_size = sb.size;
|
|
plain_buf = msg->buf;
|
|
|
|
/*
|
|
* Send through mmap enabled only when payload
|
|
* is bigger than PORT_MMAP_MIN_SIZE.
|
|
*/
|
|
if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
|
|
nxt_port_mmap_write(task, port, msg, &sb);
|
|
|
|
} else {
|
|
m = NXT_PORT_METHOD_PLAIN;
|
|
}
|
|
|
|
msg->port_msg.last = sb.last;
|
|
|
|
n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
|
|
|
|
if (n > 0) {
|
|
if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
|
|
nxt_log(task, NXT_LOG_CRIT,
|
|
"port %d: short write: %z instead of %uz",
|
|
port->socket.fd, n, sb.size + iov[0].iov_len);
|
|
goto fail;
|
|
}
|
|
|
|
if (msg->buf != plain_buf) {
|
|
/*
|
|
* Complete crafted mmap_msgs buf and restore msg->buf
|
|
* for regular completion call.
|
|
*/
|
|
nxt_port_mmap_completion(task,
|
|
port->socket.write_work_queue,
|
|
msg->buf);
|
|
|
|
msg->buf = plain_buf;
|
|
}
|
|
|
|
msg->buf = nxt_sendbuf_completion(task,
|
|
port->socket.write_work_queue,
|
|
msg->buf,
|
|
plain_size);
|
|
|
|
if (msg->buf != NULL) {
|
|
/*
|
|
* A file descriptor is sent only
|
|
* in the first message of a stream.
|
|
*/
|
|
msg->fd = -1;
|
|
msg->share += n;
|
|
|
|
if (msg->share >= port->max_share) {
|
|
msg->share = 0;
|
|
nxt_queue_remove(link);
|
|
nxt_queue_insert_tail(&port->messages, link);
|
|
}
|
|
|
|
} else {
|
|
nxt_queue_remove(link);
|
|
nxt_mp_free(msg->mem_pool, msg);
|
|
}
|
|
|
|
} else if (nxt_slow_path(n == NXT_ERROR)) {
|
|
goto fail;
|
|
}
|
|
|
|
/* n == NXT_AGAIN */
|
|
|
|
} while (port->socket.write_ready);
|
|
|
|
if (nxt_fd_event_is_disabled(port->socket.write)) {
|
|
nxt_fd_event_enable_write(task->thread->engine, &port->socket);
|
|
}
|
|
|
|
return;
|
|
|
|
fail:
|
|
|
|
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
|
|
nxt_port_error_handler, task, &port->socket, NULL);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
|
|
{
|
|
port->socket.fd = port->pair[0];
|
|
port->socket.log = &nxt_main_log;
|
|
|
|
port->socket.read_work_queue = &task->thread->engine->fast_work_queue;
|
|
port->socket.read_handler = nxt_port_read_handler;
|
|
port->socket.error_handler = nxt_port_error_handler;
|
|
|
|
nxt_fd_event_enable_read(task->thread->engine, &port->socket);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_read_close(nxt_port_t *port)
|
|
{
|
|
nxt_socket_close(port->socket.task, port->pair[0]);
|
|
port->pair[0] = -1;
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
ssize_t n;
|
|
nxt_buf_t *b;
|
|
nxt_port_t *port;
|
|
struct iovec iov[2];
|
|
nxt_port_recv_msg_t msg;
|
|
|
|
port = msg.port = obj;
|
|
|
|
for ( ;; ) {
|
|
|
|
b = nxt_port_buf_alloc(port);
|
|
|
|
if (nxt_slow_path(b == NULL)) {
|
|
/* TODO: disable event for some time */
|
|
}
|
|
|
|
iov[0].iov_base = &msg.port_msg;
|
|
iov[0].iov_len = sizeof(nxt_port_msg_t);
|
|
|
|
iov[1].iov_base = b->mem.pos;
|
|
iov[1].iov_len = port->max_size;
|
|
|
|
n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
|
|
|
|
if (n > 0) {
|
|
|
|
msg.buf = b;
|
|
msg.size = n;
|
|
|
|
nxt_port_read_msg_process(task, port, &msg);
|
|
|
|
if (b->mem.pos == b->mem.free) {
|
|
nxt_port_buf_free(port, b);
|
|
}
|
|
|
|
if (port->socket.read_ready) {
|
|
continue;
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
if (n == NXT_AGAIN) {
|
|
nxt_port_buf_free(port, b);
|
|
|
|
nxt_fd_event_enable_read(task->thread->engine, &port->socket);
|
|
return;
|
|
}
|
|
|
|
/* n == 0 || n == NXT_ERROR */
|
|
|
|
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
|
|
nxt_port_error_handler, task, &port->socket, NULL);
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
|
|
nxt_port_recv_msg_t *msg)
|
|
{
|
|
nxt_buf_t *b;
|
|
nxt_buf_t *orig_b;
|
|
|
|
if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
|
|
nxt_log(task, NXT_LOG_CRIT,
|
|
"port %d: too small message:%uz", port->socket.fd, msg->size);
|
|
goto fail;
|
|
}
|
|
|
|
/* adjust size to actual buffer used size */
|
|
msg->size -= sizeof(nxt_port_msg_t);
|
|
|
|
b = orig_b = msg->buf;
|
|
b->mem.free += msg->size;
|
|
|
|
if (msg->port_msg.mmap) {
|
|
nxt_port_mmap_read(task, port, msg);
|
|
b = msg->buf;
|
|
}
|
|
|
|
port->handler(task, msg);
|
|
|
|
if (msg->port_msg.mmap && orig_b != b) {
|
|
/* complete used mmap buffers */
|
|
for (; b && nxt_buf_used_size(b) == 0;
|
|
b = b->next) {
|
|
nxt_debug(task, "complete buffer %p", b);
|
|
|
|
nxt_work_queue_add(port->socket.read_work_queue,
|
|
b->completion_handler, task, b, b->parent);
|
|
}
|
|
}
|
|
|
|
return;
|
|
|
|
fail:
|
|
|
|
if (msg->fd != -1) {
|
|
nxt_fd_close(msg->fd);
|
|
}
|
|
}
|
|
|
|
|
|
static nxt_buf_t *
|
|
nxt_port_buf_alloc(nxt_port_t *port)
|
|
{
|
|
nxt_buf_t *b;
|
|
|
|
if (port->free_bufs != NULL) {
|
|
b = port->free_bufs;
|
|
port->free_bufs = b->next;
|
|
|
|
b->mem.pos = b->mem.start;
|
|
b->mem.free = b->mem.start;
|
|
b->next = NULL;
|
|
} else {
|
|
b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
|
|
if (nxt_slow_path(b == NULL)) {
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
return b;
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b)
|
|
{
|
|
b->next = port->free_bufs;
|
|
port->free_bufs = b;
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
/* TODO */
|
|
}
|