Introduced chained buffer completion handlers.

This commit is contained in:
Igor Sysoev
2019-11-14 16:39:48 +03:00
parent 643c433f8e
commit 57e326b411
9 changed files with 115 additions and 33 deletions

View File

@@ -195,7 +195,7 @@ static void
nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
nxt_buf_t *b, *parent;
nxt_buf_t *b, *next, *parent;
b = obj;
parent = data;
@@ -204,10 +204,17 @@ nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_assert(data == b->parent);
do {
next = b->next;
parent = b->parent;
mp = b->data;
nxt_mp_free(mp, b);
nxt_buf_parent_completion(task, parent);
b = next;
} while (b != NULL);
}
@@ -262,7 +269,7 @@ static void
nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
nxt_buf_t *b, *parent;
nxt_buf_t *b, *next, *parent;
b = obj;
parent = data;
@@ -275,11 +282,18 @@ nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
nxt_assert(data == b->parent);
do {
next = b->next;
parent = b->parent;
mp = b->data;
nxt_mp_free(mp, b);
nxt_mp_release(mp);
nxt_buf_parent_completion(task, parent);
b = next;
} while (b != NULL);
}

View File

@@ -721,7 +721,7 @@ void
nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_event_engine_t *engine;
nxt_buf_t *b, *parent;
nxt_buf_t *b, *next, *parent;
b = obj;
parent = data;
@@ -729,9 +729,17 @@ nxt_event_engine_buf_mem_completion(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "buf completion: %p %p", b, b->mem.start);
engine = b->data;
do {
next = b->next;
parent = b->parent;
nxt_event_engine_buf_mem_free(engine, b);
nxt_buf_parent_completion(task, parent);
b = next;
} while (b != NULL);
}

View File

@@ -1219,6 +1219,7 @@ nxt_h1p_complete_buffers(nxt_task_t *task, nxt_h1proto_t *h1p)
while (b != NULL) {
next = b->next;
b->next = NULL;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);

View File

@@ -697,6 +697,7 @@ nxt_h1p_conn_ws_pong(nxt_task_t *task, void *obj, void *data)
for (i = 0; i < payload_len; i++) {
while (nxt_buf_mem_used_size(&b->mem) == 0) {
next = b->next;
b->next = NULL;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);

View File

@@ -483,15 +483,20 @@ nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size)
static void
nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_buf_t *b, *next;
nxt_http_request_t *r;
b = obj;
r = data;
nxt_mp_free(r->mem_pool, b);
do {
next = b->next;
nxt_mp_free(r->mem_pool, b);
nxt_mp_release(r->mem_pool);
b = next;
} while (b != NULL);
}

View File

@@ -88,6 +88,7 @@ nxt_http_websocket_client(nxt_task_t *task, void *obj, void *data)
frame_size -= copy_size;
next = b->next;
b->next = NULL;
if (nxt_buf_mem_used_size(&b->mem) == 0) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,

View File

@@ -479,6 +479,7 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
size_t sent, nxt_bool_t mmap_mode)
{
size_t size;
nxt_buf_t *next;
while (b != NULL) {
@@ -528,7 +529,9 @@ nxt_port_buf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
b = b->next;
next = b->next;
b->next = NULL;
b = next;
}
return b;
@@ -796,7 +799,7 @@ static void
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_port_recv_msg_t *msg)
{
nxt_buf_t *b, *orig_b;
nxt_buf_t *b, *orig_b, *next;
nxt_port_recv_msg_t *fmsg;
if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
@@ -915,11 +918,15 @@ fmsg_failed:
*/
if (msg->buf == b) {
/* complete mmap buffers */
for (; b != NULL; b = b->next) {
while (b != NULL) {
nxt_debug(task, "complete buffer %p", b);
nxt_work_queue_add(port->socket.read_work_queue,
b->completion_handler, task, b, b->parent);
next = b->next;
b->next = NULL;
b = next;
}
}
@@ -964,7 +971,7 @@ static void
nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
{
int use_delta;
nxt_buf_t *b;
nxt_buf_t *b, *next;
nxt_port_t *port;
nxt_work_queue_t *wq;
nxt_port_send_msg_t *msg;
@@ -986,7 +993,10 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
for (b = msg->buf; b != NULL; b = b->next) {
for (b = msg->buf; b != NULL; b = next) {
next = b->next;
b->next = NULL;
if (nxt_buf_is_sync(b)) {
continue;
}

View File

@@ -537,6 +537,7 @@ nxt_router_msg_cancel(nxt_task_t *task, nxt_msg_info_t *msg_info,
for (b = msg_info->buf; b != NULL; b = next) {
next = b->next;
b->next = NULL;
b->completion_handler = msg_info->completion_handler;
@@ -3498,7 +3499,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_int_t ret;
nxt_buf_t *b;
nxt_buf_t *b, *next;
nxt_port_t *app_port;
nxt_unit_field_t *f;
nxt_http_field_t *field;
@@ -3613,10 +3614,13 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
}
if (nxt_buf_mem_used_size(&b->mem) == 0) {
next = b->next;
b->next = NULL;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
b->completion_handler, task, b, b->parent);
b = b->next;
b = next;
}
if (b != NULL) {
@@ -5057,6 +5061,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
if (nxt_slow_path(buf == NULL)) {
while (out != NULL) {
buf = out->next;
out->next = NULL;
out->completion_handler(task, out, out->parent);
out = buf;
}

View File

@@ -9,6 +9,8 @@
static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
size_t *copied);
static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task,
nxt_work_queue_t *wq, nxt_buf_t *start);
nxt_uint_t
@@ -380,15 +382,11 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
while (b != NULL) {
nxt_prefetch(b->next);
if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
break;
}
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
b = b->next;
b = nxt_sendbuf_coalesce_completion(task, wq, b);
}
return b;
@@ -399,10 +397,49 @@ void
nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
while (b != NULL) {
nxt_prefetch(b->next);
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
b = b->next;
b = nxt_sendbuf_coalesce_completion(task, wq, b);
}
}
static nxt_buf_t *
nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq,
nxt_buf_t *start)
{
nxt_buf_t *b, *next, **last, *rest, **last_rest;
nxt_work_handler_t handler;
rest = NULL;
last_rest = &rest;
last = &start->next;
b = start;
handler = b->completion_handler;
for ( ;; ) {
next = b->next;
if (next == NULL) {
break;
}
b->next = NULL;
b = next;
if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
*last_rest = b;
break;
}
if (handler == b->completion_handler) {
*last = b;
last = &b->next;
} else {
*last_rest = b;
last_rest = &b->next;
}
}
nxt_work_queue_add(wq, handler, task, start, start->parent);
return rest;
}