Request body read state implemented.

With specific timeout and buffer size settings.
This commit is contained in:
Max Romanov
2017-08-11 18:04:04 +03:00
parent e1e808bd94
commit 39a6a4c973
15 changed files with 625 additions and 269 deletions

View File

@@ -17,14 +17,14 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
} }
int int
nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len) nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len)
{ {
return -1; return -1;
} }
int int
nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
size_t dst_len, void *src, size_t src_len) void *src, size_t src_len)
{ {
return -1; return -1;
} }
@@ -71,7 +71,7 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
return 0; return 0;
} }
nxt_go_debug("write: %d %.*s", (int) len, (int) len, (char *) buf); nxt_go_debug("write: %d", (int) len);
ctx = (nxt_go_run_ctx_t *) r; ctx = (nxt_go_run_ctx_t *) r;
rc = nxt_go_ctx_write(ctx, buf, len); rc = nxt_go_ctx_write(ctx, buf, len);
@@ -81,44 +81,30 @@ nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len)
int int
nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, size_t dst_len) nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len)
{ {
nxt_go_msg_t *msg; size_t res;
nxt_go_run_ctx_t *ctx; nxt_go_run_ctx_t *ctx;
nxt_app_request_body_t *b;
nxt_app_request_header_t *h;
if (nxt_slow_path(r == 0)) { if (nxt_slow_path(r == 0)) {
return 0; return 0;
} }
ctx = (nxt_go_run_ctx_t *) r; ctx = (nxt_go_run_ctx_t *) r;
b = &ctx->r.body;
h = &ctx->r.header;
if (off >= h->parsed_content_length) { dst_len = nxt_min(dst_len, ctx->r.body.preread_size);
return 0;
}
if (off < b->preread.length) { res = nxt_go_ctx_read_raw(ctx, dst, dst_len);
dst_len = nxt_min(b->preread.length - off, dst_len);
if (dst_len != 0) { ctx->r.body.preread_size -= res;
nxt_memcpy(dst, b->preread.start + off, dst_len);
}
return dst_len; return res;
}
/* TODO find msg to read */
return NXT_AGAIN;
} }
int int
nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
size_t dst_len, void *src, size_t src_len) void *src, size_t src_len)
{ {
nxt_go_run_ctx_t *ctx; nxt_go_run_ctx_t *ctx;
@@ -130,7 +116,7 @@ nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst,
nxt_go_ctx_add_msg(ctx, src, src_len); nxt_go_ctx_add_msg(ctx, src, src_len);
return nxt_go_request_read(r, off, dst, dst_len); return nxt_go_request_read(r, dst, dst_len);
} }

View File

@@ -21,11 +21,10 @@ typedef uintptr_t nxt_go_request_t;
int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len); int nxt_go_response_write(nxt_go_request_t r, void *buf, size_t len);
int nxt_go_request_read(nxt_go_request_t r, off_t off, void *dst, int nxt_go_request_read(nxt_go_request_t r, void *dst, size_t dst_len);
size_t dst_len);
int nxt_go_request_read_from(nxt_go_request_t r, off_t off, void *dst, int nxt_go_request_read_from(nxt_go_request_t r, void *dst, size_t dst_len,
size_t dst_len, void *src, size_t src_len); void *src, size_t src_len);
int nxt_go_request_close(nxt_go_request_t r); int nxt_go_request_close(nxt_go_request_t r);

View File

@@ -83,22 +83,23 @@ nxt_go_data_handler(nxt_port_msg_t *port_msg, size_t size)
do { do {
rc = nxt_go_ctx_read_str(ctx, &n); rc = nxt_go_ctx_read_str(ctx, &n);
rc = nxt_go_ctx_read_str(ctx, &v);
if (n.length == 0) { if (n.length == 0) {
break; break;
} }
rc = nxt_go_ctx_read_str(ctx, &v);
nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v)); nxt_go_request_add_header(r, nxt_go_str(&n), nxt_go_str(&v));
} while(1); } while(1);
ctx->r.body.preread = v; nxt_go_ctx_read_size(ctx, &s);
ctx->r.body.preread_size = s;
if (h->parsed_content_length > 0) { if (h->parsed_content_length > 0) {
nxt_go_request_set_content_length(r, h->parsed_content_length); nxt_go_request_set_content_length(r, h->parsed_content_length);
} }
if (v.length < h->parsed_content_length) { if (ctx->r.body.preread_size < h->parsed_content_length) {
nxt_go_request_create_channel(r); nxt_go_request_create_channel(r);
} }

View File

