Fixing multi-thread port write racing conditions.

This commit is contained in:
Max Romanov
2019-08-16 00:48:11 +03:00
parent 4d7576d323
commit caea9d3c07
4 changed files with 172 additions and 166 deletions

View File

@@ -134,11 +134,10 @@ typedef struct {
nxt_buf_t *buf;
size_t share;
nxt_fd_t fd;
nxt_bool_t close_fd;
nxt_port_msg_t port_msg;
uint32_t tracking_msg[2];
nxt_work_t work;
uint8_t close_fd; /* 1 bit */
uint8_t allocated; /* 1 bit */
} nxt_port_send_msg_t;
@@ -202,9 +201,6 @@ struct nxt_port_s {
nxt_atomic_t use_count;
nxt_process_type_t type;
struct iovec *iov;
void *mmsg_buf;
};

View File

@@ -798,7 +798,7 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb)
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf)
{
size_t bsize;
nxt_buf_t *bmem;
@@ -811,7 +811,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
"via shared memory", sb->size, port->pid);
bsize = sb->niov * sizeof(nxt_port_mmap_msg_t);
mmap_msg = port->mmsg_buf;
mmap_msg = mmsg_buf;
bmem = msg->buf;
@@ -841,7 +841,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
port->pid);
}
sb->iobuf[0].iov_base = port->mmsg_buf;
sb->iobuf[0].iov_base = mmsg_buf;
sb->iobuf[0].iov_len = bsize;
sb->niov = 1;
sb->size = bsize;

View File

@@ -55,7 +55,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
void
nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb);
nxt_port_send_msg_t *msg, nxt_sendbuf_coalesce_t *sb, void *mmsg_buf);
void
nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg);

View File

