Fixing multi-buffer body send to application.

Application shared queue only capable to pass one shared memory buffer.
The rest buffers in chain needs to be send directly to application in response
to REQ_HEADERS_AC message.

The issue can be reproduced for configurations where 'body_buffer_size' is
greater than memory segment size (10 Mb).  Requests with body size greater
than 10 Mb are just `stuck` e.g. not passed to application awaiting for more
data from router.

The bug was introduced in 1d84b9e4b459 (v1.19.0).
This commit is contained in:
Max Romanov
2020-11-10 22:27:08 +03:00
parent 5fd2933d2e
commit 896d8e8bfb
2 changed files with 20 additions and 6 deletions

View File

@@ -3912,6 +3912,7 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
{
int res;
nxt_app_t *app;
nxt_buf_t *b;
nxt_bool_t start_process, unlinked;
nxt_port_t *app_port, *main_app_port, *idle_port;
nxt_queue_link_t *idle_lnk;
@@ -4009,16 +4010,25 @@ nxt_router_req_headers_ack_handler(nxt_task_t *task,
req_rpc_data->app_port = app_port;
if (req_rpc_data->msg_info.body_fd != -1) {
b = req_rpc_data->msg_info.buf;
if (b != NULL) {
/* First buffer is already sent. Start from second. */
b = b->next;
}
if (req_rpc_data->msg_info.body_fd != -1 || b != NULL) {
nxt_debug(task, "stream #%uD: send body fd %d", req_rpc_data->stream,
req_rpc_data->msg_info.body_fd);
lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
if (req_rpc_data->msg_info.body_fd != -1) {
lseek(req_rpc_data->msg_info.body_fd, 0, SEEK_SET);
}
res = nxt_port_socket_write(task, app_port, NXT_PORT_MSG_REQ_BODY,
req_rpc_data->msg_info.body_fd,
req_rpc_data->stream,
task->thread->engine->port->id, NULL);
task->thread->engine->port->id, b);
if (nxt_slow_path(res != NXT_OK)) {
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);

View File

@@ -1357,8 +1357,14 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
if (recv_msg->incoming_buf != NULL) {
b = nxt_container_of(req->content_buf, nxt_unit_mmap_buf_t, buf);
while (b->next != NULL) {
b = b->next;
}
/* "Move" incoming buffer list to req_impl. */
nxt_unit_mmap_buf_insert_tail(&b->next, recv_msg->incoming_buf);
b->next = recv_msg->incoming_buf;
b->next->prev = &b->next;
recv_msg->incoming_buf = NULL;
}
@@ -2988,8 +2994,6 @@ nxt_unit_request_read(nxt_unit_request_info_t *req, void *dst, size_t size)
buf_res = nxt_unit_buf_read(&req->content_buf, &req->content_length,
dst, size);
nxt_unit_req_debug(req, "read: %d", (int) buf_res);
if (buf_res < (ssize_t) size && req->content_fd != -1) {
res = read(req->content_fd, dst, size);
if (nxt_slow_path(res < 0)) {