Splitting HTTP processing in more granulate connection states.

This commit is contained in:
Igor Sysoev
2018-04-10 19:30:45 +03:00
parent 41317e37da
commit 0a44ac371a
2 changed files with 227 additions and 115 deletions

View File

@@ -8,8 +8,15 @@
#include <nxt_http.h> #include <nxt_http.h>
static void nxt_h1p_read_header(nxt_task_t *task, void *obj, void *data); /*
static void nxt_h1p_header_parse(nxt_task_t *task, void *obj, void *data); * nxt_h1p_conn_ prefix is used for connection handlers.
* nxt_h1p_request_ prefix is used for HTTP/1 protocol request methods.
*/
static void nxt_h1p_conn_read_header(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_header_parse(nxt_task_t *task, void *obj, void *data);
static nxt_int_t nxt_h1p_header_process(nxt_h1proto_t *h1p, static nxt_int_t nxt_h1p_header_process(nxt_h1proto_t *h1p,
nxt_http_request_t *r); nxt_http_request_t *r);
static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field, static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field,
@@ -17,32 +24,38 @@ static nxt_int_t nxt_h1p_connection(void *ctx, nxt_http_field_t *field,
static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field, static nxt_int_t nxt_h1p_transfer_encoding(void *ctx, nxt_http_field_t *field,
uintptr_t data); uintptr_t data);
static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r); static void nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r);
static void nxt_h1p_body_read(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_body_read(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r); static void nxt_h1p_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
static void nxt_h1p_request_header_send(nxt_task_t *task, static void nxt_h1p_request_header_send(nxt_task_t *task,
nxt_http_request_t *r); nxt_http_request_t *r);
static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out); nxt_buf_t *out);
nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r);
static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out); nxt_buf_t *out);
static void nxt_h1p_sent(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_request_sent(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r, static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *last); nxt_buf_t *last);
static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto); static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto);
static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p,
nxt_conn_t *c); nxt_conn_t *c);
static void nxt_h1p_close(nxt_task_t *task, nxt_conn_t *c); static void nxt_h1p_conn_keepalive(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_timeout(nxt_task_t *task, void *obj, void *data); static void nxt_h1p_conn_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_h1p_timeout_value(nxt_conn_t *c, uintptr_t data); static void nxt_h1p_close(nxt_task_t *task, nxt_conn_t *c);
static void nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj,
void *data);
nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r);
static nxt_msec_t nxt_h1p_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
static const nxt_conn_state_t nxt_h1p_idle_state; static const nxt_conn_state_t nxt_h1p_idle_state;
static const nxt_conn_state_t nxt_h1p_read_header_state; static const nxt_conn_state_t nxt_h1p_proto_init_state;
static const nxt_conn_state_t nxt_h1p_request_init_state;
static const nxt_conn_state_t nxt_h1p_header_parse_state;
static const nxt_conn_state_t nxt_h1p_read_body_state; static const nxt_conn_state_t nxt_h1p_read_body_state;
static const nxt_conn_state_t nxt_h1p_send_state; static const nxt_conn_state_t nxt_h1p_send_state;
static const nxt_conn_state_t nxt_h1p_keepalive_state;
const nxt_http_proto_body_read_t nxt_http_proto_body_read[3] = { const nxt_http_proto_body_read_t nxt_http_proto_body_read[3] = {
@@ -143,18 +156,18 @@ nxt_http_conn_init(nxt_task_t *task, void *obj, void *data)
static const nxt_conn_state_t nxt_h1p_idle_state static const nxt_conn_state_t nxt_h1p_idle_state
nxt_aligned(64) = nxt_aligned(64) =
{ {
.ready_handler = nxt_h1p_read_header, .ready_handler = nxt_h1p_conn_read_header,
.close_handler = nxt_h1p_conn_close, .close_handler = nxt_h1p_conn_error,
.error_handler = nxt_h1p_conn_error, .error_handler = nxt_h1p_conn_error,
.timer_handler = nxt_h1p_conn_timeout, .timer_handler = nxt_h1p_conn_timeout,
.timer_value = nxt_h1p_timeout_value, .timer_value = nxt_h1p_conn_timeout_value,
.timer_data = offsetof(nxt_socket_conf_t, idle_timeout), .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
}; };
static void static void
nxt_h1p_read_header(nxt_task_t *task, void *obj, void *data) nxt_h1p_conn_read_header(nxt_task_t *task, void *obj, void *data)
{ {
size_t size; size_t size;
nxt_conn_t *c; nxt_conn_t *c;
@@ -162,7 +175,7 @@ nxt_h1p_read_header(nxt_task_t *task, void *obj, void *data)
c = obj; c = obj;
nxt_debug(task, "h1p read header"); nxt_debug(task, "h1p conn read header");
if (c->read == NULL) { if (c->read == NULL) {
joint = c->joint; joint = c->joint;
@@ -175,81 +188,137 @@ nxt_h1p_read_header(nxt_task_t *task, void *obj, void *data)
} }
} }
c->read_state = &nxt_h1p_read_header_state; c->read_state = &nxt_h1p_proto_init_state;
nxt_conn_read(task->thread->engine, c); nxt_conn_read(task->thread->engine, c);
} }
static const nxt_conn_state_t nxt_h1p_read_header_state static const nxt_conn_state_t nxt_h1p_proto_init_state
nxt_aligned(64) = nxt_aligned(64) =
{ {
.ready_handler = nxt_h1p_header_parse, .ready_handler = nxt_h1p_conn_proto_init,
.close_handler = nxt_h1p_conn_close, .close_handler = nxt_h1p_conn_error,
.error_handler = nxt_h1p_conn_error, .error_handler = nxt_h1p_conn_error,
.timer_handler = nxt_h1p_conn_timeout, .timer_handler = nxt_h1p_conn_timeout,
.timer_value = nxt_h1p_timeout_value, .timer_value = nxt_h1p_conn_timeout_value,
.timer_data = offsetof(nxt_socket_conf_t, header_read_timeout), .timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
}; };
static void static void
nxt_h1p_header_parse(nxt_task_t *task, void *obj, void *data) nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
nxt_h1proto_t *h1p;
c = obj;
nxt_debug(task, "h1p conn proto init");
h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
if (nxt_slow_path(h1p == NULL)) {
nxt_h1p_close(task, c);
return;
}
c->socket.data = h1p;
h1p->conn = c;
nxt_h1p_conn_request_init(task, c, h1p);
}
static const nxt_conn_state_t nxt_h1p_request_init_state
nxt_aligned(64) =
{
.ready_handler = nxt_h1p_conn_request_init,
.close_handler = nxt_h1p_conn_error,
.error_handler = nxt_h1p_conn_error,
.timer_handler = nxt_h1p_conn_timeout,
.timer_value = nxt_h1p_conn_timeout_value,
.timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
};
static void
nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data)
{ {
size_t size;
nxt_int_t ret; nxt_int_t ret;
nxt_buf_t *in, *b;
nxt_conn_t *c; nxt_conn_t *c;
nxt_h1proto_t *h1p; nxt_h1proto_t *h1p;
nxt_http_status_t status;
nxt_http_request_t *r; nxt_http_request_t *r;
nxt_socket_conf_joint_t *joint; nxt_socket_conf_joint_t *joint;
c = obj; c = obj;
h1p = data; h1p = data;
nxt_debug(task, "h1p header parse"); nxt_debug(task, "h1p conn request init");
if (h1p == NULL) { r = nxt_http_request_create(task);
h1p = nxt_mp_zget(c->mem_pool, sizeof(nxt_h1proto_t));
if (nxt_slow_path(h1p == NULL)) {
goto fail;
}
c->socket.data = h1p;
h1p->conn = c;
}
r = h1p->request;
if (r == NULL) {
r = nxt_http_request_create(task);
if (nxt_slow_path(r == NULL)) {
goto fail;
}
if (nxt_fast_path(r != NULL)) {
h1p->request = r; h1p->request = r;
r->proto.h1 = h1p; r->proto.h1 = h1p;
joint = c->joint; joint = c->joint;
r->socket_conf = joint->socket_conf; r->socket_conf = joint->socket_conf;
r->remote = c->remote; r->remote = c->remote;
ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool); ret = nxt_http_parse_request_init(&h1p->parser, r->mem_pool);
if (nxt_slow_path(ret != NXT_OK)) {
/* if (nxt_fast_path(ret == NXT_OK)) {
* The request is very uncomplete here, nxt_h1p_conn_header_parse(task, c, h1p);
* so "internal server error" useless here. return;
*/
nxt_mp_release(r->mem_pool);
h1p->request = NULL;
goto fail;
} }
/*
* The request is very incomplete here,
* so "internal server error" useless here.
*/
nxt_mp_release(r->mem_pool);
} }
nxt_h1p_close(task, c);
}
static const nxt_conn_state_t nxt_h1p_header_parse_state
nxt_aligned(64) =
{
.ready_handler = nxt_h1p_conn_header_parse,
.close_handler = nxt_h1p_conn_request_error,
.error_handler = nxt_h1p_conn_request_error,
.timer_handler = nxt_h1p_conn_request_timeout,
.timer_value = nxt_h1p_conn_timeout_value,
.timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
};
static void
nxt_h1p_conn_header_parse(nxt_task_t *task, void *obj, void *data)
{
size_t size;
nxt_int_t ret;
nxt_buf_t *in, *b;
nxt_conn_t *c;
nxt_h1proto_t *h1p;
nxt_http_status_t status;
nxt_http_request_t *r;
c = obj;
h1p = data;
nxt_debug(task, "h1p conn header parse");
ret = nxt_http_parse_request(&h1p->parser, &c->read->mem); ret = nxt_http_parse_request(&h1p->parser, &c->read->mem);
r = h1p->request;
if (nxt_fast_path(ret == NXT_DONE)) { if (nxt_fast_path(ret == NXT_DONE)) {
/* /*
* By default the keepalive mode is disabled in HTTP/1.0 and * By default the keepalive mode is disabled in HTTP/1.0 and
@@ -303,6 +372,7 @@ nxt_h1p_header_parse(nxt_task_t *task, void *obj, void *data)
c->read = b; c->read = b;
} }
c->read_state = &nxt_h1p_header_parse_state;
nxt_conn_read(task->thread->engine, c); nxt_conn_read(task->thread->engine, c);
return; return;
} }
@@ -329,11 +399,6 @@ nxt_h1p_header_parse(nxt_task_t *task, void *obj, void *data)
(void) nxt_h1p_header_process(h1p, r); (void) nxt_h1p_header_process(h1p, r);
nxt_http_request_error(task, r, status); nxt_http_request_error(task, r, status);
return;
fail:
nxt_h1p_conn_close(task, c, h1p);
} }
@@ -407,7 +472,7 @@ nxt_h1p_request_body_read(nxt_task_t *task, nxt_http_request_t *r)
h1p = r->proto.h1; h1p = r->proto.h1;
nxt_debug(task, "h1p body read %O te:%d", nxt_debug(task, "h1p request body read %O te:%d",
r->content_length_n, h1p->transfer_encoding); r->content_length_n, h1p->transfer_encoding);
switch (h1p->transfer_encoding) { switch (h1p->transfer_encoding) {
@@ -495,19 +560,19 @@ error:
static const nxt_conn_state_t nxt_h1p_read_body_state static const nxt_conn_state_t nxt_h1p_read_body_state
nxt_aligned(64) = nxt_aligned(64) =
{ {
.ready_handler = nxt_h1p_body_read, .ready_handler = nxt_h1p_conn_body_read,
.close_handler = nxt_h1p_conn_close, .close_handler = nxt_h1p_conn_request_error,
.error_handler = nxt_h1p_conn_error, .error_handler = nxt_h1p_conn_request_error,
.timer_handler = nxt_h1p_conn_timeout, .timer_handler = nxt_h1p_conn_request_timeout,
.timer_value = nxt_h1p_timeout_value, .timer_value = nxt_h1p_conn_timeout_value,
.timer_data = offsetof(nxt_socket_conf_t, body_read_timeout), .timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
.timer_autoreset = 1, .timer_autoreset = 1,
}; };
static void static void
nxt_h1p_body_read(nxt_task_t *task, void *obj, void *data) nxt_h1p_conn_body_read(nxt_task_t *task, void *obj, void *data)
{ {
size_t size; size_t size;
nxt_conn_t *c; nxt_conn_t *c;
@@ -517,7 +582,7 @@ nxt_h1p_body_read(nxt_task_t *task, void *obj, void *data)
c = obj; c = obj;
h1p = data; h1p = data;
nxt_debug(task, "h1p body read"); nxt_debug(task, "h1p conn body read");
size = nxt_buf_mem_free_size(&c->read->mem); size = nxt_buf_mem_free_size(&c->read->mem);
@@ -760,22 +825,14 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
} }
nxt_inline void
nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r)
{
r->state->error_handler(task, r, r->proto.h1);
}
static const nxt_conn_state_t nxt_h1p_send_state static const nxt_conn_state_t nxt_h1p_send_state
nxt_aligned(64) = nxt_aligned(64) =
{ {
.ready_handler = nxt_h1p_sent, .ready_handler = nxt_h1p_conn_request_sent,
.close_handler = nxt_h1p_conn_close, .error_handler = nxt_h1p_conn_request_error,
.error_handler = nxt_h1p_conn_error,
.timer_handler = nxt_h1p_conn_timeout, .timer_handler = nxt_h1p_conn_request_timeout,
.timer_value = nxt_h1p_timeout_value, .timer_value = nxt_h1p_conn_timeout_value,
.timer_data = offsetof(nxt_socket_conf_t, send_timeout), .timer_data = offsetof(nxt_socket_conf_t, send_timeout),
.timer_autoreset = 1, .timer_autoreset = 1,
}; };
@@ -863,14 +920,14 @@ nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
static void static void
nxt_h1p_sent(nxt_task_t *task, void *obj, void *data) nxt_h1p_conn_request_sent(nxt_task_t *task, void *obj, void *data)
{ {
nxt_conn_t *c; nxt_conn_t *c;
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
c = obj; c = obj;
nxt_debug(task, "h1p sent"); nxt_debug(task, "h1p conn request sent");
engine = task->thread->engine; engine = task->thread->engine;
@@ -969,11 +1026,11 @@ nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c)
in->mem.free = in->mem.start; in->mem.free = in->mem.start;
if (c->socket.read_ready) { if (c->socket.read_ready) {
c->read_state = &nxt_h1p_read_header_state; c->read_state = &nxt_h1p_request_init_state;
nxt_conn_read(task->thread->engine, c); nxt_conn_read(task->thread->engine, c);
} else { } else {
c->read_state = &nxt_h1p_idle_state; c->read_state = &nxt_h1p_keepalive_state;
nxt_conn_wait(c); nxt_conn_wait(c);
} }
@@ -985,67 +1042,49 @@ nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c)
in->mem.pos = in->mem.start; in->mem.pos = in->mem.start;
in->mem.free = in->mem.start + size; in->mem.free = in->mem.start + size;
nxt_h1p_header_parse(task, c, c->socket.data); nxt_h1p_conn_header_parse(task, c, c->socket.data);
} }
} }
static void static const nxt_conn_state_t nxt_h1p_keepalive_state
nxt_h1p_close(nxt_task_t *task, nxt_conn_t *c) nxt_aligned(64) =
{ {
nxt_debug(task, "h1p close"); .ready_handler = nxt_h1p_conn_keepalive,
.close_handler = nxt_h1p_conn_error,
.error_handler = nxt_h1p_conn_error,
c->socket.data = NULL; .timer_handler = nxt_h1p_conn_timeout,
.timer_value = nxt_h1p_conn_timeout_value,
if (c->socket.fd != -1) { .timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
c->write_state = &nxt_router_conn_close_state; };
nxt_conn_close(task->thread->engine, c);
}
}
static void static void
nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data) nxt_h1p_conn_keepalive(nxt_task_t *task, void *obj, void *data)
{ {
nxt_conn_t *c; nxt_conn_t *c;
nxt_h1proto_t *h1p;
nxt_http_request_t *r;
c = obj; c = obj;
h1p = data;
nxt_debug(task, "h1p conn close"); nxt_debug(task, "h1p conn_keepalive");
if (h1p != NULL) { c->read_state = &nxt_h1p_request_init_state;
r = h1p->request;
if (r != NULL) { nxt_conn_read(task->thread->engine, c);
if (r->fields == NULL) {
(void) nxt_h1p_header_process(h1p, r);
}
nxt_h1p_request_error(task, r);
return;
}
}
nxt_h1p_close(task, c);
} }
static void static void
nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data) nxt_h1p_conn_error(nxt_task_t *task, void *obj, void *data)
{ {
nxt_conn_t *c; nxt_conn_t *c;
nxt_h1proto_t *h1p;
c = obj; c = obj;
h1p = data;
nxt_debug(task, "h1p conn error"); nxt_debug(task, "h1p conn error");
nxt_h1p_conn_close(task, c, h1p); nxt_h1p_close(task, c);
} }
@@ -1061,12 +1100,84 @@ nxt_h1p_conn_timeout(nxt_task_t *task, void *obj, void *data)
c = nxt_read_timer_conn(timer); c = nxt_read_timer_conn(timer);
nxt_h1p_conn_close(task, c, c->socket.data); nxt_h1p_close(task, c);
}
static void
nxt_h1p_close(nxt_task_t *task, nxt_conn_t *c)
{
nxt_debug(task, "h1p close");
c->socket.data = NULL;
c->write_state = &nxt_router_conn_close_state;
nxt_conn_close(task->thread->engine, c);
}
static void
nxt_h1p_conn_request_error(nxt_task_t *task, void *obj, void *data)
{
nxt_h1proto_t *h1p;
nxt_http_request_t *r;
h1p = data;
nxt_debug(task, "h1p conn request error");
r = h1p->request;
if (r->fields == NULL) {
(void) nxt_h1p_header_process(h1p, r);
}
if (r->status == 0) {
r->status = NXT_HTTP_BAD_REQUEST;
}
nxt_h1p_request_error(task, r);
}
static void
nxt_h1p_conn_request_timeout(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
nxt_timer_t *timer;
nxt_h1proto_t *h1p;
nxt_http_request_t *r;
timer = obj;
nxt_debug(task, "h1p conn request timeout");
c = nxt_read_timer_conn(timer);
h1p = c->socket.data;
r = h1p->request;
if (r->fields == NULL) {
(void) nxt_h1p_header_process(h1p, r);
}
if (r->status == 0) {
r->status = NXT_HTTP_REQUEST_TIMEOUT;
}
nxt_h1p_request_error(task, r);
}
nxt_inline void
nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r)
{
r->state->error_handler(task, r, r->proto.h1);
} }
static nxt_msec_t static nxt_msec_t
nxt_h1p_timeout_value(nxt_conn_t *c, uintptr_t data) nxt_h1p_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
{ {
nxt_socket_conf_joint_t *joint; nxt_socket_conf_joint_t *joint;

View File

@@ -20,6 +20,7 @@ typedef enum {
NXT_HTTP_NOT_MODIFIED = 304, NXT_HTTP_NOT_MODIFIED = 304,
NXT_HTTP_BAD_REQUEST = 400, NXT_HTTP_BAD_REQUEST = 400,
NXT_HTTP_REQUEST_TIMEOUT = 408,
NXT_HTTP_LENGTH_REQUIRED = 411, NXT_HTTP_LENGTH_REQUIRED = 411,
NXT_HTTP_PAYLOAD_TOO_LARGE = 413, NXT_HTTP_PAYLOAD_TOO_LARGE = 413,
NXT_HTTP_URI_TOO_LONG = 414, NXT_HTTP_URI_TOO_LONG = 414,