@@ -186,7 +186,7 @@ nxt_go_ctx_add_msg(nxt_go_run_ctx_t *ctx, nxt_port_msg_t *port_msg, size_t size)
msg->start_offset = ctx->msg_last->start_offset; msg->start_offset = ctx->msg_last->start_offset;
if (ctx->msg_last == &ctx->msg) { if (ctx->msg_last == &ctx->msg) {
msg->start_offset += ctx->r.body.preread.length; msg->start_offset += ctx->r.body.preread_size;
} else { } else {
msg->start_offset += ctx->msg_last->data_size; msg->start_offset += ctx->msg_last->data_size;
} }
@@ -227,8 +227,8 @@ nxt_go_ctx_flush(nxt_go_run_ctx_t *ctx, int last)
} }
nxt_int_t nxt_buf_t *
nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len) nxt_go_port_mmap_get_buf(nxt_go_run_ctx_t *ctx, size_t size)
{ {
size_t nchunks; size_t nchunks;
nxt_buf_t *buf; nxt_buf_t *buf;
@@ -237,30 +237,16 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
nxt_port_mmap_msg_t *mmap_msg; nxt_port_mmap_msg_t *mmap_msg;
nxt_port_mmap_header_t *hdr; nxt_port_mmap_header_t *hdr;
buf = &ctx->wbuf;
if (ctx->nwbuf > 0 && nxt_buf_mem_free_size(&buf->mem) >= len) {
memcpy(buf->mem.free, data, len);
buf->mem.free += len;
mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
mmap_msg->size += len;
return NXT_OK;
}
if (ctx->nwbuf >= 8) {
nxt_go_ctx_flush(ctx, 0);
}
c = 0; c = 0;
buf = &ctx->wbuf;
hdr = nxt_go_port_mmap_get(ctx->process, hdr = nxt_go_port_mmap_get(ctx->process,
ctx->msg.port_msg->reply_port, &c); ctx->msg.port_msg->reply_port, &c);
if (nxt_slow_path(hdr == NULL)) { if (nxt_slow_path(hdr == NULL)) {
nxt_go_warn("failed to get port_mmap"); nxt_go_warn("failed to get port_mmap");
return NXT_ERROR; return NULL;
} }
buf->mem.start = nxt_port_mmap_chunk_start(hdr, c); buf->mem.start = nxt_port_mmap_chunk_start(hdr, c);
@@ -268,12 +254,15 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
buf->mem.free = buf->mem.start; buf->mem.free = buf->mem.start;
buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE; buf->mem.end = buf->mem.start + PORT_MMAP_CHUNK_SIZE;
buf->parent = hdr;
mmap_msg = ctx->wmmap_msg + ctx->nwbuf; mmap_msg = ctx->wmmap_msg + ctx->nwbuf;
mmap_msg->mmap_id = hdr->id; mmap_msg->mmap_id = hdr->id;
mmap_msg->chunk_id = c; mmap_msg->chunk_id = c;
mmap_msg->size = 0;
nchunks = len / PORT_MMAP_CHUNK_SIZE; nchunks = size / PORT_MMAP_CHUNK_SIZE;
if ((len % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
nchunks++; nchunks++;
} }
@@ -283,27 +272,124 @@ nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
/* Try to acquire as much chunks as required. */ /* Try to acquire as much chunks as required. */
while (nchunks > 0) { while (nchunks > 0) {
if (nxt_port_mmap_get_chunk_busy(hdr, c)) { if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
break; break;
} }
nxt_port_mmap_set_chunk_busy(hdr, c);
buf->mem.end += PORT_MMAP_CHUNK_SIZE; buf->mem.end += PORT_MMAP_CHUNK_SIZE;
c++; c++;
nchunks--; nchunks--;
} }
if (nxt_buf_mem_free_size(&buf->mem) < len) { ctx->nwbuf++;
len = nxt_buf_mem_free_size(&buf->mem);
return buf;
}
nxt_int_t
nxt_go_port_mmap_increase_buf(nxt_buf_t *b, size_t size, size_t min_size)
{
size_t nchunks, free_size;
nxt_chunk_id_t c, start;
nxt_port_mmap_header_t *hdr;
free_size = nxt_buf_mem_free_size(&b->mem);
if (nxt_slow_path(size <= free_size)) {
return NXT_OK;
} }
memcpy(buf->mem.free, data, len); hdr = b->parent;
buf->mem.free += len;
mmap_msg->size = len; start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
ctx->nwbuf++; size -= free_size;
nchunks = size / PORT_MMAP_CHUNK_SIZE;
if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
nchunks++;
}
c = start;
/* Try to acquire as much chunks as required. */
while (nchunks > 0) {
if (nxt_port_mmap_chk_set_chunk_busy(hdr, c) == 0) {
break;
}
c++;
nchunks--;
}
if (nchunks != 0 &&
min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) {
c--;
while (c >= start) {
nxt_port_mmap_set_chunk_free(hdr, c);
c--;
}
return NXT_ERROR;
} else {
b->mem.end += PORT_MMAP_CHUNK_SIZE * (c - start);
return NXT_OK;
}
}
nxt_int_t
nxt_go_ctx_write(nxt_go_run_ctx_t *ctx, void *data, size_t len)
{
size_t free_size, copy_size;
nxt_buf_t *buf;
nxt_port_mmap_msg_t *mmap_msg;
buf = &ctx->wbuf;
while (len > 0) {
if (ctx->nwbuf == 0) {
buf = nxt_go_port_mmap_get_buf(ctx, len);
if (nxt_slow_path(buf == NULL)) {
return NXT_ERROR;
}
}
do {
free_size = nxt_buf_mem_free_size(&buf->mem);
if (free_size > 0) {
copy_size = nxt_min(free_size, len);
buf->mem.free = nxt_cpymem(buf->mem.free, data, copy_size);
mmap_msg = ctx->wmmap_msg + ctx->nwbuf - 1;
mmap_msg->size += copy_size;
len -= copy_size;
data = nxt_pointer_to(data, copy_size);
if (len == 0) {
return NXT_OK;
}
}
} while (nxt_go_port_mmap_increase_buf(buf, len, 1) == NXT_OK);
if (ctx->nwbuf >= 8) {
nxt_go_ctx_flush(ctx, 0);
}
buf = nxt_go_port_mmap_get_buf(ctx, len);
if (nxt_slow_path(buf == NULL)) {
return NXT_ERROR;
}
}
return NXT_OK; return NXT_OK;
} }
@@ -403,4 +489,43 @@ nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str)
} }
size_t
nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size)
{
size_t res, read_size;
nxt_int_t rc;
nxt_buf_t *buf;
res = 0;
while (size > 0) {
buf = &ctx->rbuf;
if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
ctx->nrbuf++;
rc = nxt_go_ctx_init_rbuf(ctx);
if (nxt_slow_path(rc != NXT_OK)) {
nxt_go_warn("read raw: init rbuf failed");
return res;
}
continue;
}
read_size = nxt_buf_mem_used_size(&buf->mem);
read_size = nxt_min(read_size, size);
dst = nxt_cpymem(dst, buf->mem.pos, read_size);
size -= read_size;
buf->mem.pos += read_size;
res += read_size;
}
nxt_go_debug("read_raw: %d", (int) res);
return res;
}
#endif /* NXT_CONFIGURE */ #endif /* NXT_CONFIGURE */

View File

@@ -68,5 +68,7 @@ nxt_int_t nxt_go_ctx_read_size(nxt_go_run_ctx_t *ctx, size_t *size);
nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str); nxt_int_t nxt_go_ctx_read_str(nxt_go_run_ctx_t *ctx, nxt_str_t *str);
size_t nxt_go_ctx_read_raw(nxt_go_run_ctx_t *ctx, void *dst, size_t size);
#endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */ #endif /* _NXT_GO_RUN_CTX_H_INCLUDED_ */

View File