@@ -7,9 +7,15 @@
#include <nxt_main.h>
static nxt_int_t nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg);
static nxt_port_send_msg_t *nxt_port_msg_alloc(nxt_port_send_msg_t *m);
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
static nxt_port_send_msg_t *nxt_port_msg_first(nxt_port_t *port);
static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
static nxt_port_send_msg_t *nxt_port_msg_insert_tail(nxt_port_t *port,
nxt_port_send_msg_t *msg);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg);
@@ -116,13 +122,6 @@ nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port)
port->socket.write_work_queue = &port->engine->fast_work_queue;
port->socket.write_handler = nxt_port_write_handler;
port->socket.error_handler = nxt_port_error_handler;
if (port->iov == NULL) {
port->iov = nxt_mp_get(port->mem_pool,
sizeof(struct iovec) * NXT_IOBUF_MAX * 10);
port->mmsg_buf = nxt_mp_get(port->mem_pool,
sizeof(uint32_t) * 3 * NXT_IOBUF_MAX * 10);
}
}
@@ -135,109 +134,11 @@ nxt_port_write_close(nxt_port_t *port)
static void
nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
nxt_port_release_send_msg(nxt_port_send_msg_t *msg)
{
nxt_event_engine_t *engine;
nxt_port_send_msg_t *msg;
msg = obj;
engine = data;
nxt_assert(data == msg->work.data);
if (engine != task->thread->engine) {
nxt_debug(task, "current thread is %PT, expected %PT",
task->thread->tid, engine->task.thread->tid);
nxt_event_engine_post(engine, &msg->work);
return;
if (msg->allocated) {
nxt_free(msg);
}
nxt_mp_free(engine->mem_pool, obj);
nxt_mp_release(engine->mem_pool);
}
static nxt_port_send_msg_t *
nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m)
{
nxt_mp_t *mp;
nxt_port_send_msg_t *msg;
mp = task->thread->engine->mem_pool;
msg = nxt_mp_alloc(mp, sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
return NULL;
}
nxt_mp_retain(mp);
msg->link.next = NULL;
msg->link.prev = NULL;
msg->buf = m->buf;
msg->share = m->share;
msg->fd = m->fd;
msg->close_fd = m->close_fd;
msg->port_msg = m->port_msg;
msg->work.next = NULL;
msg->work.handler = nxt_port_release_send_msg;
msg->work.task = task;
msg->work.obj = msg;
msg->work.data = task->thread->engine;
return msg;
}
static nxt_port_send_msg_t *
nxt_port_msg_insert_head(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg)
{
if (msg->work.data == NULL) {
msg = nxt_port_msg_create(task, msg);
}
if (msg != NULL) {
nxt_queue_insert_head(&port->messages, &msg->link);
}
return msg;
}
static nxt_port_send_msg_t *
nxt_port_msg_insert_tail(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg)
{
if (msg->work.data == NULL) {
msg = nxt_port_msg_create(task, msg);
}
if (msg != NULL) {
nxt_queue_insert_tail(&port->messages, &msg->link);
}
return msg;
}
static nxt_port_send_msg_t *
nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
{
nxt_queue_link_t *lnk;
lnk = nxt_queue_first(&port->messages);
if (lnk == nxt_queue_tail(&port->messages)) {
return msg;
}
return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
}
@@ -246,15 +147,17 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b,
void *tracking)
{
nxt_port_send_msg_t msg, *res;
nxt_int_t res;
nxt_port_send_msg_t msg;
msg.link.next = NULL;
msg.link.prev = NULL;
msg.buf = b;
msg.share = 0;
msg.fd = fd;
msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg.share = 0;
msg.allocated = 0;
if (tracking != NULL) {
nxt_port_mmap_tracking_write(msg.tracking_msg, tracking);
@@ -270,25 +173,63 @@ nxt_port_socket_twrite(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.port_msg.mf = 0;
msg.port_msg.tracking = tracking != NULL;
msg.work.data = NULL;
if (port->socket.write_ready) {
res = nxt_port_msg_chk_insert(task, port, &msg);
if (nxt_fast_path(res == NXT_DECLINED)) {
nxt_port_write_handler(task, &port->socket, &msg);
} else {
res = NXT_OK;
}
return res;
}
static nxt_int_t
nxt_port_msg_chk_insert(nxt_task_t *task, nxt_port_t *port,
nxt_port_send_msg_t *msg)
{
nxt_int_t res;
nxt_thread_mutex_lock(&port->write_mutex);
res = nxt_port_msg_insert_tail(task, port, &msg);
if (nxt_fast_path(port->socket.write_ready
&& nxt_queue_is_empty(&port->messages)))
{
res = NXT_DECLINED;
} else {
msg = nxt_port_msg_alloc(msg);
if (nxt_fast_path(msg != NULL)) {
nxt_queue_insert_tail(&port->messages, &msg->link);
nxt_port_use(task, port, 1);
res = NXT_OK;
} else {
res = NXT_ERROR;
}
}
nxt_thread_mutex_unlock(&port->write_mutex);
if (res == NULL) {
return NXT_ERROR;
return res;
}
static nxt_port_send_msg_t *
nxt_port_msg_alloc(nxt_port_send_msg_t *m)
{
nxt_port_send_msg_t *msg;
msg = nxt_malloc(sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
return NULL;
}
nxt_port_use(task, port, 1);
}
*msg = *m;
return NXT_OK;
msg->allocated = 1;
return msg;
}
@@ -312,9 +253,10 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
int use_delta;
size_t plain_size;
ssize_t n;
uint32_t mmsg_buf[3 * NXT_IOBUF_MAX * 10];
nxt_bool_t block_write, enable_write;
nxt_port_t *port;
struct iovec *iov;
struct iovec iov[NXT_IOBUF_MAX * 10];
nxt_work_queue_t *wq;
nxt_port_method_t m;
nxt_port_send_msg_t *msg;
@@ -326,19 +268,22 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
enable_write = 0;
use_delta = 0;
nxt_thread_mutex_lock(&port->write_mutex);
iov = port->iov;
wq = &task->thread->engine->fast_work_queue;
do {
msg = nxt_port_msg_first(task, port, data);
if (data) {
msg = data;
} else {
msg = nxt_port_msg_first(port);
if (msg == NULL) {
block_write = 1;
goto unlock_mutex;
goto cleanup;
}
}
next_fragment:
iov[0].iov_base = &msg->port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
@@ -377,7 +322,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
* is bigger than PORT_MMAP_MIN_SIZE.
*/
if (m == NXT_PORT_METHOD_MMAP && plain_size > PORT_MMAP_MIN_SIZE) {
nxt_port_mmap_write(task, port, msg, &sb);
nxt_port_mmap_write(task, port, msg, &sb, mmsg_buf);
} else {
m = NXT_PORT_METHOD_PLAIN;
@@ -421,36 +366,58 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
msg->share = 0;
if (msg->link.next != NULL) {
nxt_queue_remove(&msg->link);
use_delta--;
}
data = NULL;
nxt_thread_mutex_lock(&port->write_mutex);
nxt_queue_remove(&msg->link);
nxt_queue_insert_tail(&port->messages, &msg->link);
nxt_thread_mutex_unlock(&port->write_mutex);
} else {
msg = nxt_port_msg_insert_tail(port, msg);
if (nxt_slow_path(msg == NULL)) {
goto fail;
}
if (nxt_port_msg_insert_tail(task, port, msg) != NULL) {
use_delta++;
}
} else {
goto next_fragment;
}
} else {
if (msg->link.next != NULL) {
nxt_thread_mutex_lock(&port->write_mutex);
nxt_queue_remove(&msg->link);
msg->link.next = NULL;
nxt_thread_mutex_unlock(&port->write_mutex);
use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->work.data);
}
data = NULL;
nxt_port_release_send_msg(msg);
}
if (data != NULL) {
goto cleanup;
}
} else {
if (msg->link.next == NULL) {
if (nxt_port_msg_insert_head(task, port, msg) != NULL) {
use_delta++;
}
}
if (nxt_slow_path(n == NXT_ERROR)) {
goto fail;
}
if (msg->link.next == NULL) {
msg = nxt_port_msg_insert_tail(port, msg);
if (nxt_slow_path(msg == NULL)) {
goto fail;
}
use_delta++;
}
}
} while (port->socket.write_ready);
@@ -459,7 +426,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
enable_write = 1;
}
goto unlock_mutex;
goto cleanup;
fail:
@@ -468,8 +435,7 @@ fail:
nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
&port->socket);
unlock_mutex:
nxt_thread_mutex_unlock(&port->write_mutex);
cleanup:
if (block_write && nxt_fd_event_is_active(port->socket.write)) {
nxt_port_post(task, port, nxt_port_fd_block_write, NULL);
@@ -485,6 +451,29 @@ unlock_mutex:
}
static nxt_port_send_msg_t *
nxt_port_msg_first(nxt_port_t *port)
{
nxt_queue_link_t *lnk;
nxt_port_send_msg_t *msg;
nxt_thread_mutex_lock(&port->write_mutex);
lnk = nxt_queue_first(&port->messages);
if (lnk == nxt_queue_tail(&port->messages)) {
msg = NULL;
} else {
msg = nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
}
nxt_thread_mutex_unlock(&port->write_mutex);
return msg;
}
static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
size_t sent, nxt_bool_t mmap_mode)
@@ -546,6 +535,27 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
}
static nxt_port_send_msg_t *
nxt_port_msg_insert_tail(nxt_port_t *port, nxt_port_send_msg_t *msg)
{
if (msg->allocated == 0) {
msg = nxt_port_msg_alloc(msg);
if (nxt_slow_path(msg == NULL)) {
return NULL;
}
}
nxt_thread_mutex_lock(&port->write_mutex);
nxt_queue_insert_tail(&port->messages, &msg->link);
nxt_thread_mutex_unlock(&port->write_mutex);
return msg;
}
void
nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
{
@@ -986,8 +996,8 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_remove(&msg->link);
use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->work.data);
nxt_port_release_send_msg(msg);
} nxt_queue_loop;