Added basic HTTP request processing in router.

- request to connection mapping in engine;
- requests queue in connection;
- engine port creation;
- connected ports hash for each process;
- engine port data messages processing (app responses);
This commit is contained in:
Max Romanov
2017-06-23 19:20:08 +03:00
parent 4a1b59c27a
commit b8f126dcdf
9 changed files with 565 additions and 35 deletions

View File

@@ -6,6 +6,7 @@
*/
#include <nxt_router.h>
#include <nxt_application.h>
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task,
@@ -64,6 +65,9 @@ static void nxt_router_conf_release(nxt_task_t *task,
static void nxt_router_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_process_http_request(nxt_task_t *task,
nxt_conn_t *c, nxt_app_parse_ctx_t *ap);
static void nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
@@ -79,6 +83,11 @@ nxt_router_start(nxt_task_t *task, nxt_runtime_t *rt)
nxt_router_temp_conf_t *tmcf;
const nxt_event_interface_t *interface;
ret = nxt_app_http_init(task, rt);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
router = nxt_zalloc(sizeof(nxt_router_t));
if (nxt_slow_path(router == NULL)) {
return NXT_ERROR;
@@ -519,6 +528,7 @@ nxt_router_engine_joints_create(nxt_task_t *task, nxt_mp_t *mp,
joint->count = 1;
joint->socket_conf = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
joint->engine = recf->engine;
}
return NXT_OK;
@@ -578,7 +588,10 @@ static nxt_int_t
nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_event_engine_t *engine)
{
nxt_mp_t *mp;
nxt_int_t ret;
nxt_port_t *port;
nxt_process_t *process;
nxt_thread_link_t *link;
nxt_thread_handle_t handle;
@@ -596,6 +609,36 @@ nxt_router_thread_create(nxt_task_t *task, nxt_runtime_t *rt,
nxt_queue_insert_tail(&rt->engines, &engine->link);
process = nxt_runtime_process_find(rt, nxt_pid);
if (nxt_slow_path(process == NULL)) {
return NXT_ERROR;
}
port = nxt_process_port_new(process);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
mp = nxt_mp_create(1024, 128, 256, 32);
if (nxt_slow_path(mp == NULL)) {
return NXT_ERROR;
}
port->mem_pool = mp;
port->engine = 0;
port->type = NXT_PROCESS_ROUTER;
engine->port = port;
nxt_runtime_port_add(rt, port);
ret = nxt_thread_create(&handle, link);
if (nxt_slow_path(ret != NXT_OK)) {
@@ -636,15 +679,29 @@ nxt_router_engine_post(nxt_router_engine_conf_t *recf)
}
static void
nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
nxt_port_handler_t nxt_router_process_port_handlers[] = {
NULL,
nxt_port_new_port_handler,
nxt_port_change_log_file_handler,
nxt_port_mmap_handler,
nxt_router_data_handler,
};
static void
nxt_router_thread_start(void *data)
{
nxt_task_t *task;
nxt_thread_t *thread;
nxt_thread_link_t *link;
nxt_event_engine_t *engine;
link = data;
engine = link->engine;
task = &engine->task;
thread = nxt_thread();
@@ -657,6 +714,9 @@ nxt_router_thread_start(void *data)
thread->task = &engine->task;
thread->fiber = &engine->fibers->fiber;
engine->port->socket.task = task;
nxt_port_create(task, engine->port, nxt_router_process_port_handlers);
engine->mem_pool = nxt_mp_create(4096, 128, 1024, 64);
nxt_event_engine_start(engine);
@@ -913,51 +973,159 @@ nxt_router_conn_init(nxt_task_t *task, void *obj, void *data)
static const nxt_conn_state_t nxt_router_conn_write_state
nxt_aligned(64) =
{
.ready_handler = nxt_router_conn_close,
.ready_handler = nxt_router_conn_ready,
.close_handler = nxt_router_conn_close,
.error_handler = nxt_router_conn_error,
};
static void
nxt_router_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
size_t dump_size;
nxt_buf_t *b, *i, *last;
nxt_conn_t *c;
nxt_work_queue_t *wq;
nxt_req_conn_link_t *rc;
nxt_event_engine_t *engine;
b = msg->buf;
engine = task->thread->engine;
wq = &engine->fast_work_queue;
rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
if (nxt_slow_path(rc == NULL)) {
nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
/* complete buffer(s) */
for (i = b; i != NULL; i = i->next) {
i->mem.pos = i->mem.free;
nxt_work_queue_add(wq, i->completion_handler, task, i, i->parent);
}
return;
}
c = rc->conn;
dump_size = nxt_buf_used_size(b);
if (dump_size > 300) {
dump_size = 300;
}
nxt_debug(task, "%srouter data (%z): %*s",
msg->port_msg.last ? "last " : "", msg->size, dump_size,
b->mem.pos);
if (msg->size == 0) {
b = NULL;
}
if (msg->port_msg.last != 0) {
nxt_debug(task, "router data create last buf");
last = nxt_buf_sync_alloc(c->mem_pool, NXT_BUF_SYNC_LAST);
if (nxt_slow_path(last == NULL)) {
/* TODO pogorevaTb */
}
nxt_buf_chain_add(&b, last);
}
if (b == NULL) {
return;
}
if (c->write == NULL) {
c->write = b;
c->write_state = &nxt_router_conn_write_state;
nxt_conn_write(task->thread->engine, c);
} else {
nxt_debug(task, "router data attach out bufs to existing chain");
nxt_buf_chain_add(&c->write, b);
}
}
nxt_inline nxt_port_t *
nxt_router_app_port(nxt_task_t *task)
{
nxt_port_t *port;
nxt_runtime_t *rt;
rt = task->thread->runtime;
nxt_runtime_port_each(rt, port) {
if (nxt_pid == port->pid) {
continue;
}
if (port->type == NXT_PROCESS_WORKER) {
return port;
}
} nxt_runtime_port_loop;
return NULL;
}
static void
nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
{
size_t size;
size_t size, preread;
nxt_int_t ret;
nxt_buf_t *b;
nxt_conn_t *c;
nxt_app_parse_ctx_t *ap;
nxt_socket_conf_joint_t *joint;
nxt_http_request_parse_t *rp;
nxt_app_request_header_t *h;
c = obj;
rp = data;
ap = data;
b = c->read;
nxt_debug(task, "router conn http header parse");
if (rp == NULL) {
rp = nxt_mp_zget(c->mem_pool, sizeof(nxt_http_request_parse_t));
if (nxt_slow_path(rp == NULL)) {
if (ap == NULL) {
ap = nxt_mp_zget(c->mem_pool, sizeof(nxt_app_parse_ctx_t));
if (nxt_slow_path(ap == NULL)) {
nxt_router_conn_close(task, c, data);
return;
}
c->socket.data = rp;
ret = nxt_http_parse_request_init(rp, c->mem_pool);
ret = nxt_app_http_req_init(task, ap);
if (nxt_slow_path(ret != NXT_OK)) {
nxt_router_conn_close(task, c, data);
return;
}
c->socket.data = ap;
}
ret = nxt_http_parse_request(rp, &c->read->mem);
h = &ap->r.header;
ret = nxt_app_http_req_parse(task, ap, b);
nxt_debug(task, "http parse request: %d", ret);
switch (nxt_expect(NXT_DONE, ret)) {
case NXT_DONE:
break;
preread = nxt_buf_mem_used_size(&b->mem);
nxt_debug(task, "router request header parsing complete, "
"content length: %O, preread: %uz",
h->parsed_content_length, preread);
nxt_router_process_http_request(task, c, ap);
return;
case NXT_ERROR:
nxt_router_conn_close(task, c, data);
@@ -965,32 +1133,122 @@ nxt_router_conn_http_header_parse(nxt_task_t *task, void *obj, void *data)
default: /* NXT_AGAIN */
if (c->read->mem.free == c->read->mem.end) {
joint = c->listen->socket.data;
size = joint->socket_conf->large_header_buffer_size,
if (h->done == 0) {
b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
if (nxt_slow_path(b == NULL)) {
nxt_router_conn_close(task, c, data);
return;
if (c->read->mem.free == c->read->mem.end) {
joint = c->listen->socket.data;
size = joint->socket_conf->large_header_buffer_size;
if (size > (size_t) nxt_buf_mem_size(&b->mem)) {
b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
if (nxt_slow_path(b == NULL)) {
nxt_router_conn_close(task, c, data);
return;
}
size = c->read->mem.free - c->read->mem.pos;
nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
b->mem.free += size;
c->read = b;
} else {
// TODO 500 Too long request headers
nxt_log_alert(task->log, "Too long request headers");
}
}
size = c->read->mem.free - c->read->mem.pos;
nxt_memcpy(b->mem.pos, c->read->mem.pos, size);
b->mem.free += size;
c->read = b;
}
nxt_conn_read(task->thread->engine, c);
return;
if (ap->r.body.done == 0) {
preread = nxt_buf_mem_used_size(&b->mem);
if (h->parsed_content_length - preread >
(size_t) nxt_buf_mem_free_size(&b->mem)) {
b = nxt_buf_mem_alloc(c->mem_pool, h->parsed_content_length, 0);
if (nxt_slow_path(b == NULL)) {
// TODO 500 Failed to allocate buffer for request body
nxt_log_alert(task->log, "Failed to allocate buffer for "
"request body");
}
b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos,
preread);
c->read = b;
}
nxt_debug(task, "router request body read again, rest: %uz",
h->parsed_content_length - preread);
}
}
c->write = c->read;
c->write->mem.pos = c->write->mem.start;
c->write_state = &nxt_router_conn_write_state;
nxt_conn_read(task->thread->engine, c);
}
nxt_conn_write(task->thread->engine, c);
static void
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_app_parse_ctx_t *ap)
{
nxt_port_t *port, *c_port;
nxt_req_id_t req_id;
nxt_app_wmsg_t wmsg;
nxt_event_engine_t *engine;
nxt_req_conn_link_t *rc;
if (nxt_slow_path(nxt_app == NULL)) {
// 500 Application not found
nxt_log_alert(task->log, "application is NULL");
}
port = nxt_router_app_port(task);
if (nxt_slow_path(port == NULL)) {
// 500 Application port not found
nxt_log_alert(task->log, "application port not found");
}
engine = task->thread->engine;
do {
req_id = nxt_random(&nxt_random_data);
} while (nxt_event_engine_request_find(engine, req_id) != NULL);
rc = nxt_conn_request_add(c, req_id);
if (nxt_slow_path(rc == NULL)) {
// 500 Failed to allocate req->conn link
nxt_log_alert(task->log, "failed to allocate req->conn link");
}
nxt_event_engine_request_add(engine, rc);
nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
req_id, c, engine);
c_port = nxt_process_connected_port_find(port->process,
engine->port->pid,
engine->port->id);
if (nxt_slow_path(c_port != engine->port)) {
(void) nxt_port_send_port(task, port, engine->port);
nxt_process_connected_port_add(port->process, engine->port);
}
wmsg.port = port;
wmsg.write = NULL;
wmsg.buf = &wmsg.write;
wmsg.stream = req_id;
(void)nxt_app->prepare_msg(task, &ap->r, &wmsg);
nxt_debug(task, "about to send %d bytes buffer to worker port %d",
nxt_buf_used_size(wmsg.write),
wmsg.port->socket.fd);
(void) nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
-1, req_id, engine->port->id, wmsg.write);
}
@@ -1001,6 +1259,59 @@ static const nxt_conn_state_t nxt_router_conn_close_state
};
static void
nxt_router_conn_ready(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_bool_t last;
nxt_conn_t *c;
nxt_work_queue_t *wq;
nxt_debug(task, "router conn ready %p", obj);
c = obj;
b = c->write;
wq = &task->thread->engine->fast_work_queue;
last = 0;
while (b != NULL) {
if (!nxt_buf_is_sync(b)) {
if (nxt_buf_used_size(b) > 0) {
break;
}
}
if (nxt_buf_is_last(b)) {
last = 1;
}
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
b = b->next;
}
c->write = b;
if (b != NULL) {
nxt_debug(task, "router conn %p has more data to write", obj);
nxt_conn_write(task->thread->engine, c);
} else {
nxt_debug(task, "router conn %p no more data to write, last = %d", obj,
last);
if (last != 0) {
nxt_debug(task, "enqueue router conn close %p (ready handler)", c);
nxt_work_queue_add(wq, nxt_router_conn_close, task, c,
c->socket.data);
}
}
}
static void
nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
{
@@ -1020,6 +1331,7 @@ static void
nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
{
nxt_conn_t *c;
nxt_req_conn_link_t *rc;
nxt_socket_conf_joint_t *joint;
c = obj;
@@ -1029,6 +1341,14 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
joint = c->listen->socket.data;
nxt_router_conf_release(task, joint);
nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
nxt_event_engine_request_remove(task->thread->engine, rc);
} nxt_queue_loop;
nxt_mp_destroy(c->mem_pool);
}