@@ -21,7 +21,6 @@ type request struct {
resp *response resp *response
c_req C.nxt_go_request_t c_req C.nxt_go_request_t
id C.uint32_t id C.uint32_t
read_pos C.off_t
msgs []*cmsg msgs []*cmsg
ch chan *cmsg ch chan *cmsg
} }
@@ -29,18 +28,17 @@ type request struct {
func (r *request) Read(p []byte) (n int, err error) { func (r *request) Read(p []byte) (n int, err error) {
c := C.size_t(cap(p)) c := C.size_t(cap(p))
b := C.malloc(c) b := C.malloc(c)
res := C.nxt_go_request_read(r.c_req, r.read_pos, b, c) res := C.nxt_go_request_read(r.c_req, b, c)
if res == -2 /* NXT_AGAIN */ { if res == -2 /* NXT_AGAIN */ {
m := <-r.ch m := <-r.ch
res = C.nxt_go_request_read_from(r.c_req, r.read_pos, b, c, m.buf.b, m.buf.s) res = C.nxt_go_request_read_from(r.c_req, b, c, m.buf.b, m.buf.s)
r.push(m) r.push(m)
} }
if res > 0 { if res > 0 {
copy(p, C.GoBytes(b, res)) copy(p, C.GoBytes(b, res))
r.read_pos += C.off_t(res)
} }
C.free(b) C.free(b)

View File

@@ -154,7 +154,7 @@ nxt_app_msg_write_get_buf(nxt_task_t *task, nxt_app_wmsg_t *msg, size_t size)
return res; return res;
} }
if (nxt_port_mmap_increase_buf(task, b, size) == NXT_OK) { if (nxt_port_mmap_increase_buf(task, b, size, size) == NXT_OK) {
res = b->mem.free; res = b->mem.free;
b->mem.free += size; b->mem.free += size;
@@ -306,6 +306,43 @@ nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_str_t *str)
} }
size_t
nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *dst,
size_t size)
{
size_t res, read_size;
nxt_buf_t *buf;
res = 0;
while (size > 0) {
buf = msg->buf;
if (nxt_slow_path(buf == NULL)) {
break;
}
if (nxt_slow_path(nxt_buf_mem_used_size(&buf->mem) == 0)) {
msg->buf = buf->next;
continue;
}
read_size = nxt_buf_mem_used_size(&buf->mem);
read_size = nxt_min(read_size, size);
dst = nxt_cpymem(dst, buf->mem.pos, read_size);
size -= read_size;
buf->mem.pos += read_size;
res += read_size;
}
nxt_debug(task, "nxt_read_raw: %uz", res);
return res;
}
nxt_int_t nxt_int_t
nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n, nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_str_t *n,
nxt_str_t *v) nxt_str_t *v)
@@ -459,7 +496,7 @@ nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx)
nxt_int_t nxt_int_t
nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, nxt_app_http_req_header_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
nxt_buf_t *buf) nxt_buf_t *buf)
{ {
nxt_int_t rc; nxt_int_t rc;
@@ -471,50 +508,76 @@ nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
b = &ctx->r.body; b = &ctx->r.body;
h = &ctx->r.header; h = &ctx->r.header;
if (h->done == 0) { nxt_assert(h->done == 0);
rc = nxt_http_parse_request(p, &buf->mem);
if (nxt_slow_path(rc != NXT_DONE)) { rc = nxt_http_parse_request(p, &buf->mem);
return rc;
}
rc = nxt_http_fields_process(p->fields, ctx, task->log); if (nxt_slow_path(rc != NXT_DONE)) {
return rc;
if (nxt_slow_path(rc != NXT_OK)) {
return rc;
}
h->fields = p->fields;
h->done = 1;
h->version.start = p->version.str;
h->version.length = nxt_strlen(p->version.str);
h->method = p->method;
h->target.start = p->target_start;
h->target.length = p->target_end - p->target_start;
h->path = p->path;
h->query = p->args;
if (h->parsed_content_length == 0) {
b->done = 1;
}
} }
if (b->done == 0) { rc = nxt_http_fields_process(p->fields, ctx, task->log);
b->preread.length = buf->mem.free - buf->mem.pos;
b->preread.start = buf->mem.pos;
b->done = b->preread.length >= (size_t) h->parsed_content_length; if (nxt_slow_path(rc != NXT_OK)) {
return rc;
} }
if (h->done == 1 && b->done == 1) { h->fields = p->fields;
h->done = 1;
h->version.start = p->version.str;
h->version.length = nxt_strlen(p->version.str);
h->method = p->method;
h->target.start = p->target_start;
h->target.length = p->target_end - p->target_start;
h->path = p->path;
h->query = p->args;
if (h->parsed_content_length == 0) {
b->done = 1;
}
if (buf->mem.free == buf->mem.pos) {
return NXT_DONE; return NXT_DONE;
} }
return NXT_AGAIN; b->buf = buf;
b->done = nxt_buf_mem_used_size(&buf->mem) >=
h->parsed_content_length;
if (b->done == 1) {
b->preread_size = nxt_buf_mem_used_size(&buf->mem);
}
return NXT_DONE;
}
nxt_int_t
nxt_app_http_req_body_read(nxt_task_t *task, nxt_app_parse_ctx_t *ctx,
nxt_buf_t *buf)
{
nxt_app_request_body_t *b;
nxt_app_request_header_t *h;
b = &ctx->r.body;
h = &ctx->r.header;
nxt_assert(h->done == 1);
nxt_assert(b->done == 0);
b->done = nxt_buf_mem_used_size(&buf->mem) + b->preread_size >=
(size_t) h->parsed_content_length;
if (b->done == 1) {
b->preread_size += nxt_buf_mem_used_size(&buf->mem);
}
return b->done == 1 ? NXT_DONE : NXT_AGAIN;
} }
@@ -571,18 +634,49 @@ nxt_int_t
nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c, nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg, const u_char *c,
size_t size) size_t size)
{ {
u_char *dst; size_t free_size, copy_size;
nxt_buf_t *b;
nxt_port_t *port;
dst = nxt_app_msg_write_get_buf(task, msg, size); nxt_debug(task, "nxt_app_msg_write_raw: %uz", size);
if (nxt_slow_path(dst == NULL)) {
return NXT_ERROR; while (size > 0) {
b = *msg->buf;
if (b == NULL) {
port = nxt_app_msg_get_port(task, msg);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
b = nxt_port_mmap_get_buf(task, port, size);
if (nxt_slow_path(b == NULL)) {
return NXT_ERROR;
}
*msg->buf = b;
}
do {
free_size = nxt_buf_mem_free_size(&b->mem);
if (free_size > 0) {
copy_size = nxt_min(free_size, size);
b->mem.free = nxt_cpymem(b->mem.free, c, copy_size);
size -= copy_size;
c += copy_size;
if (size == 0) {
return NXT_OK;
}
}
} while (nxt_port_mmap_increase_buf(task, b, size, 1) == NXT_OK);
msg->buf = &b->next;
} }
nxt_memcpy(dst, c, size);
nxt_debug(task, "nxt_app_msg_write_raw: %d %*s", (int)size,
(int)size, c);
return NXT_OK; return NXT_OK;
} }

View File

