Some basic HTTP handling in controller.

This commit is contained in:
Valentin Bartenev
2017-03-16 16:28:31 +03:00
parent e13cbdb439
commit e4e617469a

View File

@@ -10,6 +10,12 @@
#include <nxt_master_process.h> #include <nxt_master_process.h>
typedef struct {
nxt_http_request_parse_t parser;
size_t length;
} nxt_controller_request_t;
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_controller_conn_timeout_value(nxt_event_conn_t *c, static nxt_msec_t nxt_controller_conn_timeout_value(nxt_event_conn_t *c,
@@ -18,17 +24,54 @@ static void nxt_controller_conn_read_error(nxt_task_t *task, void *obj,
void *data); void *data);
static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, static void nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj,
void *data); void *data);
static void nxt_controller_conn_body_read(nxt_task_t *task, void *obj,
void *data);
static void nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data);
static void nxt_controller_conn_write_error(nxt_task_t *task, void *obj,
void *data);
static void nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj,
void *data);
static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data);
static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data);
static nxt_int_t nxt_controller_request_content_length(void *ctx,
nxt_str_t *name, nxt_str_t *value, uintptr_t data);
static void nxt_controller_process_request(nxt_task_t *task,
nxt_event_conn_t *c, nxt_controller_request_t *r);
static nxt_int_t nxt_controller_request_body_parse(nxt_buf_mem_t *b);
static nxt_http_fields_t nxt_controller_request_fields[] = {
{ nxt_string("Content-Length"),
&nxt_controller_request_content_length, 0 },
{ nxt_null_string, NULL, 0 }
};
static nxt_http_fields_hash_t *nxt_controller_request_fields_hash;
static const nxt_event_conn_state_t nxt_controller_conn_read_state; static const nxt_event_conn_state_t nxt_controller_conn_read_state;
static const nxt_event_conn_state_t nxt_controller_conn_body_read_state;
static const nxt_event_conn_state_t nxt_controller_conn_write_state;
static const nxt_event_conn_state_t nxt_controller_conn_close_state; static const nxt_event_conn_state_t nxt_controller_conn_close_state;
nxt_int_t nxt_int_t
nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt) nxt_controller_start(nxt_task_t *task, nxt_runtime_t *rt)
{ {
nxt_http_fields_hash_t *hash;
hash = nxt_http_fields_hash(nxt_controller_request_fields, rt->mem_pool);
if (nxt_slow_path(hash == NULL)) {
return NXT_ERROR;
}
nxt_controller_request_fields_hash = hash;
if (nxt_event_conn_listen(task, rt->controller_socket) != NXT_OK) { if (nxt_event_conn_listen(task, rt->controller_socket) != NXT_OK) {
return NXT_ERROR; return NXT_ERROR;
} }
@@ -116,14 +159,24 @@ nxt_runtime_controller_socket(nxt_task_t *task, nxt_runtime_t *rt)
static void static void
nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data) nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
{ {
nxt_buf_t *b; nxt_buf_t *b;
nxt_event_conn_t *c; nxt_event_conn_t *c;
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
nxt_controller_request_t *r;
c = obj; c = obj;
nxt_debug(task, "controller conn init fd:%d", c->socket.fd); nxt_debug(task, "controller conn init fd:%d", c->socket.fd);
r = nxt_mem_zalloc(c->mem_pool, sizeof(nxt_controller_request_t));
if (nxt_slow_path(r == NULL)) {
nxt_controller_conn_free(task, c, NULL);
return;
}
r->parser.hash = nxt_controller_request_fields_hash;
r->parser.ctx = r;
b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0); b = nxt_buf_mem_alloc(c->mem_pool, 1024, 0);
if (nxt_slow_path(b == NULL)) { if (nxt_slow_path(b == NULL)) {
nxt_controller_conn_free(task, c, NULL); nxt_controller_conn_free(task, c, NULL);
@@ -131,11 +184,13 @@ nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data)
} }
c->read = b; c->read = b;
c->socket.data = r;
c->socket.read_ready = 1; c->socket.read_ready = 1;
c->read_state = &nxt_controller_conn_read_state; c->read_state = &nxt_controller_conn_read_state;
engine = task->thread->engine; engine = task->thread->engine;
c->read_work_queue = &engine->read_work_queue; c->read_work_queue = &engine->read_work_queue;
c->write_work_queue = &engine->write_work_queue;
nxt_event_conn_read(engine, c); nxt_event_conn_read(engine, c);
} }
@@ -160,13 +215,71 @@ static const nxt_event_conn_state_t nxt_controller_conn_read_state
static void static void
nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data) nxt_controller_conn_read(nxt_task_t *task, void *obj, void *data)
{ {
nxt_event_conn_t *c; size_t preread;
nxt_buf_t *b;
nxt_int_t rc;
nxt_event_conn_t *c;
nxt_controller_request_t *r;
c = obj; c = obj;
r = data;
nxt_debug(task, "controller conn read"); nxt_debug(task, "controller conn read");
nxt_controller_conn_close(task, c, c->socket.data); nxt_queue_remove(&c->link);
nxt_queue_self(&c->link);
b = c->read;
rc = nxt_http_parse_request(&r->parser, &b->mem);
if (nxt_slow_path(rc != NXT_DONE)) {
if (rc == NXT_AGAIN) {
if (nxt_buf_mem_free_size(&b->mem) == 0) {
nxt_log(task, NXT_LOG_ERR, "too long request headers");
nxt_controller_conn_close(task, c, r);
return;
}
nxt_event_conn_read(task->thread->engine, c);
return;
}
/* rc == NXT_ERROR */
nxt_log(task, NXT_LOG_ERR, "parsing error");
nxt_controller_conn_close(task, c, r);
return;
}
preread = nxt_buf_mem_used_size(&b->mem);
nxt_debug(task, "controller request header parsing complete, "
"body length: %O, preread: %uz",
r->length, preread);
if (preread >= r->length) {
nxt_controller_process_request(task, c, r);
return;
}
if (r->length - preread > (size_t) nxt_buf_mem_free_size(&b->mem)) {
b = nxt_buf_mem_alloc(c->mem_pool, r->length, 0);
if (nxt_slow_path(b == NULL)) {
nxt_controller_conn_free(task, c, NULL);
return;
}
b->mem.free = nxt_cpymem(b->mem.free, c->read->mem.pos, preread);
c->read = b;
}
c->read_state = &nxt_controller_conn_body_read_state;
nxt_event_conn_read(task->thread->engine, c);
} }
@@ -186,7 +299,7 @@ nxt_controller_conn_read_error(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "controller conn read error"); nxt_debug(task, "controller conn read error");
nxt_controller_conn_close(task, c, c->socket.data); nxt_controller_conn_close(task, c, data);
} }
@@ -204,7 +317,121 @@ nxt_controller_conn_read_timeout(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "controller conn read timeout"); nxt_debug(task, "controller conn read timeout");
nxt_controller_conn_close(task, c, c->socket.data); nxt_controller_conn_close(task, c, data);
}
static const nxt_event_conn_state_t nxt_controller_conn_body_read_state
nxt_aligned(64) =
{
NXT_EVENT_NO_BUF_PROCESS,
NXT_EVENT_TIMER_AUTORESET,
nxt_controller_conn_body_read,
nxt_controller_conn_close,
nxt_controller_conn_read_error,
nxt_controller_conn_read_timeout,
nxt_controller_conn_timeout_value,
60 * 1000,
};
static void
nxt_controller_conn_body_read(nxt_task_t *task, void *obj, void *data)
{
size_t rest;
nxt_buf_t *b;
nxt_event_conn_t *c;
c = obj;
nxt_debug(task, "controller conn body read");
b = c->read;
rest = nxt_buf_mem_free_size(&b->mem);
if (rest == 0) {
nxt_debug(task, "controller conn body read complete");
nxt_controller_process_request(task, c, data);
return;
}
nxt_debug(task, "controller conn body read again, rest: %uz", rest);
nxt_event_conn_read(task->thread->engine, c);
}
static const nxt_event_conn_state_t nxt_controller_conn_write_state
nxt_aligned(64) =
{
NXT_EVENT_NO_BUF_PROCESS,
NXT_EVENT_TIMER_AUTORESET,
nxt_controller_conn_write,
NULL,
nxt_controller_conn_write_error,
nxt_controller_conn_write_timeout,
nxt_controller_conn_timeout_value,
60 * 1000,
};
static void
nxt_controller_conn_write(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_event_conn_t *c;
c = obj;
nxt_debug(task, "controller conn write");
b = c->write;
if (b->mem.pos != b->mem.free) {
nxt_event_conn_write(task->thread->engine, c);
return;
}
nxt_debug(task, "controller conn write complete");
nxt_controller_conn_close(task, c, data);
}
static void
nxt_controller_conn_write_error(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
c = obj;
nxt_debug(task, "controller conn write error");
nxt_controller_conn_close(task, c, data);
}
static void
nxt_controller_conn_write_timeout(nxt_task_t *task, void *obj, void *data)
{
nxt_timer_t *ev;
nxt_event_conn_t *c;
ev = obj;
c = nxt_event_write_timer_conn(ev);
c->socket.timedout = 1;
c->socket.closed = 1;
nxt_debug(task, "controller conn write timeout");
nxt_controller_conn_close(task, c, data);
} }
@@ -233,6 +460,8 @@ nxt_controller_conn_close(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "controller conn close"); nxt_debug(task, "controller conn close");
nxt_queue_remove(&c->link);
c->write_state = &nxt_controller_conn_close_state; c->write_state = &nxt_controller_conn_close_state;
nxt_event_conn_close(task->thread->engine, c); nxt_event_conn_close(task->thread->engine, c);
@@ -252,3 +481,66 @@ nxt_controller_conn_free(nxt_task_t *task, void *obj, void *data)
//nxt_free(c); //nxt_free(c);
} }
static nxt_int_t
nxt_controller_request_content_length(void *ctx, nxt_str_t *name,
nxt_str_t *value, uintptr_t data)
{
off_t length;
nxt_controller_request_t *r;
r = ctx;
length = nxt_off_t_parse(value->start, value->length);
if (nxt_fast_path(length > 0)) {
/* TODO length too big */
r->length = length;
return NXT_OK;
}
/* TODO logging (task?) */
return NXT_ERROR;
}
static void
nxt_controller_process_request(nxt_task_t *task, nxt_event_conn_t *c,
nxt_controller_request_t *r)
{
size_t size;
nxt_buf_t *b, *wb;
static const u_char response[] = "HTTP/1.0 200 OK\r\n\r\n";
b = c->read;
nxt_controller_request_body_parse(&b->mem);
size = nxt_buf_mem_used_size(&b->mem);
wb = nxt_buf_mem_alloc(c->mem_pool, sizeof(response) - 1 + size, 0);
if (nxt_slow_path(wb == NULL)) {
nxt_controller_conn_close(task, c, r);
return;
}
wb->mem.free = nxt_cpymem(wb->mem.free, response, sizeof(response) - 1);
wb->mem.free = nxt_cpymem(wb->mem.free, b->mem.pos, size);
c->write = wb;
c->write_state = &nxt_controller_conn_write_state;
nxt_event_conn_write(task->thread->engine, c);
}
static nxt_int_t
nxt_controller_request_body_parse(nxt_buf_mem_t *b)
{
/* TODO */
return NXT_OK;
}