Introducing application and port shared memory queues.

The goal is to minimize the number of syscalls needed to deliver a message.
This commit is contained in:
Max Romanov
2020-08-11 19:20:34 +03:00
parent a82cf4ffb6
commit e227fc9e62
11 changed files with 1802 additions and 320 deletions

View File

@@ -5,6 +5,7 @@
*/
#include <nxt_main.h>
#include <nxt_port_queue.h>
static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
@@ -17,6 +18,8 @@ static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
nxt_port_send_msg_t *msg);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_queue_read_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg);
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
@@ -143,12 +146,15 @@ nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
nxt_int_t
nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
void *tracking)
nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, nxt_fd_t fd2, uint32_t stream, nxt_port_id_t reply_port,
nxt_buf_t *b)
{
int notify;
uint8_t *p;
nxt_int_t res;
nxt_port_send_msg_t msg;
uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
msg.link.next = NULL;
msg.link.prev = NULL;
@@ -156,14 +162,10 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.buf = b;
msg.share = 0;
msg.fd = fd;
msg.fd2 = -1;
msg.fd2 = fd2;
msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg.allocated = 0;
if (tracking != NULL) {
nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
}
msg.port_msg.stream = stream;
msg.port_msg.pid = nxt_pid;
msg.port_msg.reply_port = reply_port;
@@ -172,7 +174,42 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.port_msg.mmap = 0;
msg.port_msg.nf = 0;
msg.port_msg.mf = 0;
msg.port_msg.tracking = tracking != NULL;
if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
if (fd == -1
&& (b == NULL
|| nxt_buf_mem_used_size(&b->mem)
<= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))))
{
p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t));
if (b != NULL) {
p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem));
}
res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, &notify);
nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
(int) port->pid, (int) port->id, port->socket.fd,
(int) (p - qmsg), notify, res);
if (notify == 0) {
return res;
}
msg.port_msg.type = _NXT_PORT_MSG_READ_QUEUE;
msg.buf = NULL;
} else {
qmsg[0] = _NXT_PORT_MSG_READ_SOCKET;
res = nxt_port_queue_send(port->queue, qmsg, 1, &notify);
nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
(int) port->pid, (int) port->id, port->socket.fd,
notify, res);
}
}
res = nxt_port_msg_chk_insert(task, port, &msg);
if (nxt_fast_path(res == NXT_DECLINED)) {
@@ -308,10 +345,6 @@ next_fragment:
port->max_size / PORT_MMAP_MIN_SIZE);
}
if (msg->port_msg.tracking) {
iov[0].iov_len += sizeof(msg->tracking_msg);
}
sb.limit -= iov[0].iov_len;
nxt_sendbuf_mem_coalesce(task, &sb);
@@ -368,7 +401,6 @@ next_fragment:
msg->fd2 = -1;
msg->share += n;
msg->port_msg.nf = 1;
msg->port_msg.tracking = 0;
if (msg->share >= port->max_share) {
msg->share = 0;
@@ -576,7 +608,9 @@ nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
port->engine = task->thread->engine;
port->socket.read_work_queue = &port->engine->fast_work_queue;
port->socket.read_handler = nxt_port_read_handler;
port->socket.read_handler = port->queue != NULL
? nxt_port_queue_read_handler
: nxt_port_read_handler;
port->socket.error_handler = nxt_port_error_handler;
nxt_fd_event_enable_read(port->engine, &port->socket);
@@ -660,6 +694,206 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
}
/*
 * Read event handler used for ports that have a shared memory queue
 * (port->queue) attached.  Small messages are passed directly through
 * the queue; messages that carry an fd or a large buffer go through the
 * socket pair and are announced in the queue by a one-byte
 * _NXT_PORT_MSG_READ_SOCKET marker.  The from_socket counter and the
 * single "suspended message" slot (port->socket_msg) keep queue and
 * socket messages in their original send order.
 */
static void
nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
nxt_buf_t *b;
nxt_port_t *port;
struct iovec iov[2];
nxt_port_queue_t *queue;
nxt_port_recv_msg_t msg, *smsg;
uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];  /* raw queue message: header + inline payload */
port = nxt_container_of(obj, nxt_port_t, socket);
msg.port = port;
nxt_assert(port->engine == task->thread->engine);
queue = port->queue;
/*
 * Advertise an active reader.  NOTE(review): presumably senders consult
 * queue->nitems to decide whether a socket wake-up ("notify") is needed
 * -- confirm against nxt_port_queue_send().
 */