@@ -84,12 +84,17 @@ typedef struct {
off_t parsed_content_length; off_t parsed_content_length;
nxt_bool_t done; nxt_bool_t done;
size_t bufs;
nxt_buf_t *buf;
} nxt_app_request_header_t; } nxt_app_request_header_t;
typedef struct { typedef struct {
nxt_str_t preread; size_t preread_size;
nxt_bool_t done; nxt_bool_t done;
nxt_buf_t *buf;
} nxt_app_request_body_t; } nxt_app_request_body_t;
@@ -112,8 +117,12 @@ struct nxt_app_parse_ctx_s {
nxt_int_t nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx); nxt_int_t nxt_app_http_req_init(nxt_task_t *task, nxt_app_parse_ctx_t *ctx);
nxt_int_t nxt_app_http_req_parse(nxt_task_t *task, nxt_app_parse_ctx_t *ctx, nxt_int_t nxt_app_http_req_header_parse(nxt_task_t *task,
nxt_buf_t *buf); nxt_app_parse_ctx_t *ctx, nxt_buf_t *buf);
nxt_int_t nxt_app_http_req_body_read(nxt_task_t *task,
nxt_app_parse_ctx_t *ctx, nxt_buf_t *buf);
nxt_int_t nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx); nxt_int_t nxt_app_http_req_done(nxt_task_t *task, nxt_app_parse_ctx_t *ctx);
@@ -178,6 +187,9 @@ nxt_int_t nxt_app_msg_write_raw(nxt_task_t *task, nxt_app_wmsg_t *msg,
nxt_int_t nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg, nxt_int_t nxt_app_msg_read_str(nxt_task_t *task, nxt_app_rmsg_t *msg,
nxt_str_t *str); nxt_str_t *str);
size_t nxt_app_msg_read_raw(nxt_task_t *task, nxt_app_rmsg_t *msg, void *buf,
size_t size);
nxt_int_t nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_int_t nxt_app_msg_read_nvp(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
nxt_str_t *n, nxt_str_t *v); nxt_str_t *n, nxt_str_t *v);

View File

@@ -116,6 +116,7 @@ static nxt_int_t
nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg) nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
{ {
nxt_int_t rc; nxt_int_t rc;
nxt_buf_t *b;
nxt_http_field_t *field; nxt_http_field_t *field;
nxt_app_request_header_t *h; nxt_app_request_header_t *h;
@@ -168,7 +169,13 @@ nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r, nxt_app_wmsg_t *wmsg)
/* end-of-headers mark */ /* end-of-headers mark */
NXT_WRITE(&eof); NXT_WRITE(&eof);
NXT_WRITE(&r->body.preread);
RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
for(b = r->body.buf; b != NULL; b = b->next) {
RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
nxt_buf_mem_used_size(&b->mem)));
}
#undef NXT_WRITE #undef NXT_WRITE
#undef RC #undef RC

View File

@@ -128,6 +128,8 @@ typedef struct {
nxt_str_t script; nxt_str_t script;
nxt_app_wmsg_t *wmsg; nxt_app_wmsg_t *wmsg;
nxt_mp_t *mem_pool; nxt_mp_t *mem_pool;
size_t body_preread_size;
} nxt_php_run_ctx_t; } nxt_php_run_ctx_t;
nxt_inline nxt_int_t nxt_php_write(nxt_php_run_ctx_t *ctx, nxt_inline nxt_int_t nxt_php_write(nxt_php_run_ctx_t *ctx,
@@ -342,8 +344,6 @@ nxt_php_read_request(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
RC(nxt_app_msg_read_size(task, rmsg, &s)); RC(nxt_app_msg_read_size(task, rmsg, &s));
h->parsed_content_length = s; h->parsed_content_length = s;
NXT_READ(&ctx->r.body.preread);
#undef NXT_READ #undef NXT_READ
#undef RC #undef RC
@@ -361,6 +361,7 @@ nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg) nxt_app_wmsg_t *wmsg)
{ {
nxt_int_t rc; nxt_int_t rc;
nxt_buf_t *b;
nxt_http_field_t *field; nxt_http_field_t *field;
nxt_app_request_header_t *h; nxt_app_request_header_t *h;
@@ -413,8 +414,6 @@ nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length)); RC(nxt_app_msg_write_size(task, wmsg, h->parsed_content_length));
NXT_WRITE(&r->body.preread);
nxt_list_each(field, h->fields) { nxt_list_each(field, h->fields) {
RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
&prefix, &field->name)); &prefix, &field->name));
@@ -425,6 +424,13 @@ nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
/* end-of-headers mark */ /* end-of-headers mark */
NXT_WRITE(&eof); NXT_WRITE(&eof);
RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
for(b = r->body.buf; b != NULL; b = b->next) {
RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
nxt_buf_mem_used_size(&b->mem)));
}
#undef NXT_WRITE #undef NXT_WRITE
#undef RC #undef RC
@@ -673,23 +679,14 @@ static int
nxt_php_read_post(char *buffer, uint count_bytes TSRMLS_DC) nxt_php_read_post(char *buffer, uint count_bytes TSRMLS_DC)
#endif #endif
{ {
off_t rest; size_t size, rest;
size_t size;
/*
ssize_t n;
nxt_err_t err;
nxt_php_ctx_t *ctx;
nxt_app_request_t *r;
*/
nxt_php_run_ctx_t *ctx; nxt_php_run_ctx_t *ctx;
nxt_app_request_body_t *b;
nxt_app_request_header_t *h; nxt_app_request_header_t *h;
ctx = SG(server_context); ctx = SG(server_context);
h = &ctx->r.header; h = &ctx->r.header;
b = &ctx->r.body;
rest = h->parsed_content_length - SG(read_post_bytes); rest = (size_t) h->parsed_content_length - SG(read_post_bytes);
nxt_debug(ctx->task, "nxt_php_read_post %O", rest); nxt_debug(ctx->task, "nxt_php_read_post %O", rest);
@@ -697,43 +694,11 @@ nxt_php_read_post(char *buffer, uint count_bytes TSRMLS_DC)
return 0; return 0;
} }
size = 0; rest = nxt_min(ctx->body_preread_size, (size_t) count_bytes);
#ifdef NXT_PHP7 size = nxt_app_msg_read_raw(ctx->task, ctx->rmsg, buffer, rest);
count_bytes = (size_t) nxt_min(rest, (off_t) count_bytes);
#else
count_bytes = (uint) nxt_min(rest, (off_t) count_bytes);
#endif
if (b->preread.length != 0) { ctx->body_preread_size -= size;
size = nxt_min(b->preread.length, count_bytes);
nxt_memcpy(buffer, b->preread.start, size);
b->preread.length -= size;
b->preread.start += size;
if (size == count_bytes) {
return size;
}
}
#if 0
nxt_debug(ctx->task, "recv %z", (size_t) count_bytes - size);
n = recv(r->event_conn->socket.fd, buffer + size, count_bytes - size, 0);
if (nxt_slow_path(n <= 0)) {
err = (n == 0) ? 0 : nxt_socket_errno;
nxt_log_error(NXT_LOG_ERR, r->log, "recv(%d, %uz) failed %E",
r->event_conn->socket.fd, (size_t) count_bytes - size,
err);
return size;
}
return size + n;
#endif
return size; return size;
} }
@@ -868,6 +833,8 @@ nxt_php_register_variables(zval *track_vars_array TSRMLS_DC)
NXT_PHP_SET(n.start, v); NXT_PHP_SET(n.start, v);
} }
nxt_app_msg_read_size(task, ctx->rmsg, &ctx->body_preread_size);
#undef NXT_PHP_SET #undef NXT_PHP_SET
} }

