Optimized send message allocations.

For empty write queue cases, it is possible to avoid allocation and enqueue
send message structures.  Send message initialized on stack and passed to
write handler.  If immediate write fails, send message allocated from engine
pool and enqueued.
This commit is contained in:
Max Romanov
2017-10-04 14:59:35 +03:00
parent 6a64533fa3
commit ebbe89bd5c
2 changed files with 103 additions and 43 deletions

View File

@@ -118,8 +118,6 @@ typedef struct {
nxt_port_msg_t port_msg;
nxt_work_t work;
nxt_event_engine_t *engine;
nxt_mp_t *mem_pool;
} nxt_port_send_msg_t;

View File

@@ -142,9 +142,9 @@ nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
engine = data;
#if (NXT_DEBUG)
if (nxt_slow_path(data != msg->engine)) {
if (nxt_slow_path(data != msg->work.data)) {
nxt_log_alert(task->log, "release msg data (%p) != msg->engine (%p)",
data, msg->engine);
data, msg->work.data);
nxt_abort();
}
#endif
@@ -159,29 +159,28 @@ nxt_port_release_send_msg(nxt_task_t *task, void *obj, void *data)
return;
}
nxt_mp_release(msg->mem_pool, obj);
nxt_mp_release(engine->mem_pool, obj);
}
nxt_int_t
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
static nxt_port_send_msg_t *
nxt_port_msg_create(nxt_task_t *task, nxt_port_send_msg_t *m)
{
nxt_port_send_msg_t *msg;
msg = nxt_mp_retain(task->thread->engine->mem_pool,
sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) {
return NXT_ERROR;
return NULL;
}
msg->link.next = NULL;
msg->link.prev = NULL;
msg->buf = b;
msg->fd = fd;
msg->close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg->share = 0;
msg->buf = m->buf;
msg->fd = m->fd;
msg->close_fd = m->close_fd;
msg->port_msg = m->port_msg;
msg->work.next = NULL;
msg->work.handler = nxt_port_release_send_msg;
@@ -189,26 +188,77 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->work.obj = msg;
msg->work.data = task->thread->engine;
msg->engine = task->thread->engine;
msg->mem_pool = msg->engine->mem_pool;
return msg;
}
msg->port_msg.stream = stream;
msg->port_msg.pid = nxt_pid;
msg->port_msg.reply_port = reply_port;
msg->port_msg.type = type & NXT_PORT_MSG_MASK;
msg->port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
msg->port_msg.mmap = 0;
nxt_thread_mutex_lock(&port->write_mutex);
static nxt_port_send_msg_t *
nxt_port_msg_push(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
{
if (msg->work.data == NULL) {
msg = nxt_port_msg_create(task, msg);
}
nxt_queue_insert_tail(&port->messages, &msg->link);
if (msg != NULL) {
nxt_queue_insert_tail(&port->messages, &msg->link);
}
nxt_thread_mutex_unlock(&port->write_mutex);
return msg;
}
nxt_port_use(task, port, 1);
static nxt_port_send_msg_t *
nxt_port_msg_first(nxt_task_t *task, nxt_port_t *port, nxt_port_send_msg_t *msg)
{
nxt_queue_link_t *lnk;
lnk = nxt_queue_first(&port->messages);
if (lnk == nxt_queue_tail(&port->messages)) {
return msg;
}
return nxt_queue_link_data(lnk, nxt_port_send_msg_t, link);
}
nxt_int_t
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_port_id_t reply_port, nxt_buf_t *b)
{
nxt_port_send_msg_t msg, *res;
msg.link.next = NULL;
msg.link.prev = NULL;
msg.buf = b;
msg.fd = fd;
msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg.share = 0;
msg.port_msg.stream = stream;
msg.port_msg.pid = nxt_pid;
msg.port_msg.reply_port = reply_port;
msg.port_msg.type = type & NXT_PORT_MSG_MASK;
msg.port_msg.last = (type & NXT_PORT_MSG_LAST) != 0;
msg.port_msg.mmap = 0;
msg.work.data = NULL;
if (port->socket.write_ready) {
nxt_port_write_handler(task, &port->socket, NULL);
nxt_port_write_handler(task, &port->socket, &msg);
} else {
nxt_thread_mutex_lock(&port->write_mutex);
res = nxt_port_msg_push(task, port, &msg);
nxt_thread_mutex_unlock(&port->write_mutex);
if (res == NULL) {
return NXT_ERROR;
}
nxt_port_use(task, port, 1);
}
return NXT_OK;
@@ -239,7 +289,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
nxt_port_t *port;
struct iovec *iov;
nxt_work_queue_t *wq;
nxt_queue_link_t *link;
nxt_port_method_t m;
nxt_port_send_msg_t *msg;
nxt_sendbuf_coalesce_t sb;
@@ -254,16 +303,16 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
iov = port->iov;
do {
link = nxt_queue_first(&port->messages);
wq = &task->thread->engine->fast_work_queue;
if (link == nxt_queue_tail(&port->messages)) {
do {
msg = nxt_port_msg_first(task, port, data);
if (msg == NULL) {
block_write = 1;
goto unlock_mutex;
}
msg = nxt_queue_link_data(link, nxt_port_send_msg_t, link);
iov[0].iov_base = &msg->port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
@@ -315,8 +364,6 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
msg->fd = -1;
}
wq = &task->thread->engine->fast_work_queue;
msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size,
m == NXT_PORT_METHOD_MMAP);
@@ -330,18 +377,34 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
if (msg->share >= port->max_share) {
msg->share = 0;
nxt_queue_remove(link);
nxt_queue_insert_tail(&port->messages, link);
if (msg->link.next != NULL) {
nxt_queue_remove(&msg->link);
use_delta--;
}
data = NULL;
if (nxt_port_msg_push(task, port, msg) != NULL) {
use_delta++;
}
}
} else {
nxt_queue_remove(link);
use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->engine);
if (msg->link.next != NULL) {
nxt_queue_remove(&msg->link);
use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->work.data);
}
data = NULL;
}
} else if (nxt_slow_path(n == NXT_ERROR)) {
if (msg->link.next == NULL) {
if (nxt_port_msg_push(task, port, msg) != NULL) {
use_delta++;
}
}
goto fail;
}
@@ -359,8 +422,7 @@ fail:
use_delta++;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_port_error_handler, task, &port->socket,
nxt_work_queue_add(wq, nxt_port_error_handler, task, &port->socket,
&port->socket);
unlock_mutex:
@@ -595,7 +657,7 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_remove(&msg->link);
use_delta--;
nxt_work_queue_add(wq, nxt_port_release_send_msg, task, msg,
msg->engine);
msg->work.data);
} nxt_queue_loop;