nxt_atomic_fetch_add(&queue->nitems, 1);
for ( ;; ) {
if (port->from_socket == 0) {
/* No socket message is due; try the shared memory queue first. */
n = nxt_port_queue_recv(queue, qmsg);
if (n < 0 && !port->socket.read_ready) {
/*
 * The queue looks empty.  Publish "no reader" first, then
 * re-check, so a message enqueued concurrently by a sender
 * (which may then skip the socket notification) is not lost.
 */
nxt_atomic_fetch_add(&queue->nitems, -1);
n = nxt_port_queue_recv(queue, qmsg);
if (n < 0) {
return;
}
/* Raced with a sender: become a reader again and process it. */
nxt_atomic_fetch_add(&queue->nitems, 1);
}
if (n == 1 && qmsg[0] == _NXT_PORT_MSG_READ_SOCKET) {
/*
 * One-byte marker: the next message in order did not fit in
 * the queue (or carries an fd) and must be read from the
 * socket before any further queue messages.
 */
port->from_socket++;
nxt_debug(task, "port{%d,%d} %d: dequeue 1 read_socket %d",
(int) port->pid, (int) port->id, port->socket.fd,
port->from_socket);
n = -1;
continue;
}
nxt_debug(task, "port{%d,%d} %d: dequeue %d",
(int) port->pid, (int) port->id, port->socket.fd,
(int) n);
} else {
/* A socket message is due; reuse a previously suspended one if any. */
if ((smsg = port->socket_msg) != NULL && smsg->size != 0) {
msg.port_msg = smsg->port_msg;
b = smsg->buf;
n = smsg->size;
msg.fd = smsg->fd;
msg.fd2 = smsg->fd2;
smsg->size = 0;  /* mark the suspend slot free for reuse */
port->from_socket--;
nxt_debug(task, "port{%d,%d} %d: use suspended message %d",
(int) port->pid, (int) port->id, port->socket.fd,
(int) n);
goto process;
}
n = -1;
}
if (n < 0 && !port->socket.read_ready) {
/* Nothing queued and nothing on the socket: stop reading. */
nxt_atomic_fetch_add(&queue->nitems, -1);
return;
}
b = nxt_port_buf_alloc(port);
if (nxt_slow_path(b == NULL)) {
/* TODO: disable event for some time */
}
/*
 * NOTE(review): on allocation failure b is NULL yet still used below
 * (b->mem.pos) -- the TODO above does not guard against that path.
 */
if (n >= (ssize_t) sizeof(nxt_port_msg_t)) {
/* Complete message delivered via the queue: header, then payload. */
nxt_memcpy(&msg.port_msg, qmsg, sizeof(nxt_port_msg_t));
if (n > (ssize_t) sizeof(nxt_port_msg_t)) {
nxt_memcpy(b->mem.pos, qmsg + sizeof(nxt_port_msg_t),
n - sizeof(nxt_port_msg_t));
}
} else {
/* Fall back to reading the socket pair. */
iov[0].iov_base = &msg.port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
iov[1].iov_base = b->mem.pos;
iov[1].iov_len = port->max_size;
n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
if (n == (ssize_t) sizeof(nxt_port_msg_t)
&& msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
{
/* Pure wake-up asking us to drain the queue; nothing to process. */
nxt_port_buf_free(port, b);
nxt_debug(task, "port{%d,%d} %d: recv %d read_queue",
(int) port->pid, (int) port->id, port->socket.fd,
(int) n);
continue;
}
nxt_debug(task, "port{%d,%d} %d: recvmsg %d",
(int) port->pid, (int) port->id, port->socket.fd,
(int) n);
if (n > 0) {
if (port->from_socket == 0) {
/*
 * A real socket message arrived before its READ_SOCKET
 * marker was dequeued; stash it in port->socket_msg so it
 * is processed in order once the marker shows up.
 */
nxt_debug(task, "port{%d,%d} %d: suspend message %d",
(int) port->pid, (int) port->id, port->socket.fd,
(int) n);
smsg = port->socket_msg;
if (nxt_slow_path(smsg == NULL)) {
/* Lazily allocate the (single, cached) suspend slot. */
smsg = nxt_mp_alloc(port->mem_pool,
sizeof(nxt_port_recv_msg_t));
if (nxt_slow_path(smsg == NULL)) {
nxt_alert(task, "port{%d,%d} %d: suspend message "
"failed",
(int) port->pid, (int) port->id,
port->socket.fd);
return;
}
port->socket_msg = smsg;
} else {
if (nxt_slow_path(smsg->size != 0)) {
/* The single slot is occupied: at most one message
 * may be suspended at a time. */
nxt_alert(task, "port{%d,%d} %d: too many suspend "
"messages",
(int) port->pid, (int) port->id,
port->socket.fd);
return;
}
}
smsg->port_msg = msg.port_msg;
smsg->buf = b;
smsg->size = n;
smsg->fd = msg.fd;
smsg->fd2 = msg.fd2;
continue;
}
/* This socket message matches an already dequeued marker. */
port->from_socket--;
}
}
process:
if (n > 0) {
msg.buf = b;
msg.size = n;
nxt_port_read_msg_process(task, port, &msg);
/*
 * To disable instant completion or buffer re-usage,
 * handler should reset 'msg.buf'.
 */
if (msg.buf == b) {
nxt_port_buf_free(port, b);
}
continue;
}
if (n == NXT_AGAIN) {
/* Socket drained; re-arm the read event, keep polling the queue. */
nxt_port_buf_free(port, b);
nxt_fd_event_enable_read(task->thread->engine, &port->socket);
continue;
}
/* n == 0 || n == NXT_ERROR */
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_port_error_handler, task, &port->socket, NULL);
return;
}
}
typedef struct {
uint32_t stream;
uint32_t pid;
@@ -831,12 +1065,7 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
b = orig_b = msg->buf;
b->mem.free += msg->size;
if (msg->port_msg.tracking) {
msg->cancelled = nxt_port_mmap_tracking_read(task, msg) == 0;
} else {
msg->cancelled = 0;
}
msg->cancelled = 0;
if (nxt_slow_path(msg->port_msg.nf != 0)) {