HTTP keep-alive connections support.
This commit is contained in:
762
src/nxt_router.c
762
src/nxt_router.c
@@ -7,6 +7,7 @@
|
||||
|
||||
#include <nxt_router.h>
|
||||
#include <nxt_conf.h>
|
||||
#include <nxt_http.h>
|
||||
|
||||
|
||||
typedef struct {
|
||||
@@ -35,15 +36,14 @@ typedef struct nxt_req_app_link_s nxt_req_app_link_t;
|
||||
|
||||
|
||||
typedef struct {
|
||||
uint32_t stream;
|
||||
nxt_conn_t *conn;
|
||||
nxt_app_t *app;
|
||||
nxt_port_t *app_port;
|
||||
nxt_app_parse_ctx_t *ap;
|
||||
nxt_msg_info_t msg_info;
|
||||
nxt_req_app_link_t *ra;
|
||||
uint32_t stream;
|
||||
nxt_app_t *app;
|
||||
nxt_port_t *app_port;
|
||||
nxt_app_parse_ctx_t *ap;
|
||||
nxt_msg_info_t msg_info;
|
||||
nxt_req_app_link_t *ra;
|
||||
|
||||
nxt_queue_link_t link; /* for nxt_conn_t.requests */
|
||||
nxt_queue_link_t link; /* for nxt_conn_t.requests */
|
||||
} nxt_req_conn_link_t;
|
||||
|
||||
|
||||
@@ -199,14 +199,6 @@ static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
|
||||
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
|
||||
nxt_req_app_link_t *ra);
|
||||
|
||||
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
|
||||
void *data);
|
||||
static nxt_sockaddr_t *nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c);
|
||||
static void nxt_router_conn_http_body_read(nxt_task_t *task, void *obj,
|
||||
void *data);
|
||||
static void nxt_router_process_http_request(nxt_task_t *task,
|
||||
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
|
||||
static void nxt_router_app_prepare_request(nxt_task_t *task,
|
||||
nxt_req_app_link_t *ra);
|
||||
static nxt_int_t nxt_python_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
|
||||
@@ -215,16 +207,11 @@ static nxt_int_t nxt_php_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
|
||||
nxt_app_wmsg_t *wmsg);
|
||||
static nxt_int_t nxt_go_prepare_msg(nxt_task_t *task, nxt_app_request_t *r,
|
||||
nxt_app_wmsg_t *wmsg);
|
||||
static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
|
||||
static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
|
||||
|
||||
static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
|
||||
const char* str);
|
||||
static const nxt_http_request_state_t nxt_http_request_send_state;
|
||||
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
|
||||
|
||||
static nxt_router_t *nxt_router;
|
||||
|
||||
@@ -245,7 +232,7 @@ nxt_router_start(nxt_task_t *task, void *data)
|
||||
|
||||
rt = task->thread->runtime;
|
||||
|
||||
ret = nxt_app_http_init(task, rt);
|
||||
ret = nxt_http_init(task, rt);
|
||||
if (nxt_slow_path(ret != NXT_OK)) {
|
||||
return ret;
|
||||
}
|
||||
@@ -502,9 +489,8 @@ nxt_router_ra_update_peer(nxt_task_t *task, nxt_req_app_link_t *ra)
|
||||
static void
|
||||
nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
|
||||
{
|
||||
nxt_mp_t *mp;
|
||||
nxt_conn_t *c;
|
||||
nxt_req_conn_link_t *rc;
|
||||
nxt_mp_t *mp;
|
||||
nxt_req_conn_link_t *rc;
|
||||
|
||||
nxt_assert(task->thread->engine == ra->work.data);
|
||||
nxt_assert(ra->use_count == 0);
|
||||
@@ -514,18 +500,16 @@ nxt_router_ra_release(nxt_task_t *task, nxt_req_app_link_t *ra)
|
||||
rc = ra->rc;
|
||||
|
||||
if (rc != NULL) {
|
||||
c = rc->conn;
|
||||
|
||||
if (nxt_slow_path(ra->err_code != 0)) {
|
||||
nxt_router_gen_error(task, c, ra->err_code, ra->err_str);
|
||||
nxt_http_request_error(task, rc->ap->request, ra->err_code);
|
||||
|
||||
} else {
|
||||
rc->app_port = ra->app_port;
|
||||
rc->msg_info = ra->msg_info;
|
||||
|
||||
if (rc->app->timeout != 0) {
|
||||
c->read_timer.handler = nxt_router_app_timeout;
|
||||
nxt_timer_add(task->thread->engine, &c->read_timer,
|
||||
rc->ap->timer.handler = nxt_router_app_timeout;
|
||||
nxt_timer_add(task->thread->engine, &rc->ap->timer,
|
||||
rc->app->timeout);
|
||||
}
|
||||
|
||||
@@ -693,10 +677,6 @@ nxt_router_rc_unlink(nxt_task_t *task, nxt_req_conn_link_t *rc)
|
||||
|
||||
rc->ap = NULL;
|
||||
}
|
||||
|
||||
nxt_queue_remove(&rc->link);
|
||||
|
||||
rc->conn = NULL;
|
||||
}
|
||||
|
||||
|
||||
@@ -1078,6 +1058,12 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
|
||||
offsetof(nxt_socket_conf_t, max_body_size),
|
||||
},
|
||||
|
||||
{
|
||||
nxt_string("idle_timeout"),
|
||||
NXT_CONF_MAP_MSEC,
|
||||
offsetof(nxt_socket_conf_t, idle_timeout),
|
||||
},
|
||||
|
||||
{
|
||||
nxt_string("header_read_timeout"),
|
||||
NXT_CONF_MAP_MSEC,
|
||||
@@ -1089,6 +1075,12 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
|
||||
NXT_CONF_MAP_MSEC,
|
||||
offsetof(nxt_socket_conf_t, body_read_timeout),
|
||||
},
|
||||
|
||||
{
|
||||
nxt_string("send_timeout"),
|
||||
NXT_CONF_MAP_MSEC,
|
||||
offsetof(nxt_socket_conf_t, send_timeout),
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -1296,8 +1288,10 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
||||
skcf->large_header_buffers = 4;
|
||||
skcf->body_buffer_size = 16 * 1024;
|
||||
skcf->max_body_size = 2 * 1024 * 1024;
|
||||
skcf->idle_timeout = 65000;
|
||||
skcf->header_read_timeout = 5000;
|
||||
skcf->body_read_timeout = 5000;
|
||||
skcf->send_timeout = 5000;
|
||||
|
||||
if (http != NULL) {
|
||||
ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
|
||||
@@ -1308,7 +1302,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
||||
}
|
||||
}
|
||||
|
||||
skcf->listen->handler = nxt_router_conn_init;
|
||||
skcf->listen->handler = nxt_http_conn_init;
|
||||
skcf->router_conf = tmcf->conf;
|
||||
skcf->router_conf->count++;
|
||||
skcf->application = nxt_router_listener_application(tmcf,
|
||||
@@ -2377,92 +2371,20 @@ nxt_router_thread_exit_handler(nxt_task_t *task, void *obj, void *data)
|
||||
}
|
||||
|
||||
|
||||
static const nxt_conn_state_t nxt_router_conn_read_header_state
|
||||
nxt_aligned(64) =
|
||||
{
|
||||
.ready_handler = nxt_router_conn_http_header_parse,
|
||||
.close_handler = nxt_router_conn_close,
|
||||
.error_handler = nxt_router_conn_error,
|
||||
|
||||
.timer_handler = nxt_router_conn_timeout,
|
||||
.timer_value = nxt_router_conn_timeout_value,
|
||||
.timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
|
||||
};
|
||||
|
||||
|
||||
static const nxt_conn_state_t nxt_router_conn_read_body_state
|
||||
nxt_aligned(64) =
|
||||
{
|
||||
.ready_handler = nxt_router_conn_http_body_read,
|
||||
.close_handler = nxt_router_conn_close,
|
||||
.error_handler = nxt_router_conn_error,
|
||||
|
||||
.timer_handler = nxt_router_conn_timeout,
|
||||
.timer_value = nxt_router_conn_timeout_value,
|
||||
.timer_data = offsetof(nxt_socket_conf_t, body_read_timeout),
|
||||
.timer_autoreset = 1,
|
||||
};
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
size_t size;
|
||||
nxt_conn_t *c;
|
||||
nxt_socket_conf_t *skcf;
|
||||
nxt_event_engine_t *engine;
|
||||
nxt_socket_conf_joint_t *joint;
|
||||
|
||||
c = obj;
|
||||
joint = data;
|
||||
|
||||
nxt_debug(task, "router conn init");
|
||||
|
||||
c->joint = joint;
|
||||
joint->count++;
|
||||
|
||||
skcf = joint->socket_conf;
|
||||
c->local = skcf->sockaddr;
|
||||
|
||||
size = skcf->header_buffer_size;
|
||||
c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
|
||||
|
||||
c->socket.data = NULL;
|
||||
|
||||
engine = task->thread->engine;
|
||||
c->read_work_queue = &engine->fast_work_queue;
|
||||
c->write_work_queue = &engine->fast_work_queue;
|
||||
|
||||
c->read_state = &nxt_router_conn_read_header_state;
|
||||
|
||||
nxt_conn_read(engine, c);
|
||||
}
|
||||
|
||||
|
||||
static const nxt_conn_state_t nxt_router_conn_write_state
|
||||
nxt_aligned(64) =
|
||||
{
|
||||
.ready_handler = nxt_router_conn_ready,
|
||||
.close_handler = nxt_router_conn_close,
|
||||
.error_handler = nxt_router_conn_error,
|
||||
};
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
void *data)
|
||||
{
|
||||
size_t dump_size;
|
||||
nxt_int_t ret;
|
||||
nxt_buf_t *b, *last;
|
||||
nxt_conn_t *c;
|
||||
nxt_event_engine_t *engine;
|
||||
nxt_http_request_t *r;
|
||||
nxt_req_conn_link_t *rc;
|
||||
nxt_app_parse_ctx_t *ar;
|
||||
|
||||
b = msg->buf;
|
||||
rc = data;
|
||||
|
||||
c = rc->conn;
|
||||
|
||||
dump_size = nxt_buf_used_size(b);
|
||||
|
||||
if (dump_size > 300) {
|
||||
@@ -2477,16 +2399,16 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
b = NULL;
|
||||
}
|
||||
|
||||
engine = task->thread->engine;
|
||||
|
||||
nxt_timer_disable(engine, &c->read_timer);
|
||||
ar = rc->ap;
|
||||
|
||||
if (msg->port_msg.last != 0) {
|
||||
nxt_debug(task, "router data create last buf");
|
||||
|
||||
last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
|
||||
last = nxt_http_request_last_buffer(task, ar->request);
|
||||
if (nxt_slow_path(last == NULL)) {
|
||||
/* TODO pogorevaTb */
|
||||
nxt_app_http_req_done(task, ar);
|
||||
nxt_router_rc_unlink(task, rc);
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_buf_chain_add(&b, last);
|
||||
@@ -2495,8 +2417,8 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
|
||||
} else {
|
||||
if (rc->app->timeout != 0) {
|
||||
c->read_timer.handler = nxt_router_app_timeout;
|
||||
nxt_timer_add(engine, &c->read_timer, rc->app->timeout);
|
||||
ar->timer.handler = nxt_router_app_timeout;
|
||||
nxt_timer_add(task->thread->engine, &ar->timer, rc->app->timeout);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2509,16 +2431,67 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
msg->buf = NULL;
|
||||
}
|
||||
|
||||
if (c->write == NULL) {
|
||||
c->write = b;
|
||||
c->write_state = &nxt_router_conn_write_state;
|
||||
r = ar->request;
|
||||
|
||||
nxt_conn_write(task->thread->engine, c);
|
||||
if (r->header_sent) {
|
||||
nxt_buf_chain_add(&r->out, b);
|
||||
nxt_http_request_send_body(task, r, NULL);
|
||||
|
||||
} else {
|
||||
nxt_debug(task, "router data attach out bufs to existing chain");
|
||||
ret = nxt_http_parse_fields(&ar->resp_parser, &b->mem);
|
||||
if (nxt_slow_path(ret != NXT_DONE)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
nxt_buf_chain_add(&c->write, b);
|
||||
r->resp.fields = ar->resp_parser.fields;
|
||||
|
||||
ret = nxt_http_fields_process(r->resp.fields,
|
||||
&nxt_response_fields_hash, r);
|
||||
if (nxt_slow_path(ret != NXT_OK)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
if (nxt_buf_mem_used_size(&b->mem) != 0) {
|
||||
nxt_buf_chain_add(&r->out, b);
|
||||
}
|
||||
|
||||
r->state = &nxt_http_request_send_state;
|
||||
|
||||
nxt_http_request_header_send(task, r);
|
||||
}
|
||||
|
||||
return;
|
||||
|
||||
fail:
|
||||
|
||||
nxt_app_http_req_done(task, ar);
|
||||
nxt_router_rc_unlink(task, rc);
|
||||
|
||||
nxt_http_request_error(task, r, NXT_HTTP_SERVICE_UNAVAILABLE);
|
||||
}
|
||||
|
||||
|
||||
static const nxt_http_request_state_t nxt_http_request_send_state
|
||||
nxt_aligned(64) =
|
||||
{
|
||||
.ready_handler = nxt_http_request_send_body,
|
||||
.error_handler = nxt_http_request_close_handler,
|
||||
};
|
||||
|
||||
|
||||
static void
|
||||
nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_buf_t *out;
|
||||
nxt_http_request_t *r;
|
||||
|
||||
r = obj;
|
||||
|
||||
out = r->out;
|
||||
|
||||
if (out != NULL) {
|
||||
r->out = NULL;
|
||||
nxt_http_request_send(task, r, out);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2562,98 +2535,12 @@ nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
}
|
||||
}
|
||||
|
||||
nxt_router_gen_error(task, rc->conn, 500,
|
||||
"Application terminated unexpectedly");
|
||||
nxt_http_request_error(task, rc->ap->request, NXT_HTTP_SERVICE_UNAVAILABLE);
|
||||
|
||||
nxt_router_rc_unlink(task, rc);
|
||||
}
|
||||
|
||||
|
||||
nxt_inline const char *
|
||||
nxt_router_text_by_code(int code)
|
||||
{
|
||||
switch (code) {
|
||||
case 400: return "Bad request";
|
||||
case 404: return "Not found";
|
||||
case 403: return "Forbidden";
|
||||
case 408: return "Request Timeout";
|
||||
case 411: return "Length Required";
|
||||
case 413: return "Request Entity Too Large";
|
||||
case 500:
|
||||
default: return "Internal server error";
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static nxt_buf_t *
|
||||
nxt_router_get_error_buf(nxt_task_t *task, nxt_mp_t *mp, int code,
|
||||
const char* str)
|
||||
{
|
||||
nxt_buf_t *b, *last;
|
||||
|
||||
b = nxt_buf_mem_alloc(mp, 16384, 0);
|
||||
if (nxt_slow_path(b == NULL)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
b->mem.free = nxt_sprintf(b->mem.free, b->mem.end,
|
||||
"HTTP/1.0 %d %s\r\n"
|
||||
"Content-Type: text/plain\r\n"
|
||||
"Connection: close\r\n\r\n",
|
||||
code, nxt_router_text_by_code(code));
|
||||
|
||||
b->mem.free = nxt_cpymem(b->mem.free, str, nxt_strlen(str));
|
||||
|
||||
last = nxt_buf_sync_alloc(mp, NXT_BUF_SYNC_LAST);
|
||||
|
||||
if (nxt_slow_path(last == NULL)) {
|
||||
nxt_mp_free(mp, b);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nxt_buf_chain_add(&b, last);
|
||||
|
||||
return b;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
|
||||
const char* str)
|
||||
{
|
||||
nxt_mp_t *mp;
|
||||
nxt_buf_t *b;
|
||||
|
||||
/* TODO: fix when called in the middle of response */
|
||||
|
||||
nxt_log_alert(task->log, "error %d: %s", code, str);
|
||||
|
||||
if (c->socket.fd == -1) {
|
||||
return;
|
||||
}
|
||||
|
||||
mp = c->mem_pool;
|
||||
|
||||
b = nxt_router_get_error_buf(task, mp, code, str);
|
||||
if (nxt_slow_path(b == NULL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (c->write == NULL) {
|
||||
c->write = b;
|
||||
c->write_state = &nxt_router_conn_write_state;
|
||||
|
||||
nxt_conn_write(task->thread->engine, c);
|
||||
|
||||
} else {
|
||||
nxt_debug(task, "router data attach out bufs to existing chain");
|
||||
|
||||
nxt_buf_chain_add(&c->write, b);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||
void *data)
|
||||
@@ -3228,283 +3115,22 @@ nxt_router_app_port(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
|
||||
void
|
||||
nxt_router_process_http_request(nxt_task_t *task, nxt_app_parse_ctx_t *ar)
|
||||
{
|
||||
size_t size;
|
||||
nxt_int_t ret;
|
||||
nxt_buf_t *buf;
|
||||
nxt_conn_t *c;
|
||||
nxt_sockaddr_t *local;
|
||||
nxt_app_parse_ctx_t *ap;
|
||||
nxt_app_request_body_t *b;
|
||||
nxt_socket_conf_joint_t *joint;
|
||||
nxt_app_request_header_t *h;
|
||||
|
||||
c = obj;
|
||||
ap = data;
|
||||
buf = c->read;
|
||||
joint = c->joint;
|
||||
|
||||
nxt_debug(task, "router conn http header parse");
|
||||
|
||||
if (ap == NULL) {
|
||||
ap = nxt_app_http_req_init(task);
|
||||
if (nxt_slow_path(ap == NULL)) {
|
||||
nxt_router_gen_error(task, c, 500,
|
||||
"Failed to allocate parse context");
|
||||
return;
|
||||
}
|
||||
|
||||
c->socket.data = ap;
|
||||
|
||||
ap->r.remote.start = nxt_sockaddr_address(c->remote);
|
||||
ap->r.remote.length = c->remote->address_length;
|
||||
|
||||
/*
|
||||
* TODO: need an application flag to get local address
|
||||
* required by "SERVER_ADDR" in Pyhton and PHP. Not used in Go.
|
||||
*/
|
||||
local = nxt_router_local_addr(task, c);
|
||||
|
||||
if (nxt_fast_path(local != NULL)) {
|
||||
ap->r.local.start = nxt_sockaddr_address(local);
|
||||
ap->r.local.length = local->address_length;
|
||||
}
|
||||
|
||||
ap->r.header.buf = buf;
|
||||
}
|
||||
|
||||
h = &ap->r.header;
|
||||
b = &ap->r.body;
|
||||
|
||||
ret = nxt_app_http_req_header_parse(task, ap, buf);
|
||||
|
||||
nxt_debug(task, "http parse request header: %d", ret);
|
||||
|
||||
switch (nxt_expect(NXT_DONE, ret)) {
|
||||
|
||||
case NXT_DONE:
|
||||
nxt_debug(task, "router request header parsing complete, "
|
||||
"content length: %O, preread: %uz",
|
||||
h->parsed_content_length, nxt_buf_mem_used_size(&buf->mem));
|
||||
|
||||
if (b->done) {
|
||||
nxt_router_process_http_request(task, c, ap);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (joint->socket_conf->max_body_size > 0
|
||||
&& (size_t) h->parsed_content_length
|
||||
> joint->socket_conf->max_body_size)
|
||||
{
|
||||
nxt_router_gen_error(task, c, 413, "Content-Length too big");
|
||||
return;
|
||||
}
|
||||
|
||||
if (nxt_buf_mem_free_size(&buf->mem) == 0) {
|
||||
size = nxt_min(joint->socket_conf->body_buffer_size,
|
||||
(size_t) h->parsed_content_length);
|
||||
|
||||
buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
|
||||
if (nxt_slow_path(buf->next == NULL)) {
|
||||
nxt_router_gen_error(task, c, 500, "Failed to allocate "
|
||||
"buffer for request body");
|
||||
return;
|
||||
}
|
||||
|
||||
c->read = buf->next;
|
||||
|
||||
b->preread_size += nxt_buf_mem_used_size(&buf->mem);
|
||||
}
|
||||
|
||||
if (b->buf == NULL) {
|
||||
b->buf = c->read;
|
||||
}
|
||||
|
||||
c->read_state = &nxt_router_conn_read_body_state;
|
||||
break;
|
||||
|
||||
case NXT_ERROR:
|
||||
nxt_router_gen_error(task, c, 400, "Request header parse error");
|
||||
return;
|
||||
|
||||
default: /* NXT_AGAIN */
|
||||
|
||||
if (c->read->mem.free == c->read->mem.end) {
|
||||
size = joint->socket_conf->large_header_buffer_size;
|
||||
|
||||
if (size <= (size_t) nxt_buf_mem_used_size(&buf->mem)
|
||||
|| ap->r.header.bufs
|
||||
>= joint->socket_conf->large_header_buffers)
|
||||
{
|
||||
nxt_router_gen_error(task, c, 413,
|
||||
"Too long request headers");
|
||||
return;
|
||||
}
|
||||
|
||||
buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
|
||||
if (nxt_slow_path(buf->next == NULL)) {
|
||||
nxt_router_gen_error(task, c, 500,
|
||||
"Failed to allocate large header "
|
||||
"buffer");
|
||||
return;
|
||||
}
|
||||
|
||||
ap->r.header.bufs++;
|
||||
|
||||
size = c->read->mem.free - c->read->mem.pos;
|
||||
|
||||
c->read = nxt_buf_cpy(buf->next, c->read->mem.pos, size);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
nxt_conn_read(task->thread->engine, c);
|
||||
}
|
||||
|
||||
|
||||
static nxt_sockaddr_t *
|
||||
nxt_router_local_addr(nxt_task_t *task, nxt_conn_t *c)
|
||||
{
|
||||
int ret;
|
||||
size_t size, length;
|
||||
socklen_t socklen;
|
||||
nxt_sockaddr_t *sa;
|
||||
|
||||
if (c->local != NULL) {
|
||||
return c->local;
|
||||
}
|
||||
|
||||
/* AF_UNIX should not get in here. */
|
||||
|
||||
switch (c->remote->u.sockaddr.sa_family) {
|
||||
#if (NXT_INET6)
|
||||
case AF_INET6:
|
||||
socklen = sizeof(struct sockaddr_in6);
|
||||
length = NXT_INET6_ADDR_STR_LEN;
|
||||
size = offsetof(nxt_sockaddr_t, u) + socklen + length;
|
||||
break;
|
||||
#endif
|
||||
case AF_INET:
|
||||
default:
|
||||
socklen = sizeof(struct sockaddr_in);
|
||||
length = NXT_INET_ADDR_STR_LEN;
|
||||
size = offsetof(nxt_sockaddr_t, u) + socklen + length;
|
||||
break;
|
||||
}
|
||||
|
||||
sa = nxt_mp_get(c->mem_pool, size);
|
||||
if (nxt_slow_path(sa == NULL)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
sa->socklen = socklen;
|
||||
sa->length = length;
|
||||
|
||||
ret = getsockname(c->socket.fd, &sa->u.sockaddr, &socklen);
|
||||
if (nxt_slow_path(ret != 0)) {
|
||||
nxt_log(task, NXT_LOG_CRIT, "getsockname(%d) failed", c->socket.fd);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
c->local = sa;
|
||||
|
||||
nxt_sockaddr_text(sa);
|
||||
|
||||
/*
|
||||
* TODO: here we can adjust the end of non-freeable block
|
||||
* in c->mem_pool to the end of actual sockaddr length.
|
||||
*/
|
||||
|
||||
return sa;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_conn_http_body_read(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
size_t size;
|
||||
nxt_int_t ret;
|
||||
nxt_buf_t *buf;
|
||||
nxt_conn_t *c;
|
||||
nxt_app_parse_ctx_t *ap;
|
||||
nxt_app_request_body_t *b;
|
||||
nxt_socket_conf_joint_t *joint;
|
||||
nxt_app_request_header_t *h;
|
||||
|
||||
c = obj;
|
||||
ap = data;
|
||||
buf = c->read;
|
||||
|
||||
nxt_debug(task, "router conn http body read");
|
||||
|
||||
nxt_assert(ap != NULL);
|
||||
|
||||
b = &ap->r.body;
|
||||
h = &ap->r.header;
|
||||
|
||||
ret = nxt_app_http_req_body_read(task, ap, buf);
|
||||
|
||||
nxt_debug(task, "http read request body: %d", ret);
|
||||
|
||||
switch (nxt_expect(NXT_DONE, ret)) {
|
||||
|
||||
case NXT_DONE:
|
||||
nxt_router_process_http_request(task, c, ap);
|
||||
return;
|
||||
|
||||
case NXT_ERROR:
|
||||
nxt_router_gen_error(task, c, 500, "Read body error");
|
||||
return;
|
||||
|
||||
default: /* NXT_AGAIN */
|
||||
|
||||
if (nxt_buf_mem_free_size(&buf->mem) == 0) {
|
||||
joint = c->joint;
|
||||
|
||||
b->preread_size += nxt_buf_mem_used_size(&buf->mem);
|
||||
|
||||
size = nxt_min(joint->socket_conf->body_buffer_size,
|
||||
(size_t) h->parsed_content_length - b->preread_size);
|
||||
|
||||
buf->next = nxt_buf_mem_alloc(c->mem_pool, size, 0);
|
||||
if (nxt_slow_path(buf->next == NULL)) {
|
||||
nxt_router_gen_error(task, c, 500, "Failed to allocate "
|
||||
"buffer for request body");
|
||||
return;
|
||||
}
|
||||
|
||||
c->read = buf->next;
|
||||
}
|
||||
|
||||
nxt_debug(task, "router request body read again, rest: %uz",
|
||||
h->parsed_content_length - b->preread_size);
|
||||
}
|
||||
|
||||
nxt_conn_read(task->thread->engine, c);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
|
||||
nxt_app_parse_ctx_t *ap)
|
||||
{
|
||||
nxt_int_t res;
|
||||
nxt_app_t *app;
|
||||
nxt_port_t *port;
|
||||
nxt_event_engine_t *engine;
|
||||
nxt_req_app_link_t ra_local, *ra;
|
||||
nxt_req_conn_link_t *rc;
|
||||
nxt_socket_conf_joint_t *joint;
|
||||
|
||||
joint = c->joint;
|
||||
app = joint->socket_conf->application;
|
||||
nxt_int_t res;
|
||||
nxt_app_t *app;
|
||||
nxt_port_t *port;
|
||||
nxt_event_engine_t *engine;
|
||||
nxt_http_request_t *r;
|
||||
nxt_req_app_link_t ra_local, *ra;
|
||||
nxt_req_conn_link_t *rc;
|
||||
|
||||
r = ar->request;
|
||||
app = r->socket_conf->application;
|
||||
|
||||
if (app == NULL) {
|
||||
nxt_router_gen_error(task, c, 500,
|
||||
"Application is NULL in socket_conf");
|
||||
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3516,27 +3142,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
|
||||
sizeof(nxt_req_conn_link_t));
|
||||
|
||||
if (nxt_slow_path(rc == NULL)) {
|
||||
nxt_router_gen_error(task, c, 500, "Failed to allocate "
|
||||
"req<->conn link");
|
||||
|
||||
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
|
||||
return;
|
||||
}
|
||||
|
||||
rc->stream = nxt_port_rpc_ex_stream(rc);
|
||||
rc->conn = c;
|
||||
rc->app = app;
|
||||
|
||||
nxt_router_app_use(task, app, 1);
|
||||
|
||||
nxt_timer_disable(engine, &c->read_timer);
|
||||
|
||||
nxt_queue_insert_tail(&c->requests, &rc->link);
|
||||
|
||||
nxt_debug(task, "stream #%uD linked to conn %p at engine %p",
|
||||
rc->stream, c, engine);
|
||||
|
||||
rc->ap = ap;
|
||||
c->socket.data = NULL;
|
||||
rc->ap = ar;
|
||||
|
||||
ra = &ra_local;
|
||||
nxt_router_ra_init(task, ra, rc);
|
||||
@@ -3912,82 +3527,13 @@ fail:
|
||||
}
|
||||
|
||||
|
||||
static const nxt_conn_state_t nxt_router_conn_close_state
|
||||
const nxt_conn_state_t nxt_router_conn_close_state
|
||||
nxt_aligned(64) =
|
||||
{
|
||||
.ready_handler = nxt_router_conn_free,
|
||||
};
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_buf_t *b;
|
||||
nxt_bool_t last;
|
||||
nxt_conn_t *c;
|
||||
nxt_work_queue_t *wq;
|
||||
|
||||
nxt_debug(task, "router conn ready %p", obj);
|
||||
|
||||
c = obj;
|
||||
b = c->write;
|
||||
|
||||
wq = &task->thread->engine->fast_work_queue;
|
||||
|
||||
last = 0;
|
||||
|
||||
while (b != NULL) {
|
||||
if (!nxt_buf_is_sync(b)) {
|
||||
if (nxt_buf_used_size(b) > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (nxt_buf_is_last(b)) {
|
||||
last = 1;
|
||||
}
|
||||
|
||||
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
|
||||
|
||||
b = b->next;
|
||||
}
|
||||
|
||||
c->write = b;
|
||||
|
||||
if (b != NULL) {
|
||||
nxt_debug(task, "router conn %p has more data to write", obj);
|
||||
|
||||
nxt_conn_write(task->thread->engine, c);
|
||||
|
||||
} else {
|
||||
nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
|
||||
last);
|
||||
|
||||
if (last != 0) {
|
||||
nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
|
||||
|
||||
nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
|
||||
c->socket.data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_conn_t *c;
|
||||
|
||||
c = obj;
|
||||
|
||||
nxt_debug(task, "router conn close");
|
||||
|
||||
c->write_state = &nxt_router_conn_close_state;
|
||||
|
||||
nxt_conn_close(task->thread->engine, c);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
@@ -4004,31 +3550,12 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_conn_t *c;
|
||||
nxt_event_engine_t *engine;
|
||||
nxt_req_conn_link_t *rc;
|
||||
nxt_app_parse_ctx_t *ap;
|
||||
nxt_socket_conf_joint_t *joint;
|
||||
|
||||
c = obj;
|
||||
ap = data;
|
||||
|
||||
nxt_debug(task, "router conn close done");
|
||||
|
||||
if (ap != NULL) {
|
||||
nxt_app_http_req_done(task, ap);
|
||||
|
||||
c->socket.data = NULL;
|
||||
}
|
||||
|
||||
nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
|
||||
|
||||
nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
|
||||
|
||||
nxt_router_rc_unlink(task, rc);
|
||||
|
||||
nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);
|
||||
|
||||
} nxt_queue_loop;
|
||||
|
||||
nxt_queue_remove(&c->link);
|
||||
|
||||
engine = task->thread->engine;
|
||||
@@ -4044,66 +3571,19 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_conn_error(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_conn_t *c;
|
||||
|
||||
c = obj;
|
||||
|
||||
nxt_debug(task, "router conn error");
|
||||
|
||||
if (c->socket.fd != -1) {
|
||||
c->write_state = &nxt_router_conn_close_state;
|
||||
|
||||
nxt_conn_close(task->thread->engine, c);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_conn_t *c;
|
||||
nxt_timer_t *timer;
|
||||
|
||||
timer = obj;
|
||||
|
||||
nxt_debug(task, "router conn timeout");
|
||||
|
||||
c = nxt_read_timer_conn(timer);
|
||||
|
||||
if (c->read_state == &nxt_router_conn_read_header_state) {
|
||||
nxt_router_gen_error(task, c, 408, "Read header timeout");
|
||||
|
||||
} else {
|
||||
nxt_router_gen_error(task, c, 408, "Read body timeout");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_conn_t *c;
|
||||
nxt_timer_t *timer;
|
||||
nxt_timer_t *timer;
|
||||
nxt_app_parse_ctx_t *ar;
|
||||
|
||||
timer = obj;
|
||||
|
||||
nxt_debug(task, "router app timeout");
|
||||
|
||||
c = nxt_read_timer_conn(timer);
|
||||
ar = nxt_timer_data(timer, nxt_app_parse_ctx_t, timer);
|
||||
|
||||
nxt_router_gen_error(task, c, 408, "Application timeout");
|
||||
}
|
||||
|
||||
|
||||
static nxt_msec_t
|
||||
nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
|
||||
{
|
||||
nxt_socket_conf_joint_t *joint;
|
||||
|
||||
joint = c->joint;
|
||||
|
||||
return nxt_value_at(nxt_msec_t, joint->socket_conf, data);
|
||||
if (!ar->request->header_sent) {
|
||||
nxt_http_request_error(task, ar->request, NXT_HTTP_SERVICE_UNAVAILABLE);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user