Fixing shm buffer leakage when sending over the port queue.

When the shm buffer is sent over the port queue, it needs to be completed
because it's sent over the port socket.
This commit is contained in:
Max Romanov
2021-03-25 16:55:16 +03:00
parent 067c6096e2
commit b8052b050e
2 changed files with 91 additions and 13 deletions

View File

@@ -36,6 +36,13 @@ appeared in 1.6.
</para>
</change>
<change type="bugfix">
<para>
a descriptor and memory leak occurring in the router process when processing
small WebSocket frames from a client; the bug had appeared in 1.19.0.
</para>
</change>
<change type="bugfix">
<para>
a descriptor leak occurring in the router process when removing or

View File

@@ -6,8 +6,16 @@
#include <nxt_main.h>
#include <nxt_port_queue.h>
#include <nxt_port_memory_int.h>
#define NXT_PORT_MAX_ENQUEUE_BUF_SIZE \
(int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))
static nxt_bool_t nxt_port_can_enqueue_buf(nxt_buf_t *b);
static uint8_t nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm,
void *qbuf, nxt_buf_t *b);
static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg);
static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
@@ -151,10 +159,13 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_buf_t *b)
{
int notify;
uint8_t *p;
uint8_t qmsg_size;
nxt_int_t res;
nxt_port_send_msg_t msg;
uint8_t qmsg[NXT_PORT_QUEUE_MSG_SIZE];
struct {
nxt_port_msg_t pm;
uint8_t buf[NXT_PORT_MAX_ENQUEUE_BUF_SIZE];
} qmsg;
msg.link.next = NULL;
msg.link.prev = NULL;
@@ -177,21 +188,31 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
if (port->queue != NULL && type != _NXT_PORT_MSG_READ_QUEUE) {
if (fd == -1
&& (b == NULL
|| nxt_buf_mem_used_size(&b->mem)
<= (int) (NXT_PORT_QUEUE_MSG_SIZE - sizeof(nxt_port_msg_t))))
{
p = nxt_cpymem(qmsg, &msg.port_msg, sizeof(nxt_port_msg_t));
if (fd == -1 && nxt_port_can_enqueue_buf(b)) {
qmsg.pm = msg.port_msg;
qmsg_size = sizeof(qmsg.pm);
if (b != NULL) {
p = nxt_cpymem(p, b->mem.pos, nxt_buf_mem_used_size(&b->mem));
qmsg_size += nxt_port_enqueue_buf(task, &qmsg.pm, qmsg.buf, b);
}
res = nxt_port_queue_send(port->queue, qmsg, p - qmsg, &notify);
res = nxt_port_queue_send(port->queue, &qmsg, qmsg_size, &notify);
nxt_debug(task, "port{%d,%d} %d: enqueue %d notify %d, %d",
(int) port->pid, (int) port->id, port->socket.fd,
(int) (p - qmsg), notify, res);
(int) qmsg_size, notify, res);
if (b != NULL && nxt_fast_path(res == NXT_OK)) {
if (qmsg.pm.mmap) {
b->is_port_mmap_sent = 1;
}
b->mem.pos = b->mem.free;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
}
if (notify == 0) {
return res;
@@ -201,9 +222,9 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.buf = NULL;
} else {
qmsg[0] = _NXT_PORT_MSG_READ_SOCKET;
qmsg.buf[0] = _NXT_PORT_MSG_READ_SOCKET;
res = nxt_port_queue_send(port->queue, qmsg, 1, &notify);
res = nxt_port_queue_send(port->queue, qmsg.buf, 1, &notify);
nxt_debug(task, "port{%d,%d} %d: enqueue 1 notify %d, %d",
(int) port->pid, (int) port->id, port->socket.fd,
@@ -225,6 +246,56 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
}
static nxt_bool_t
nxt_port_can_enqueue_buf(nxt_buf_t *b)
{
if (b == NULL) {
return 1;
}
if (b->next != NULL) {
return 0;
}
return (nxt_buf_mem_used_size(&b->mem) <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE
|| nxt_buf_is_port_mmap(b));
}
static uint8_t
nxt_port_enqueue_buf(nxt_task_t *task, nxt_port_msg_t *pm, void *qbuf,
nxt_buf_t *b)
{
ssize_t size;
nxt_port_mmap_msg_t *mm;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
size = nxt_buf_mem_used_size(&b->mem);
if (size <= NXT_PORT_MAX_ENQUEUE_BUF_SIZE) {
nxt_memcpy(qbuf, b->mem.pos, size);
return size;
}
mmap_handler = b->parent;
hdr = mmap_handler->hdr;
mm = qbuf;
mm->mmap_id = hdr->id;
mm->chunk_id = nxt_port_mmap_chunk_id(hdr, b->mem.pos);
mm->size = nxt_buf_mem_used_size(&b->mem);
pm->mmap = 1;
nxt_debug(task, "mmap_msg={%D, %D, %D}", mm->mmap_id, mm->chunk_id,
mm->size);
return sizeof(nxt_port_mmap_msg_t);
}
static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg)