Implicit port read buffer completion.
To disable implicit completion, handler should reset msg->buf field.
This commit is contained in:
@@ -141,8 +141,6 @@ nxt_port_master_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
b = msg->buf;
|
b = msg->buf;
|
||||||
|
|
||||||
nxt_debug(task, "master data: %*s", b->mem.free - b->mem.pos, b->mem.pos);
|
nxt_debug(task, "master data: %*s", b->mem.free - b->mem.pos, b->mem.pos);
|
||||||
|
|
||||||
b->mem.pos = b->mem.free;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -175,7 +173,6 @@ nxt_port_master_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
start += app_conf.name.length + 1;
|
start += app_conf.name.length + 1;
|
||||||
|
|
||||||
conf = nxt_conf_json_parse(mp, start, b->mem.free);
|
conf = nxt_conf_json_parse(mp, start, b->mem.free);
|
||||||
b->mem.pos = b->mem.free;
|
|
||||||
|
|
||||||
if (conf == NULL) {
|
if (conf == NULL) {
|
||||||
nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
|
nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
|
||||||
|
|||||||
@@ -225,7 +225,6 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
rt = task->thread->runtime;
|
rt = task->thread->runtime;
|
||||||
|
|
||||||
new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
|
new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
|
||||||
msg->buf->mem.pos = msg->buf->mem.free;
|
|
||||||
|
|
||||||
nxt_debug(task, "new port %d received for process %PI:%d",
|
nxt_debug(task, "new port %d received for process %PI:%d",
|
||||||
msg->fd, new_port_msg->pid, new_port_msg->id);
|
msg->fd, new_port_msg->pid, new_port_msg->id);
|
||||||
@@ -405,8 +404,6 @@ nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
|
nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
|
||||||
|
|
||||||
b->mem.pos = b->mem.free;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -415,7 +415,11 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
nxt_port_read_msg_process(task, port, &msg);
|
nxt_port_read_msg_process(task, port, &msg);
|
||||||
|
|
||||||
if (b->mem.pos == b->mem.free) {
|
/*
|
||||||
|
* To disable instant completion or buffer re-usage,
|
||||||
|
* handler should reset 'msg.buf'.
|
||||||
|
*/
|
||||||
|
if (msg.buf == b) {
|
||||||
nxt_port_buf_free(port, b);
|
nxt_port_buf_free(port, b);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -469,14 +473,23 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
|
|||||||
port->handler(task, msg);
|
port->handler(task, msg);
|
||||||
|
|
||||||
if (msg->port_msg.mmap && orig_b != b) {
|
if (msg->port_msg.mmap && orig_b != b) {
|
||||||
/* complete used mmap buffers */
|
|
||||||
for (; b && nxt_buf_used_size(b) == 0;
|
|
||||||
b = b->next) {
|
|
||||||
nxt_debug(task, "complete buffer %p", b);
|
|
||||||
|
|
||||||
nxt_work_queue_add(port->socket.read_work_queue,
|
/*
|
||||||
b->completion_handler, task, b, b->parent);
|
* To disable instant buffer completion,
|
||||||
|
* handler should reset 'msg->buf'.
|
||||||
|
*/
|
||||||
|
if (msg->buf == b) {
|
||||||
|
/* complete mmap buffers */
|
||||||
|
for (; b != NULL; b = b->next) {
|
||||||
|
nxt_debug(task, "complete buffer %p", b);
|
||||||
|
|
||||||
|
nxt_work_queue_add(port->socket.read_work_queue,
|
||||||
|
b->completion_handler, task, b, b->parent);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* restore original buf */
|
||||||
|
msg->buf = orig_b;
|
||||||
}
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -333,8 +333,6 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
|
|
||||||
ret = nxt_router_conf_new(task, tmcf, b->mem.pos, b->mem.free);
|
ret = nxt_router_conf_new(task, tmcf, b->mem.pos, b->mem.free);
|
||||||
|
|
||||||
b->mem.pos = b->mem.free;
|
|
||||||
|
|
||||||
if (ret == NXT_OK) {
|
if (ret == NXT_OK) {
|
||||||
nxt_router_conf_success(task, tmcf);
|
nxt_router_conf_success(task, tmcf);
|
||||||
return;
|
return;
|
||||||
@@ -1735,7 +1733,7 @@ static void
|
|||||||
nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||||
{
|
{
|
||||||
size_t dump_size;
|
size_t dump_size;
|
||||||
nxt_buf_t *b, *i, *last;
|
nxt_buf_t *b, *last;
|
||||||
nxt_conn_t *c;
|
nxt_conn_t *c;
|
||||||
nxt_req_conn_link_t *rc;
|
nxt_req_conn_link_t *rc;
|
||||||
nxt_event_engine_t *engine;
|
nxt_event_engine_t *engine;
|
||||||
@@ -1748,11 +1746,6 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
|
|
||||||
nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
|
nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
|
||||||
|
|
||||||
/* Mark buffers as read. */
|
|
||||||
for (i = b; i != NULL; i = i->next) {
|
|
||||||
i->mem.pos = i->mem.free;
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1793,6 +1786,9 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Disable instant buffer completion/re-using by port. */
|
||||||
|
msg->buf = NULL;
|
||||||
|
|
||||||
if (c->write == NULL) {
|
if (c->write == NULL) {
|
||||||
c->write = b;
|
c->write = b;
|
||||||
c->write_state = &nxt_router_conn_write_state;
|
c->write_state = &nxt_router_conn_write_state;
|
||||||
|
|||||||
Reference in New Issue
Block a user