nxt_port_buf_completion() and nxt_sendbuf_completion().

nxt_sendbuf_completion() has been renamed to nxt_port_buf_completion()
and moved to src/nxt_port_socket.c.  nxt_sendbuf_completion0() has been
renamed to nxt_sendbuf_completion().
This commit is contained in:
Igor Sysoev
2018-03-28 19:10:02 +03:00
parent cd340b09e6
commit 18377ad288
4 changed files with 67 additions and 67 deletions

View File

@@ -845,8 +845,8 @@ nxt_h1p_sent(nxt_task_t *task, void *obj, void *data)
engine = task->thread->engine; engine = task->thread->engine;
c->write = nxt_sendbuf_completion0(task, &engine->fast_work_queue, c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
c->write);
if (c->write != NULL) { if (c->write != NULL) {
nxt_conn_write(engine, c); nxt_conn_write(engine, c);
} }

View File

@@ -8,6 +8,8 @@
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
static nxt_buf_t *nxt_port_buf_completion(nxt_task_t *task,
nxt_work_queue_t *wq, nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data); static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port, static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg); nxt_port_recv_msg_t *msg);
@@ -379,7 +381,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
msg->fd = -1; msg->fd = -1;
} }
msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size, msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
m == NXT_PORT_METHOD_MMAP); m == NXT_PORT_METHOD_MMAP);
if (msg->buf != NULL) { if (msg->buf != NULL) {
@@ -462,6 +464,67 @@ unlock_mutex:
} }
static nxt_buf_t *
nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
size_t sent, nxt_bool_t mmap_mode)
{
size_t size;
while (b != NULL) {
nxt_prefetch(b->next);
if (!nxt_buf_is_sync(b)) {
size = nxt_buf_used_size(b);
if (size != 0) {
if (sent == 0) {
break;
}
if (nxt_buf_is_port_mmap(b) && mmap_mode) {
/*
* buffer has been sent to other side which is now
* responsible for shared memory bucket release
*/
b->is_port_mmap_sent = 1;
}
if (sent < size) {
if (nxt_buf_is_mem(b)) {
b->mem.pos += sent;
}
if (nxt_buf_is_file(b)) {
b->file_pos += sent;
}
break;
}
/* b->mem.free is NULL in file-only buffer. */
b->mem.pos = b->mem.free;
if (nxt_buf_is_file(b)) {
b->file_pos = b->file_end;
}
sent -= size;
}
}
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
b = b->next;
}
return b;
}
void void
nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port) nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port)
{ {

View File

@@ -376,68 +376,7 @@ nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
nxt_buf_t * nxt_buf_t *
nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b, nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
size_t sent, nxt_bool_t mmap_mode)
{
size_t size;
while (b != NULL) {
nxt_prefetch(b->next);
if (!nxt_buf_is_sync(b)) {
size = nxt_buf_used_size(b);
if (size != 0) {
if (sent == 0) {
break;
}
if (nxt_buf_is_port_mmap(b) && mmap_mode) {
/*
* buffer has been sent to other side which is now
* responsible for shared memory bucket release
*/
b->is_port_mmap_sent = 1;
}
if (sent < size) {
if (nxt_buf_is_mem(b)) {
b->mem.pos += sent;
}
if (nxt_buf_is_file(b)) {
b->file_pos += sent;
}
break;
}
/* b->mem.free is NULL in file-only buffer. */
b->mem.pos = b->mem.free;
if (nxt_buf_is_file(b)) {
b->file_pos = b->file_end;
}
sent -= size;
}
}
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
b = b->next;
}
return b;
}
nxt_buf_t *
nxt_sendbuf_completion0(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{ {
while (b != NULL) { while (b != NULL) {

View File

@@ -123,8 +123,6 @@ ssize_t nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm,
nxt_buf_t *nxt_sendbuf_update(nxt_buf_t *b, size_t sent); nxt_buf_t *nxt_sendbuf_update(nxt_buf_t *b, size_t sent);
nxt_buf_t *nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq,
nxt_buf_t *b, size_t sent, nxt_bool_t mmap_mode);
nxt_buf_t *nxt_sendbuf_completion0(nxt_task_t *task, nxt_work_queue_t *wq,
nxt_buf_t *b); nxt_buf_t *b);