View File

@@ -95,8 +95,6 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
b = obj; b = obj;
nxt_debug(task, "mmap buf completion: %p %p", b, b->mem.start);
mp = b->data; mp = b->data;
#if (NXT_DEBUG) #if (NXT_DEBUG)
@@ -125,6 +123,10 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_port_mmap_free_junk(p, b->mem.end - p); nxt_port_mmap_free_junk(p, b->mem.end - p);
nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), %PI,%d,%d", b,
b->mem.start, b->mem.end - b->mem.start, b->is_port_mmap_sent,
hdr->pid, hdr->id, c);
while (p < b->mem.end) { while (p < b->mem.end) {
nxt_port_mmap_set_chunk_free(hdr, c); nxt_port_mmap_set_chunk_free(hdr, c);
@@ -414,11 +416,6 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nxt_debug(task, "request %z bytes shm buffer", size); nxt_debug(task, "request %z bytes shm buffer", size);
if (nxt_slow_path(size > PORT_MMAP_DATA_SIZE)) {
nxt_debug(task, "requested size (%z bytes) too big", size);
return NULL;
}
b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0);
if (nxt_slow_path(b == NULL)) { if (nxt_slow_path(b == NULL)) {
return NULL; return NULL;
@@ -445,6 +442,10 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nchunks++; nchunks++;
} }
nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI,%d,%d", b,
b->mem.start, b->mem.end - b->mem.start,
hdr->pid, hdr->id, c);
c++; c++;
nchunks--; nchunks--;
@@ -465,9 +466,10 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nxt_int_t nxt_int_t
nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size) nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
size_t min_size)
{ {
size_t nchunks; size_t nchunks, free_size;
nxt_chunk_id_t c, start; nxt_chunk_id_t c, start;
nxt_port_mmap_header_t *hdr; nxt_port_mmap_header_t *hdr;
@@ -479,7 +481,9 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
return NXT_ERROR; return NXT_ERROR;
} }
if (nxt_slow_path(size <= (size_t) nxt_buf_mem_free_size(&b->mem))) { free_size = nxt_buf_mem_free_size(&b->mem);
if (nxt_slow_path(size <= free_size)) {
return NXT_OK; return NXT_OK;
} }
@@ -487,7 +491,7 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
start = nxt_port_mmap_chunk_id(hdr, b->mem.end); start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
size -= nxt_buf_mem_free_size(&b->mem); size -= free_size;
nchunks = size / PORT_MMAP_CHUNK_SIZE; nchunks = size / PORT_MMAP_CHUNK_SIZE;
if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) { if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
@@ -507,7 +511,9 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size)
nchunks--; nchunks--;
} }
if (nchunks != 0) { if (nchunks != 0 &&
min_size > free_size + PORT_MMAP_CHUNK_SIZE * (c - start)) {
c--; c--;
while (c >= start) { while (c >= start) {
nxt_port_mmap_set_chunk_free(hdr, c); nxt_port_mmap_set_chunk_free(hdr, c);
@@ -559,6 +565,10 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
b->parent = hdr; b->parent = hdr;
nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI,%d,%d", b,
b->mem.start, b->mem.end - b->mem.start,
hdr->pid, hdr->id, mmap_msg->chunk_id);
return b; return b;
} }

View File

@@ -28,7 +28,7 @@ nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size); nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size);
nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b,
size_t size); size_t size, size_t min_size);
nxt_port_mmap_header_t * nxt_port_mmap_header_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process, nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,

View File

