Using shared memory to send data via nxt_port.

Usage:
    b = nxt_port_mmap_get_buf(task, port, size);
    b->mem.free = nxt_cpymem(b->mem.free, data, size);
    nxt_port_socket_write(task, port, NXT_PORT_MSG_DATA, -1, 0, b);
This commit is contained in:
Max Romanov
2017-05-12 20:32:41 +03:00
parent 1782c771fa
commit f7b4bdfd89
19 changed files with 1506 additions and 186 deletions

View File

@@ -10,7 +10,7 @@
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size);
nxt_port_recv_msg_t *msg, size_t size);
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
@@ -137,7 +137,7 @@ nxt_port_write_close(nxt_port_t *port)
nxt_int_t
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
{
nxt_queue_link_t *link;
nxt_port_send_msg_t *msg;
@@ -169,8 +169,11 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->share = 0;
msg->port_msg.stream = stream;
msg->port_msg.pid = nxt_pid;
msg->port_msg.reply_port = reply_port;
msg->port_msg.type = type;
msg->port_msg.last = 0;
msg->port_msg.mmap = 0;
nxt_queue_insert_tail(&port->messages, &msg->link);
@@ -186,12 +189,15 @@ static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
nxt_uint_t niov;
nxt_port_t *port;
struct iovec iov[NXT_IOBUF_MAX];
nxt_queue_link_t *link;
nxt_port_send_msg_t *msg;
nxt_sendbuf_coalesce_t sb;
nxt_port_method_t m;
size_t plain_size;
nxt_buf_t *plain_buf;
port = obj;
@@ -213,27 +219,59 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
sb.nmax = NXT_IOBUF_MAX - 1;
sb.sync = 0;
sb.last = 0;
sb.size = sizeof(nxt_port_msg_t);
sb.size = 0;
sb.limit = port->max_size;
niov = nxt_sendbuf_mem_coalesce(task, &sb);
m = nxt_port_mmap_get_method(task, port, msg->buf);
if (m == NXT_PORT_METHOD_MMAP) {
sb.limit = (1ULL << 31) - 1;
}
nxt_sendbuf_mem_coalesce(task, &sb);
plain_size = sb.size;
plain_buf = msg->buf;
/*
* Send through mmap enabled only when payload
* is bigger than PORT_MMAP_MIN_SIZE.
*/
if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
nxt_port_mmap_write(task, port, msg, &sb);
} else {
m = NXT_PORT_METHOD_PLAIN;
}
msg->port_msg.last = sb.last;
n = nxt_socketpair_send(&port->socket, msg->fd, iov, niov + 1);
n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
if (n > 0) {
if (nxt_slow_path((size_t) n != sb.size)) {
if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
nxt_log(task, NXT_LOG_CRIT,
"port %d: short write: %z instead of %uz",
port->socket.fd, n, sb.size);
port->socket.fd, n, sb.size + iov[0].iov_len);
goto fail;
}
if (msg->buf != plain_buf) {
/*
* Complete crafted mmap_msgs buf and restore msg->buf
* for regular completion call.
*/
nxt_port_mmap_completion(task,
port->socket.write_work_queue,
msg->buf);
msg->buf = plain_buf;
}
msg->buf = nxt_sendbuf_completion(task,
port->socket.write_work_queue,
msg->buf,
n - sizeof(nxt_port_msg_t));
plain_size);
if (msg->buf != NULL) {
/*
@@ -301,14 +339,13 @@ nxt_port_read_close(nxt_port_t *port)
static void
nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
nxt_fd_t fd;
nxt_buf_t *b;
nxt_port_t *port;
struct iovec iov[2];
nxt_port_msg_t msg;
ssize_t n;
nxt_buf_t *b;
nxt_port_t *port;
struct iovec iov[2];
nxt_port_recv_msg_t msg;
port = obj;
port = msg.port = obj;
for ( ;; ) {
@@ -318,24 +355,21 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
/* TODO: disable event for some time */
}
iov[0].iov_base = &msg;
iov[0].iov_base = &msg.port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
iov[1].iov_base = b->mem.pos;
iov[1].iov_len = port->max_size;
n = nxt_socketpair_recv(&port->socket, &fd, iov, 2);
n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2);
if (n > 0) {
nxt_port_read_msg_process(task, port, &msg, fd, b, n);
msg.buf = b;
nxt_port_read_msg_process(task, port, &msg, n);
if (b->mem.pos == b->mem.free) {
if (b->next != NULL) {
/* A sync buffer */
nxt_buf_free(port->mem_pool, b->next);
}
nxt_port_buf_free(port, b);
}
@@ -364,10 +398,11 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_msg_t *msg, nxt_fd_t fd, nxt_buf_t *b, size_t size)
nxt_port_recv_msg_t *msg, size_t size)
{
nxt_buf_t *sync;
nxt_port_recv_msg_t recv_msg;
nxt_buf_t *b;
nxt_buf_t *orig_b;
nxt_buf_t **last_next;
if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) {
nxt_log(port->socket.task, NXT_LOG_CRIT,
@@ -375,31 +410,56 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
goto fail;
}
recv_msg.stream = msg->stream;
recv_msg.type = msg->type;
recv_msg.fd = fd;
recv_msg.buf = b;
recv_msg.port = port;
/* adjust size to actual buffer used size */
size -= sizeof(nxt_port_msg_t);
b->mem.free += size - sizeof(nxt_port_msg_t);
b = orig_b = msg->buf;
b->mem.free += size;
if (msg->last) {
sync = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
if (nxt_slow_path(sync == NULL)) {
goto fail;
}
b->next = sync;
if (msg->port_msg.mmap) {
nxt_port_mmap_read(task, port, msg, size);
b = msg->buf;
}
port->handler(task, &recv_msg);
last_next = &b->next;
if (msg->port_msg.last) {
/* find reference to last next, the NULL one */
while (*last_next) {
last_next = &(*last_next)->next;
}
*last_next = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
if (nxt_slow_path(*last_next == NULL)) {
goto fail;
}
}
port->handler(task, msg);
if (*last_next != NULL) {
/* A sync buffer */
nxt_buf_free(port->mem_pool, *last_next);
*last_next = NULL;
}
if (orig_b != b) {
/* complete mmap buffers */
for (; b && nxt_buf_used_size(b) == 0;
b = b->next) {
nxt_debug(task, "complete buffer %p", b);
nxt_work_queue_add(port->socket.read_work_queue,
b->completion_handler, task, b, b->parent);
}
}
return;
fail:
if (fd != -1) {
nxt_fd_close(fd);
if (msg->fd != -1) {
nxt_fd_close(msg->fd);
}
}
@@ -415,6 +475,7 @@ nxt_port_buf_alloc(nxt_port_t *port)
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
b->next = NULL;
} else {
b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);