Introducing connection state io_read_handler.
This commit is contained in:
@@ -8,6 +8,7 @@
|
|||||||
#define _NXT_CONN_H_INCLUDED_
|
#define _NXT_CONN_H_INCLUDED_
|
||||||
|
|
||||||
|
|
||||||
|
typedef ssize_t (*nxt_conn_io_read_t)(nxt_conn_t *c);
|
||||||
typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data);
|
typedef nxt_msec_t (*nxt_conn_timer_value_t)(nxt_conn_t *c, uintptr_t data);
|
||||||
|
|
||||||
|
|
||||||
@@ -16,6 +17,8 @@ typedef struct {
|
|||||||
nxt_work_handler_t close_handler;
|
nxt_work_handler_t close_handler;
|
||||||
nxt_work_handler_t error_handler;
|
nxt_work_handler_t error_handler;
|
||||||
|
|
||||||
|
nxt_conn_io_read_t io_read_handler;
|
||||||
|
|
||||||
nxt_work_handler_t timer_handler;
|
nxt_work_handler_t timer_handler;
|
||||||
nxt_conn_timer_value_t timer_value;
|
nxt_conn_timer_value_t timer_value;
|
||||||
uintptr_t timer_data;
|
uintptr_t timer_data;
|
||||||
@@ -160,7 +163,6 @@ struct nxt_conn_s {
|
|||||||
nxt_sockaddr_t *local;
|
nxt_sockaddr_t *local;
|
||||||
const char *action;
|
const char *action;
|
||||||
|
|
||||||
uint8_t peek;
|
|
||||||
uint8_t blocked; /* 1 bit */
|
uint8_t blocked; /* 1 bit */
|
||||||
uint8_t delayed; /* 1 bit */
|
uint8_t delayed; /* 1 bit */
|
||||||
|
|
||||||
|
|||||||
@@ -38,7 +38,6 @@ void
|
|||||||
nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
|
nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
|
||||||
{
|
{
|
||||||
ssize_t n;
|
ssize_t n;
|
||||||
nxt_buf_t *b;
|
|
||||||
nxt_conn_t *c;
|
nxt_conn_t *c;
|
||||||
nxt_work_queue_t *wq;
|
nxt_work_queue_t *wq;
|
||||||
nxt_event_engine_t *engine;
|
nxt_event_engine_t *engine;
|
||||||
@@ -56,19 +55,17 @@ nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
if (c->socket.read_ready) {
|
if (c->socket.read_ready) {
|
||||||
|
|
||||||
b = c->read;
|
if (state->io_read_handler == NULL) {
|
||||||
|
n = c->io->recvbuf(c, c->read);
|
||||||
if (c->peek == 0) {
|
|
||||||
n = c->io->recvbuf(c, b);
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
n = c->io->recv(c, b->mem.free, c->peek, MSG_PEEK);
|
n = state->io_read_handler(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (n > 0) {
|
if (n > 0) {
|
||||||
c->nbytes = n;
|
c->nbytes = n;
|
||||||
|
|
||||||
nxt_recvbuf_update(b, n);
|
nxt_recvbuf_update(c->read, n);
|
||||||
|
|
||||||
nxt_fd_event_block_read(engine, &c->socket);
|
nxt_fd_event_block_read(engine, &c->socket);
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@
|
|||||||
* nxt_h1p_request_ prefix is used for HTTP/1 protocol request methods.
|
* nxt_h1p_request_ prefix is used for HTTP/1 protocol request methods.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static void nxt_h1p_conn_read_header(nxt_task_t *task, void *obj, void *data);
|
static ssize_t nxt_h1p_conn_io_read_handler(nxt_conn_t *c);
|
||||||
static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
|
static void nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
|
static void nxt_h1p_conn_request_init(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_h1p_conn_header_parse(nxt_task_t *task, void *obj, void *data);
|
static void nxt_h1p_conn_header_parse(nxt_task_t *task, void *obj, void *data);
|
||||||
@@ -53,7 +53,6 @@ static nxt_msec_t nxt_h1p_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
|
|||||||
|
|
||||||
|
|
||||||
static const nxt_conn_state_t nxt_h1p_idle_state;
|
static const nxt_conn_state_t nxt_h1p_idle_state;
|
||||||
static const nxt_conn_state_t nxt_h1p_proto_init_state;
|
|
||||||
static const nxt_conn_state_t nxt_h1p_header_parse_state;
|
static const nxt_conn_state_t nxt_h1p_header_parse_state;
|
||||||
static const nxt_conn_state_t nxt_h1p_read_body_state;
|
static const nxt_conn_state_t nxt_h1p_read_body_state;
|
||||||
static const nxt_conn_state_t nxt_h1p_send_state;
|
static const nxt_conn_state_t nxt_h1p_send_state;
|
||||||
@@ -151,62 +150,55 @@ nxt_http_conn_init(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
c->read_state = &nxt_h1p_idle_state;
|
c->read_state = &nxt_h1p_idle_state;
|
||||||
|
|
||||||
nxt_conn_wait(c);
|
nxt_conn_read(task->thread->engine, c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static const nxt_conn_state_t nxt_h1p_idle_state
|
static const nxt_conn_state_t nxt_h1p_idle_state
|
||||||
nxt_aligned(64) =
|
nxt_aligned(64) =
|
||||||
{
|
|
||||||
.ready_handler = nxt_h1p_conn_read_header,
|
|
||||||
.close_handler = nxt_h1p_conn_error,
|
|
||||||
.error_handler = nxt_h1p_conn_error,
|
|
||||||
|
|
||||||
.timer_handler = nxt_h1p_conn_timeout,
|
|
||||||
.timer_value = nxt_h1p_conn_timeout_value,
|
|
||||||
.timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
nxt_h1p_conn_read_header(nxt_task_t *task, void *obj, void *data)
|
|
||||||
{
|
|
||||||
size_t size;
|
|
||||||
nxt_conn_t *c;
|
|
||||||
nxt_socket_conf_joint_t *joint;
|
|
||||||
|
|
||||||
c = obj;
|
|
||||||
|
|
||||||
nxt_debug(task, "h1p conn read header");
|
|
||||||
|
|
||||||
joint = c->joint;
|
|
||||||
size = joint->socket_conf->header_buffer_size;
|
|
||||||
|
|
||||||
c->read = nxt_buf_mem_alloc(c->mem_pool, size, 0);
|
|
||||||
if (nxt_slow_path(c->read == NULL)) {
|
|
||||||
nxt_h1p_conn_error(task, c, c->socket.data);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
c->read_state = &nxt_h1p_proto_init_state;
|
|
||||||
|
|
||||||
nxt_conn_read(task->thread->engine, c);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static const nxt_conn_state_t nxt_h1p_proto_init_state
|
|
||||||
nxt_aligned(64) =
|
|
||||||
{
|
{
|
||||||
.ready_handler = nxt_h1p_conn_proto_init,
|
.ready_handler = nxt_h1p_conn_proto_init,
|
||||||
.close_handler = nxt_h1p_conn_error,
|
.close_handler = nxt_h1p_conn_error,
|
||||||
.error_handler = nxt_h1p_conn_error,
|
.error_handler = nxt_h1p_conn_error,
|
||||||
|
|
||||||
|
.io_read_handler = nxt_h1p_conn_io_read_handler,
|
||||||
|
|
||||||
.timer_handler = nxt_h1p_conn_timeout,
|
.timer_handler = nxt_h1p_conn_timeout,
|
||||||
.timer_value = nxt_h1p_conn_timeout_value,
|
.timer_value = nxt_h1p_conn_timeout_value,
|
||||||
.timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
|
.timer_data = offsetof(nxt_socket_conf_t, header_read_timeout),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static ssize_t
|
||||||
|
nxt_h1p_conn_io_read_handler(nxt_conn_t *c)
|
||||||
|
{
|
||||||
|
size_t size;
|
||||||
|
ssize_t n;
|
||||||
|
nxt_buf_t *b;
|
||||||
|
nxt_socket_conf_joint_t *joint;
|
||||||
|
|
||||||
|
joint = c->joint;
|
||||||
|
size = joint->socket_conf->header_buffer_size;
|
||||||
|
|
||||||
|
b = nxt_buf_mem_alloc(c->mem_pool, size, 0);
|
||||||
|
if (nxt_slow_path(b == NULL)) {
|
||||||
|
c->socket.error = NXT_ENOMEM;
|
||||||
|
return NXT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
n = c->io->recvbuf(c, b);
|
||||||
|
|
||||||
|
if (n > 0) {
|
||||||
|
c->read = b;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
nxt_mp_free(c->mem_pool, b);
|
||||||
|
}
|
||||||
|
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
|
nxt_h1p_conn_proto_init(nxt_task_t *task, void *obj, void *data)
|
||||||
{
|
{
|
||||||
@@ -1021,9 +1013,9 @@ nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p, nxt_conn_t *c)
|
|||||||
size = nxt_buf_mem_used_size(&in->mem);
|
size = nxt_buf_mem_used_size(&in->mem);
|
||||||
|
|
||||||
if (size == 0) {
|
if (size == 0) {
|
||||||
in->mem.pos = in->mem.start;
|
nxt_mp_free(c->mem_pool, in);
|
||||||
in->mem.free = in->mem.start;
|
|
||||||
|
|
||||||
|
c->read = NULL;
|
||||||
c->read_state = &nxt_h1p_keepalive_state;
|
c->read_state = &nxt_h1p_keepalive_state;
|
||||||
|
|
||||||
nxt_conn_read(task->thread->engine, c);
|
nxt_conn_read(task->thread->engine, c);
|
||||||
@@ -1048,6 +1040,8 @@ static const nxt_conn_state_t nxt_h1p_keepalive_state
|
|||||||
.close_handler = nxt_h1p_conn_error,
|
.close_handler = nxt_h1p_conn_error,
|
||||||
.error_handler = nxt_h1p_conn_error,
|
.error_handler = nxt_h1p_conn_error,
|
||||||
|
|
||||||
|
.io_read_handler = nxt_h1p_conn_io_read_handler,
|
||||||
|
|
||||||
.timer_handler = nxt_h1p_conn_timeout,
|
.timer_handler = nxt_h1p_conn_timeout,
|
||||||
.timer_value = nxt_h1p_conn_timeout_value,
|
.timer_value = nxt_h1p_conn_timeout_value,
|
||||||
.timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
|
.timer_data = offsetof(nxt_socket_conf_t, idle_timeout),
|
||||||
|
|||||||
Reference in New Issue
Block a user