@@ -57,6 +57,7 @@ typedef struct {
//nxt_app_request_t *request; //nxt_app_request_t *request;
} nxt_py_error_t; } nxt_py_error_t;
typedef struct nxt_python_run_ctx_s nxt_python_run_ctx_t;
static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf); static nxt_int_t nxt_python_init(nxt_task_t *task, nxt_common_app_conf_t *conf);
@@ -68,7 +69,7 @@ static nxt_int_t nxt_python_run(nxt_task_t *task,
static PyObject *nxt_python_create_environ(nxt_task_t *task); static PyObject *nxt_python_create_environ(nxt_task_t *task);
static PyObject *nxt_python_get_environ(nxt_task_t *task, static PyObject *nxt_python_get_environ(nxt_task_t *task,
nxt_app_rmsg_t *rmsg); nxt_app_rmsg_t *rmsg, nxt_python_run_ctx_t *ctx);
static PyObject *nxt_py_start_resp(PyObject *self, PyObject *args); static PyObject *nxt_py_start_resp(PyObject *self, PyObject *args);
@@ -77,11 +78,13 @@ static PyObject *nxt_py_input_read(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_readline(nxt_py_input_t *self, PyObject *args);
static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args); static PyObject *nxt_py_input_readlines(nxt_py_input_t *self, PyObject *args);
typedef struct { struct nxt_python_run_ctx_s {
nxt_task_t *task; nxt_task_t *task;
nxt_app_rmsg_t *rmsg; nxt_app_rmsg_t *rmsg;
nxt_app_wmsg_t *wmsg; nxt_app_wmsg_t *wmsg;
} nxt_python_run_ctx_t;
size_t body_preread_size;
};
nxt_inline nxt_int_t nxt_python_write(nxt_python_run_ctx_t *ctx, nxt_inline nxt_int_t nxt_python_write(nxt_python_run_ctx_t *ctx,
const u_char *data, size_t len, const u_char *data, size_t len,
@@ -171,8 +174,6 @@ static PyObject *nxt_py_application;
static PyObject *nxt_py_start_resp_obj; static PyObject *nxt_py_start_resp_obj;
static PyObject *nxt_py_environ_ptyp; static PyObject *nxt_py_environ_ptyp;
static nxt_str_t nxt_python_request_body;
static nxt_python_run_ctx_t *nxt_python_run_ctx; static nxt_python_run_ctx_t *nxt_python_run_ctx;
@@ -323,6 +324,7 @@ nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_app_wmsg_t *wmsg) nxt_app_wmsg_t *wmsg)
{ {
nxt_int_t rc; nxt_int_t rc;
nxt_buf_t *b;
nxt_http_field_t *field; nxt_http_field_t *field;
nxt_app_request_header_t *h; nxt_app_request_header_t *h;
@@ -369,14 +371,20 @@ nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
nxt_list_each(field, h->fields) { nxt_list_each(field, h->fields) {
RC(nxt_app_msg_write_prefixed_upcase(task, wmsg, RC(nxt_app_msg_write_prefixed_upcase(task, wmsg,
&prefix, &field->name)); &prefix, &field->name));
NXT_WRITE(&field->value); NXT_WRITE(&field->value);
} nxt_list_loop; } nxt_list_loop;
/* end-of-headers mark */ /* end-of-headers mark */
NXT_WRITE(&eof); NXT_WRITE(&eof);
NXT_WRITE(&r->body.preread);
RC(nxt_app_msg_write_size(task, wmsg, r->body.preread_size));
for(b = r->body.buf; b != NULL; b = b->next) {
RC(nxt_app_msg_write_raw(task, wmsg, b->mem.pos,
nxt_buf_mem_used_size(&b->mem)));
}
#undef NXT_WRITE #undef NXT_WRITE
#undef RC #undef RC
@@ -395,9 +403,9 @@ nxt_python_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg)
u_char *buf; u_char *buf;
size_t size; size_t size;
PyObject *result, *iterator, *item, *args, *environ; PyObject *result, *iterator, *item, *args, *environ;
nxt_python_run_ctx_t run_ctx = {task, rmsg, wmsg}; nxt_python_run_ctx_t run_ctx = {task, rmsg, wmsg, 0};
environ = nxt_python_get_environ(task, rmsg); environ = nxt_python_get_environ(task, rmsg, &run_ctx);
if (nxt_slow_path(environ == NULL)) { if (nxt_slow_path(environ == NULL)) {
return NXT_ERROR; return NXT_ERROR;
@@ -465,8 +473,8 @@ nxt_python_run(nxt_task_t *task, nxt_app_rmsg_t *rmsg, nxt_app_wmsg_t *wmsg)
size = PyBytes_GET_SIZE(item); size = PyBytes_GET_SIZE(item);
buf = (u_char *) PyBytes_AS_STRING(item); buf = (u_char *) PyBytes_AS_STRING(item);
nxt_debug(task, "nxt_app_write(fake): %d %*s", (int)size, (int)size, nxt_debug(task, "nxt_app_write(fake): %uz", size);
buf);
nxt_python_write(&run_ctx, buf, size, 1, 0); nxt_python_write(&run_ctx, buf, size, 1, 0);
Py_DECREF(item); Py_DECREF(item);
@@ -688,7 +696,8 @@ nxt_python_read_add_env(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
static PyObject * static PyObject *
nxt_python_get_environ(nxt_task_t *task, nxt_app_rmsg_t *rmsg) nxt_python_get_environ(nxt_task_t *task, nxt_app_rmsg_t *rmsg,
nxt_python_run_ctx_t *ctx)
{ {
size_t s; size_t s;
u_char *colon; u_char *colon;
@@ -774,22 +783,24 @@ nxt_python_get_environ(nxt_task_t *task, nxt_app_rmsg_t *rmsg)
NXT_READ("CONTENT_TYPE"); NXT_READ("CONTENT_TYPE");
NXT_READ("CONTENT_LENGTH"); NXT_READ("CONTENT_LENGTH");
while ( (rc = nxt_app_msg_read_nvp(task, rmsg, &n, &v)) == NXT_OK) { while (nxt_app_msg_read_str(task, rmsg, &n) == NXT_OK) {
if (nxt_slow_path(n.length == 0)) { if (nxt_slow_path(n.length == 0)) {
rc = NXT_DONE; break;
}
rc = nxt_app_msg_read_str(task, rmsg, &v);
if (nxt_slow_path(rc != NXT_OK)) {
break; break;
} }
RC(nxt_python_add_env(task, environ, (char *) n.start, &v)); RC(nxt_python_add_env(task, environ, (char *) n.start, &v));
} }
RC(nxt_app_msg_read_size(task, rmsg, &ctx->body_preread_size));
#undef NXT_READ #undef NXT_READ
#undef RC #undef RC
if (rc == NXT_DONE && v.length > 0) {
nxt_python_request_body = v;
}
return environ; return environ;
fail: fail:
@@ -900,11 +911,15 @@ static PyObject *
nxt_py_input_read(nxt_py_input_t *self, PyObject *args) nxt_py_input_read(nxt_py_input_t *self, PyObject *args)
{ {
u_char *buf; u_char *buf;
size_t copy_size;
PyObject *body, *obj; PyObject *body, *obj;
Py_ssize_t size; Py_ssize_t size;
nxt_uint_t n; nxt_uint_t n;
nxt_python_run_ctx_t *ctx;
size = nxt_python_request_body.length; ctx = nxt_python_run_ctx;
size = ctx->body_preread_size;
n = PyTuple_GET_SIZE(args); n = PyTuple_GET_SIZE(args);
@@ -926,8 +941,8 @@ nxt_py_input_read(nxt_py_input_t *self, PyObject *args)
"the read body size cannot be zero or less"); "the read body size cannot be zero or less");
} }
if (size == 0 || size > (Py_ssize_t) nxt_python_request_body.length) { if (size == 0 || size > (Py_ssize_t) ctx->body_preread_size) {
size = nxt_python_request_body.length; size = ctx->body_preread_size;
} }
} }
@@ -937,16 +952,12 @@ nxt_py_input_read(nxt_py_input_t *self, PyObject *args)
return NULL; return NULL;
} }
if (size > 0) { buf = (u_char *) PyBytes_AS_STRING(body);
buf = (u_char *) PyBytes_AS_STRING(body);
nxt_memcpy(buf, nxt_python_request_body.start, size); copy_size = nxt_min((size_t) size, ctx->body_preread_size);
copy_size = nxt_app_msg_read_raw(ctx->task, ctx->rmsg, buf, copy_size);
nxt_python_request_body.start += size; ctx->body_preread_size -= copy_size;
nxt_python_request_body.length -= size;
/* TODO wait body */
}
return body; return body;
} }

View File

