Using disk file to store large request body.

This closes #386 on GitHub.
This commit is contained in:
Max Romanov
2020-03-12 17:54:29 +03:00
parent 08b65721e2
commit 5296be0b82
18 changed files with 455 additions and 36 deletions

View File

@@ -817,12 +817,16 @@ nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, uintptr_t data)
static void
nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
{
size_t size, body_length;
size_t size, body_length, body_buffer_size, body_rest;
ssize_t res;
nxt_str_t *tmp_path, tmp_name;
nxt_buf_t *in, *b;
nxt_conn_t *c;
nxt_h1proto_t *h1p;
nxt_http_status_t status;
static const nxt_str_t tmp_name_pattern = nxt_string("/req-XXXXXXXX");
h1p = r->proto.h1;
nxt_debug(task, "h1p request body read %O te:%d",
@@ -849,36 +853,95 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
body_length = (size_t) r->content_length_n;
b = r->body;
body_buffer_size = nxt_min(r->conf->socket_conf->body_buffer_size,
body_length);
if (body_length > body_buffer_size) {
tmp_path = &r->conf->socket_conf->body_temp_path;
tmp_name.length = tmp_path->length + tmp_name_pattern.length;
b = nxt_buf_file_alloc(r->mem_pool,
body_buffer_size + sizeof(nxt_file_t)
+ tmp_name.length + 1, 0);
} else {
/* This initialization required for CentOS 6, gcc 4.4.7. */
tmp_path = NULL;
tmp_name.length = 0;
b = nxt_buf_mem_alloc(r->mem_pool, body_buffer_size, 0);
}
if (nxt_slow_path(b == NULL)) {
status = NXT_HTTP_INTERNAL_SERVER_ERROR;
goto error;
}
r->body = b;
if (body_length > body_buffer_size) {
tmp_name.start = nxt_pointer_to(b->mem.start, sizeof(nxt_file_t));
memcpy(tmp_name.start, tmp_path->start, tmp_path->length);
memcpy(tmp_name.start + tmp_path->length, tmp_name_pattern.start,
tmp_name_pattern.length);
tmp_name.start[tmp_name.length] = '\0';
b->file = (nxt_file_t *) b->mem.start;
nxt_memzero(b->file, sizeof(nxt_file_t));
b->file->fd = -1;
b->file->size = body_length;
b->mem.start += sizeof(nxt_file_t) + tmp_name.length + 1;
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
b->file->fd = mkstemp((char *) tmp_name.start);
if (nxt_slow_path(b->file->fd == -1)) {
nxt_log(task, NXT_LOG_ERR, "mkstemp() failed %E", nxt_errno);
if (b == NULL) {
b = nxt_buf_mem_alloc(r->mem_pool, body_length, 0);
if (nxt_slow_path(b == NULL)) {
status = NXT_HTTP_INTERNAL_SERVER_ERROR;
goto error;
}
r->body = b;
nxt_debug(task, "create body tmp file \"%V\", %d",
&tmp_name, b->file->fd);
unlink((char *) tmp_name.start);
}
body_rest = body_length;
in = h1p->conn->read;
size = nxt_buf_mem_used_size(&in->mem);
if (size != 0) {
if (size > body_length) {
size = body_length;
size = nxt_min(size, body_length);
if (nxt_buf_is_file(b)) {
res = nxt_fd_write(b->file->fd, in->mem.pos, size);
if (nxt_slow_path(res < (ssize_t) size)) {
status = NXT_HTTP_INTERNAL_SERVER_ERROR;
goto error;
}
b->file_end += size;
} else {
size = nxt_min(body_buffer_size, size);
b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size);
body_buffer_size -= size;
}
b->mem.free = nxt_cpymem(b->mem.free, in->mem.pos, size);
in->mem.pos += size;
body_rest -= size;
}
size = nxt_buf_mem_free_size(&b->mem);
nxt_debug(task, "h1p body rest: %uz", body_rest);
nxt_debug(task, "h1p body rest: %uz", size);
if (size != 0) {
if (body_rest != 0) {
in->next = h1p->buffers;
h1p->buffers = in;
h1p->nbuffers++;
@@ -891,6 +954,13 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
return;
}
if (nxt_buf_is_file(b)) {
b->mem.start = NULL;
b->mem.end = NULL;
b->mem.pos = NULL;
b->mem.free = NULL;
}
ready:
r->state->ready_handler(task, r, NULL);
@@ -922,7 +992,9 @@ static const nxt_conn_state_t nxt_h1p_read_body_state
static void
nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data)
{
size_t size;
size_t size, body_rest;
ssize_t res;
nxt_buf_t *b;
nxt_conn_t *c;
nxt_h1proto_t *h1p;
nxt_http_request_t *r;
@@ -933,18 +1005,59 @@ nxt_h1p_conn_request_body_read(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "h1p conn request body read");
size = nxt_buf_mem_free_size(&c->read->mem);
nxt_debug(task, "h1p body rest: %uz", size);
r = h1p->request;
engine = task->thread->engine;
if (size != 0) {
b = c->read;
if (nxt_buf_is_file(b)) {
body_rest = b->file->size - b->file_end;
size = nxt_buf_mem_used_size(&b->mem);
size = nxt_min(size, body_rest);
res = nxt_fd_write(b->file->fd, b->mem.pos, size);
if (nxt_slow_path(res < (ssize_t) size)) {
nxt_h1p_request_error(task, h1p, r);
return;
}
b->file_end += size;
body_rest -= res;
b->mem.pos += size;
if (b->mem.pos == b->mem.free) {
if (body_rest >= (size_t) nxt_buf_mem_size(&b->mem)) {
b->mem.free = b->mem.start;
} else {
/* This required to avoid reading next request. */
b->mem.free = b->mem.end - body_rest;
}
b->mem.pos = b->mem.free;
}
} else {
body_rest = nxt_buf_mem_free_size(&c->read->mem);
}
nxt_debug(task, "h1p body rest: %uz", body_rest);
if (body_rest != 0) {
nxt_conn_read(engine, c);
} else {
if (nxt_buf_is_file(b)) {
b->mem.start = NULL;
b->mem.end = NULL;
b->mem.pos = NULL;
b->mem.free = NULL;
}
c->read = NULL;
r = h1p->request;
r->state->ready_handler(task, r, NULL);
}
@@ -2140,7 +2253,13 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
c->write_state = &nxt_h1p_peer_header_send_state;
if (r->body != NULL) {
body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
if (nxt_buf_is_file(r->body)) {
body = nxt_buf_file_alloc(r->mem_pool, 0, 0);
} else {
body = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
}
if (nxt_slow_path(body == NULL)) {
r->state->error_handler(task, r, peer);
return;
@@ -2148,8 +2267,15 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
header->next = body;
body->mem = r->body->mem;
size += nxt_buf_mem_used_size(&body->mem);
if (nxt_buf_is_file(r->body)) {
body->file = r->body->file;
body->file_end = r->body->file_end;
} else {
body->mem = r->body->mem;
}
size += nxt_buf_used_size(body);
// nxt_mp_retain(r->mem_pool);
}
@@ -2205,13 +2331,13 @@ nxt_h1p_peer_header_sent(nxt_task_t *task, void *obj, void *data)
c->write = nxt_sendbuf_completion(task, &engine->fast_work_queue, c->write);
if (c->write == NULL) {
r = peer->request;
r->state->ready_handler(task, r, peer);
if (c->write != NULL) {
nxt_conn_write(engine, c);
return;
}
nxt_conn_write(engine, c);
r = peer->request;
r->state->ready_handler(task, r, peer);
}