@@ -128,6 +128,8 @@ static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
void *data); void *data);
static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_process_http_request(nxt_task_t *task, static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap); nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
static void nxt_router_process_http_request_mp(nxt_task_t *task, static void nxt_router_process_http_request_mp(nxt_task_t *task,
@@ -603,11 +605,35 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
offsetof(nxt_socket_conf_t, large_header_buffer_size), offsetof(nxt_socket_conf_t, large_header_buffer_size),
}, },
{
nxt_string("large_header_buffers"),
NXT_CONF_MAP_SIZE,
offsetof(nxt_socket_conf_t, large_header_buffers),
},
{
nxt_string("body_buffer_size"),
NXT_CONF_MAP_SIZE,
offsetof(nxt_socket_conf_t, body_buffer_size),
},
{
nxt_string("max_body_size"),
NXT_CONF_MAP_SIZE,
offsetof(nxt_socket_conf_t, max_body_size),
},
{ {
nxt_string("header_read_timeout"), nxt_string("header_read_timeout"),
NXT_CONF_MAP_MSEC, NXT_CONF_MAP_MSEC,
offsetof(nxt_socket_conf_t, header_read_timeout), offsetof(nxt_socket_conf_t, header_read_timeout),
}, },
{
nxt_string("body_read_timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_socket_conf_t, body_read_timeout),
},
}; };
@@ -792,7 +818,11 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
// STUB, default values if http block is not defined. // STUB, default values if http block is not defined.
skcf->header_buffer_size = 2048; skcf->header_buffer_size = 2048;
skcf->large_header_buffer_size = 8192; skcf->large_header_buffer_size = 8192;
skcf->large_header_buffers = 4;
skcf->body_buffer_size = 16 * 1024;
skcf->max_body_size = 2 * 1024 * 1024;
skcf->header_read_timeout = 5000; skcf->header_read_timeout = 5000;
skcf->body_read_timeout = 5000;
if (http != NULL) { if (http != NULL) {
ret = nxt_conf_map_object(http, nxt_router_http_conf, ret = nxt_conf_map_object(http, nxt_router_http_conf,
@@ -1807,7 +1837,7 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
} }
static const nxt_conn_state_t nxt_router_conn_read_state static const nxt_conn_state_t nxt_router_conn_read_header_state
nxt_aligned(64) = nxt_aligned(64) =
{ {
.ready_handler = nxt_router_conn_http_header_parse, .ready_handler = nxt_router_conn_http_header_parse,
@@ -1820,6 +1850,20 @@ static const nxt_conn_state_t nxt_router_conn_read_state
}; };
static const nxt_conn_state_t nxt_router_conn_read_body_state
nxt_aligned(64) =
{
.ready_handler = nxt_router_conn_http_body_read,
.close_handler = nxt_router_conn_close,
.error_handler = nxt_router_conn_error,
.timer_handler = nxt_router_conn_timeout,
.timer_value = nxt_router_conn_timeout_value,
.timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
.timer_autoreset = 1,
};
static void static void
nxt_router_conn_init(nxt_task_t *task, void *obj, void *data) nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
{ {
@@ -1844,7 +1888,7 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
c->read_work_queue = &engine->fast_work_queue; c->read_work_queue = &engine->fast_work_queue;
c->write_work_queue = &engine->fast_work_queue; c->write_work_queue = &engine->fast_work_queue;
c->read_state = &nxt_router_conn_read_state; c->read_state = &nxt_router_conn_read_header_state;
nxt_conn_read(engine, c); nxt_conn_read(engine, c);
} }
@@ -1873,7 +1917,6 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rc = nxt_event_engine_request_find(engine, msg->port_msg.stream); rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
if (nxt_slow_path(rc == NULL)) { if (nxt_slow_path(rc == NULL)) {
nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream); nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
return; return;
@@ -1910,14 +1953,18 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rc->app_port = NULL; rc->app_port = NULL;
} }
rc->conn = NULL;
} }
if (b == NULL) { if (b == NULL) {
return; return;
} }
/* Disable instant buffer completion/re-using by port. */ if (msg->buf == b) {
msg->buf = NULL; /* Disable instant buffer completion/re-using by port. */
msg->buf = NULL;
}
if (c->write == NULL) { if (c->write == NULL) {
c->write = b; c->write = b;
@@ -1938,6 +1985,9 @@ nxt_router_text_by_code(int code)
case 400: return "Bad request"; case 400: return "Bad request";
case 404: return "Not found"; case 404: return "Not found";
case 403: return "Forbidden"; case 403: return "Forbidden";
case 408: return "Request Timeout";
case 411: return "Length Required";
case 413: return "Request Entity Too Large";
case 500: case 500:
default: return "Internal server error"; default: return "Internal server error";
} }
@@ -1965,6 +2015,7 @@ nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
msg = (const char *) b->mem.free; msg = (const char *) b->mem.free;
b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args); b->mem.free = nxt_vsprintf(b->mem.free, b->mem.end, fmt, args);
b->mem.free[0] = '\0';
nxt_log_alert(task->log, "error %d: %s", code, msg); nxt_log_alert(task->log, "error %d: %s", code, msg);
@@ -1996,6 +2047,11 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args); b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
va_end(args); va_end(args);
if (c->socket.data != NULL) {
nxt_mp_free(c->mem_pool, c->socket.data);
c->socket.data = NULL;
}
if (c->write == NULL) { if (c->write == NULL) {
c->write = b; c->write = b;
c->write_state = &nxt_router_conn_write_state; c->write_state = &nxt_router_conn_write_state;
@@ -2345,22 +2401,24 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
static void static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data) nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{ {
size_t size, preread; size_t size;
nxt_int_t ret; nxt_int_t ret;
nxt_buf_t *b; nxt_buf_t *buf;
nxt_conn_t *c; nxt_conn_t *c;
nxt_app_parse_ctx_t *ap; nxt_app_parse_ctx_t *ap;
nxt_app_request_body_t *b;
nxt_socket_conf_joint_t *joint; nxt_socket_conf_joint_t *joint;
nxt_app_request_header_t *h; nxt_app_request_header_t *h;
c = obj; c = obj;
ap = data; ap = data;
b = c->read; buf = c->read;
joint = c->listen->socket.data;
nxt_debug(task, "router conn http header parse"); nxt_debug(task, "router conn http header parse");
if (ap == NULL) { if (ap == NULL) {
ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t)); ap = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
if (nxt_slow_path(ap == NULL)) { if (nxt_slow_path(ap == NULL)) {
nxt_router_conn_close(task, c, data); nxt_router_conn_close(task, c, data);
return; return;
@@ -2376,76 +2434,90 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
ap->r.remote.start = nxt_sockaddr_address(c->remote); ap->r.remote.start = nxt_sockaddr_address(c->remote);
ap->r.remote.length = c->remote->address_length; ap->r.remote.length = c->remote->address_length;
ap->r.header.buf = buf;
} }
h = &ap->r.header; h = &ap->r.header;
b = &ap->r.body;
ret = nxt_app_http_req_parse(task, ap, b); ret = nxt_app_http_req_header_parse(task, ap, buf);
nxt_debug(task, "http parse request: %d", ret); nxt_debug(task, "http parse request header: %d", ret);
switch (nxt_expect(NXT_DONE, ret)) { switch (nxt_expect(NXT_DONE, ret)) {
case NXT_DONE: case NXT_DONE:
preread = nxt_buf_mem_used_size(&b->mem);
nxt_debug(task, "router request header parsing complete, " nxt_debug(task, "router request header parsing complete, "
"content length: %O, preread: %uz", "content length: %O, preread: %uz",
h->parsed_content_length, preread); h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem));
nxt_router_process_http_request(task, c, ap); if (b->done) {
return; nxt_router_process_http_request(task, c, ap);
return;
}
if (joint->socket_conf->max_body_size > 0 &&
(size_t) h->parsed_content_length >
joint->socket_conf->max_body_size) {
nxt_router_gen_error(task, c, 413, "Content-Length too big");
return;
}
if (nxt_buf_mem_free_size(&buf->mem) == 0) {
size = nxt_min(joint->socket_conf->body_buffer_size,
(size_t) h->parsed_content_length);
buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
if (nxt_slow_path(buf->next == NULL)) {
nxt_router_gen_error(task, c, 500, "Failed to allocate "
"buffer for request body");
return;
}
c->read = buf->next;
b->preread_size += nxt_buf_mem_used_size(&buf->mem);
}
if (b->buf == NULL) {
b->buf = c->read;
}
c->read_state = &nxt_router_conn_read_body_state;
break;
case NXT_ERROR: case NXT_ERROR:
nxt_router_conn_close(task, c, data); nxt_router_gen_error(task, c, 400, "Request header parse error");
return; return;
default: /* NXT_AGAIN */ default: /* NXT_AGAIN */
if (h->done == 0) { if (c->read->mem.free == c->read->mem.end) {
size = joint->socket_conf->large_header_buffer_size;
if (c->read->mem.free == c->read->mem.end) { if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem) ||
joint = c->listen->socket.data; ap->r.header.bufs >= joint->socket_conf->large_header_buffers) {
size = joint->socket_conf->large_header_buffer_size; nxt_router_gen_error(task, c, 413,
"Too long request headers");
if (size > (size_t) nxt_buf_mem_size(&b->mem)) { return;
b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
if (nxt_slow_path(b == NULL)) {
nxt_router_conn_close(task, c, data);
return;
}
size = c->read->mem.free - c->read->mem.pos;
c->read = nxt_buf_cpy(b, c->read->mem.pos, size);
} else {
nxt_router_gen_error(task, c, 400,
"Too long request headers");
return;
}
}
}
if (ap->r.body.done == 0) {
preread = nxt_buf_mem_used_size(&b->mem);
if (h->parsed_content_length - preread >
(size_t) nxt_buf_mem_free_size(&b->mem)) {
b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0);
if (nxt_slow_path(b == NULL)) {
nxt_router_gen_error(task, c, 500, "Failed to allocate "
"buffer for request body");
return;
}
c->read = nxt_buf_cpy(b, c->read->mem.pos, preread);
} }
nxt_debug(task, "router request body read again, rest: %uz", buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
h->parsed_content_length - preread); if (nxt_slow_path(buf->next == NULL)) {
nxt_router_gen_error(task, c, 500,
"Failed to allocate large header "
"buffer");
return;
}
ap->r.header.bufs++;
size = c->read->mem.free - c->read->mem.pos;
c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size);
} }
} }
@@ -2454,6 +2526,71 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
} }
static void
nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
{
size_t size;
nxt_int_t ret;
nxt_buf_t *buf;
nxt_conn_t *c;
nxt_app_parse_ctx_t *ap;
nxt_app_request_body_t *b;
nxt_socket_conf_joint_t *joint;
nxt_app_request_header_t *h;
c = obj;
ap = data;
buf = c->read;
nxt_debug(task, "router conn http body read");
nxt_assert(ap != NULL);
b = &ap->r.body;
h = &ap->r.header;
ret = nxt_app_http_req_body_read(task, ap, buf);
nxt_debug(task, "http read request body: %d", ret);
switch (nxt_expect(NXT_DONE, ret)) {
case NXT_DONE:
nxt_router_process_http_request(task, c, ap);
return;
case NXT_ERROR:
nxt_router_gen_error(task, c, 500, "Read body error");
return;
default: /* NXT_AGAIN */
if (nxt_buf_mem_free_size(&buf->mem) == 0) {
joint = c->listen->socket.data;
b->preread_size += nxt_buf_mem_used_size(&buf->mem);
size = nxt_min(joint->socket_conf->body_buffer_size,
(size_t) h->parsed_content_length - b->preread_size);
buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
if (nxt_slow_path(buf->next == NULL)) {
nxt_router_gen_error(task, c, 500, "Failed to allocate "
"buffer for request body");
return;
}
c->read = buf->next;
}
nxt_debug(task, "router request body read again, rest: %uz",
h->parsed_content_length - b->preread_size);
}
nxt_conn_read(task->thread->engine, c);
}
static void static void
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_app_parse_ctx_t *ap) nxt_app_parse_ctx_t *ap)
@@ -2725,9 +2862,12 @@ nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
c = nxt_read_timer_conn(timer); c = nxt_read_timer_conn(timer);
c->write_state = &nxt_router_conn_close_state; if (c->read_state == &nxt_router_conn_read_header_state) {
nxt_router_gen_error(task, c, 408, "Read header timeout");
nxt_conn_close(task->thread->engine, c); } else {
nxt_router_gen_error(task, c, 408, "Read body timeout");
}
} }

View File

@@ -106,7 +106,11 @@ typedef struct {
size_t header_buffer_size; size_t header_buffer_size;
size_t large_header_buffer_size; size_t large_header_buffer_size;
size_t large_header_buffers;
size_t body_buffer_size;
size_t max_body_size;
nxt_msec_t header_read_timeout; nxt_msec_t header_read_timeout;
nxt_msec_t body_read_timeout;
} nxt_socket_conf_t; } nxt_socket_conf_t;