HTTP: using r->mem_pool retention counter for response buffers.

This commit is contained in:
Igor Sysoev
2018-04-03 16:28:26 +03:00
parent 151160c110
commit fa04c05aa0
9 changed files with 154 additions and 50 deletions

View File

@@ -144,7 +144,7 @@ nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags)
{
nxt_buf_t *b;
b = nxt_mp_zalloc(mp, NXT_BUF_SYNC_SIZE);
b = nxt_mp_zalloc(mp, NXT_BUF_MEM_SIZE);
if (nxt_fast_path(b != NULL)) {
b->data = mp;

View File

@@ -101,8 +101,8 @@ struct nxt_buf_s {
};
#define NXT_BUF_SYNC_SIZE offsetof(nxt_buf_t, mem.free)
#define NXT_BUF_MEM_SIZE offsetof(nxt_buf_t, file)
#define NXT_BUF_SYNC_SIZE NXT_BUF_MEM_SIZE
#define NXT_BUF_FILE_SIZE sizeof(nxt_buf_t)
#define NXT_BUF_MMAP_SIZE NXT_BUF_FILE_SIZE
#define NXT_BUF_PORT_MMAP_SIZE NXT_BUF_MEM_SIZE

View File

@@ -21,9 +21,12 @@ static void nxt_h1p_request_header_send(nxt_task_t *task,
nxt_http_request_t *r);
static void nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
nxt_inline void nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r);
static nxt_buf_t *nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
static void nxt_h1p_sent(nxt_task_t *task, void *obj, void *data);
static void nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *last);
static void nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto);
static void nxt_h1p_keepalive(nxt_task_t *task, nxt_h1proto_t *h1p,
nxt_conn_t *c);
@@ -68,6 +71,13 @@ const nxt_http_proto_send_t nxt_http_proto_send[3] = {
};
const nxt_http_proto_discard_t nxt_http_proto_discard[3] = {
nxt_h1p_request_discard,
NULL,
NULL,
};
const nxt_http_proto_close_t nxt_http_proto_close[3] = {
nxt_h1p_request_close,
NULL,
@@ -687,11 +697,9 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
} nxt_list_loop;
header = nxt_buf_mem_alloc(r->mem_pool, size, 0);
header = nxt_http_buf_mem(task, r, size);
if (nxt_slow_path(header == NULL)) {
/* The internal server error is set just for logging. */
r->status = NXT_HTTP_INTERNAL_SERVER_ERROR;
nxt_h1p_conn_close(task, h1p->conn, h1p);
nxt_h1p_request_error(task, r);
return;
}
@@ -738,6 +746,13 @@ nxt_h1p_request_header_send(nxt_task_t *task, nxt_http_request_t *r)
}
nxt_inline void
nxt_h1p_request_error(nxt_task_t *task, nxt_http_request_t *r)
{
r->state->error_handler(task, r, r->proto.h1);
}
static const nxt_conn_state_t nxt_h1p_send_state
nxt_aligned(64) =
{
@@ -764,7 +779,7 @@ nxt_h1p_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
if (r->proto.h1->chunked) {
out = nxt_h1p_chunk_create(task, r, out);
if (nxt_slow_path(out == NULL)) {
nxt_h1p_conn_error(task, c, c->socket.data);
nxt_h1p_request_error(task, r);
return;
}
}
@@ -796,7 +811,7 @@ nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
for (b = out; b != NULL; b = b->next) {
if (nxt_buf_is_last(b)) {
tail = nxt_buf_mem_alloc(r->mem_pool, chunk_size, 0);
tail = nxt_http_buf_mem(task, r, chunk_size);
if (nxt_slow_path(tail == NULL)) {
return NULL;
}
@@ -821,7 +836,7 @@ nxt_h1p_chunk_create(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
return out;
}
header = nxt_buf_mem_alloc(r->mem_pool, chunk_size, 0);
header = nxt_http_buf_mem(task, r, chunk_size);
if (nxt_slow_path(header == NULL)) {
return NULL;
}
@@ -853,6 +868,31 @@ nxt_h1p_sent(nxt_task_t *task, void *obj, void *data)
}
static void
nxt_h1p_request_discard(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *last)
{
nxt_buf_t *b;
nxt_conn_t *c;
nxt_h1proto_t *h1p;
nxt_work_queue_t *wq;
nxt_debug(task, "h1p request discard");
h1p = r->proto.h1;
h1p->keepalive = 0;
c = h1p->conn;
b = c->write;
c->write = NULL;
wq = &task->thread->engine->fast_work_queue;
nxt_sendbuf_drain(task, wq, b);
nxt_sendbuf_drain(task, wq, last);
}
static void
nxt_h1p_request_close(nxt_task_t *task, nxt_http_proto_t proto)
{
@@ -967,7 +1007,7 @@ nxt_h1p_conn_close(nxt_task_t *task, void *obj, void *data)
r = h1p->request;
if (r != NULL) {
r->state->error_handler(task, r, r->proto.h1);
nxt_h1p_request_error(task, r);
return;
}
}

View File

@@ -122,6 +122,8 @@ struct nxt_http_request_s {
nxt_sockaddr_t *remote;
nxt_sockaddr_t *local;
nxt_buf_t *last;
nxt_http_response_t resp;
nxt_http_status_t status:16;
@@ -129,6 +131,7 @@ struct nxt_http_request_s {
uint8_t protocol; /* 2 bits */
uint8_t logged; /* 1 bit */
uint8_t header_sent; /* 1 bit */
uint8_t error; /* 1 bit */
};
@@ -140,6 +143,8 @@ typedef void (*nxt_http_proto_header_send_t)(nxt_task_t *task,
nxt_http_request_t *r);
typedef void (*nxt_http_proto_send_t)(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
typedef void (*nxt_http_proto_discard_t)(nxt_task_t *task,
nxt_http_request_t *r, nxt_buf_t *last);
typedef void (*nxt_http_proto_close_t)(nxt_task_t *task,
nxt_http_proto_t proto);
@@ -157,9 +162,10 @@ void nxt_http_request_local_addr(nxt_task_t *task, nxt_http_request_t *r);
void nxt_http_request_header_send(nxt_task_t *task, nxt_http_request_t *r);
void nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r,
nxt_buf_t *out);
void nxt_http_request_release(nxt_task_t *task, nxt_http_request_t *r);
nxt_buf_t *nxt_http_request_last_buffer(nxt_task_t *task,
nxt_http_request_t *r);
nxt_buf_t *nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r,
size_t size);
nxt_buf_t *nxt_http_buf_last(nxt_http_request_t *r);
void nxt_http_request_error_handler(nxt_task_t *task, void *obj, void *data);
void nxt_http_request_close_handler(nxt_task_t *task, void *obj, void *data);
nxt_int_t nxt_http_request_host(void *ctx, nxt_http_field_t *field,
@@ -177,6 +183,7 @@ extern const nxt_http_proto_body_read_t nxt_http_proto_body_read[];
extern const nxt_http_proto_local_addr_t nxt_http_proto_local_addr[];
extern const nxt_http_proto_header_send_t nxt_http_proto_header_send[];
extern const nxt_http_proto_send_t nxt_http_proto_send[];
extern const nxt_http_proto_discard_t nxt_http_proto_discard[];
extern const nxt_http_proto_close_t nxt_http_proto_close[];

View File

@@ -12,7 +12,7 @@ static void nxt_http_request_send_error_body(nxt_task_t *task, void *r,
void *data);
static const nxt_http_request_state_t nxt_http_request_send_state;
static const nxt_http_request_state_t nxt_http_request_send_error_body_state;
static const char error[] =
@@ -28,10 +28,12 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r,
nxt_debug(task, "http request error: %d", status);
if (r->header_sent) {
if (r->header_sent || r->error) {
goto fail;
}
r->error = (status == NXT_HTTP_INTERNAL_SERVER_ERROR);
r->status = status;
r->resp.fields = nxt_list_create(r->mem_pool, 8, sizeof(nxt_http_field_t));
@@ -49,36 +51,36 @@ nxt_http_request_error(nxt_task_t *task, nxt_http_request_t *r,
r->resp.content_length = NULL;
r->resp.content_length_n = sizeof(error) - 1;
r->state = &nxt_http_request_send_state;
r->state = &nxt_http_request_send_error_body_state;
nxt_http_request_header_send(task, r);
return;
fail:
nxt_http_request_release(task, r);
nxt_http_request_error_handler(task, r, r->proto.any);
}
static const nxt_http_request_state_t nxt_http_request_send_state
static const nxt_http_request_state_t nxt_http_request_send_error_body_state
nxt_aligned(64) =
{
.ready_handler = nxt_http_request_send_error_body,
.error_handler = nxt_http_request_close_handler,
.error_handler = nxt_http_request_error_handler,
};
static void
nxt_http_request_send_error_body(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *out, *last;
nxt_buf_t *out;
nxt_http_request_t *r;
r = obj;
nxt_debug(task, "http request send error body");
out = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
out = nxt_http_buf_mem(task, r, 0);
if (nxt_slow_path(out == NULL)) {
goto fail;
}
@@ -88,18 +90,13 @@ nxt_http_request_send_error_body(nxt_task_t *task, void *obj, void *data)
out->mem.free = out->mem.start + sizeof(error) - 1;
out->mem.end = out->mem.free;
last = nxt_http_request_last_buffer(task, r);
if (nxt_slow_path(last == NULL)) {
goto fail;
}
out->next = last;
out->next = nxt_http_buf_last(r);
nxt_http_request_send(task, r, out);
return;
fail:
// TODO
nxt_http_request_release(task, r);
nxt_http_request_error_handler(task, r, r->proto.any);
}

View File

@@ -10,6 +10,8 @@
static void nxt_http_request_start(nxt_task_t *task, void *obj, void *data);
static void nxt_http_app_request(nxt_task_t *task, void *obj, void *data);
static void nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj,
void *data);
static void nxt_http_request_done(nxt_task_t *task, void *obj, void *data);
static u_char *nxt_http_date(u_char *buf, nxt_realtime_t *now, struct tm *tm,
@@ -82,6 +84,7 @@ nxt_http_request_t *
nxt_http_request_create(nxt_task_t *task)
{
nxt_mp_t *mp;
nxt_buf_t *last;
nxt_http_request_t *r;
mp = nxt_mp_create(1024, 128, 256, 32);
@@ -99,6 +102,17 @@ nxt_http_request_create(nxt_task_t *task)
goto fail;
}
last = nxt_mp_zget(mp, NXT_BUF_SYNC_SIZE);
if (nxt_slow_path(last == NULL)) {
goto fail;
}
nxt_buf_set_sync(last);
nxt_buf_set_last(last);
last->completion_handler = nxt_http_request_done;
last->parent = r;
r->last = last;
r->mem_pool = mp;
r->content_length_n = -1;
r->resp.content_length_n = -1;
@@ -109,6 +123,7 @@ nxt_http_request_create(nxt_task_t *task)
fail:
nxt_mp_release(mp);
return NULL;
}
@@ -349,26 +364,51 @@ nxt_http_request_send(nxt_task_t *task, nxt_http_request_t *r, nxt_buf_t *out)
nxt_buf_t *
nxt_http_request_last_buffer(nxt_task_t *task, nxt_http_request_t *r)
nxt_http_buf_mem(nxt_task_t *task, nxt_http_request_t *r, size_t size)
{
nxt_buf_t *b;
b = nxt_buf_mem_alloc(r->mem_pool, 0, 0);
b = nxt_buf_mem_alloc(r->mem_pool, size, 0);
if (nxt_fast_path(b != NULL)) {
nxt_buf_set_sync(b);
nxt_buf_set_last(b);
b->completion_handler = nxt_http_request_done;
b->completion_handler = nxt_http_request_mem_buf_completion;
b->parent = r;
nxt_mp_retain(r->mem_pool);
} else {
nxt_http_request_release(task, r);
nxt_http_request_error(task, r, NXT_HTTP_INTERNAL_SERVER_ERROR);
}
return b;
}
static void
nxt_http_request_mem_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_http_request_t *r;
b = obj;
r = data;
nxt_mp_free(r->mem_pool, b);
nxt_mp_release(r->mem_pool);
}
nxt_buf_t *
nxt_http_buf_last(nxt_http_request_t *r)
{
nxt_buf_t *last;
last = r->last;
r->last = NULL;
return last;
}
static void
nxt_http_request_done(nxt_task_t *task, void *obj, void *data)
{
@@ -383,12 +423,19 @@ nxt_http_request_done(nxt_task_t *task, void *obj, void *data)
void
nxt_http_request_release(nxt_task_t *task, nxt_http_request_t *r)
nxt_http_request_error_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_debug(task, "http request release");
nxt_http_proto_t proto;
nxt_http_request_t *r;
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_http_request_close_handler, task, r, r->proto.any);
r = obj;
proto.any = data;
nxt_debug(task, "http request error handler");
if (proto.any != NULL) {
nxt_http_proto_discard[r->protocol](task, r, nxt_http_buf_last(r));
}
}

View File

@@ -2636,7 +2636,7 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
{
size_t dump_size;
nxt_int_t ret;
nxt_buf_t *b, *last;
nxt_buf_t *b;
nxt_http_request_t *r;
nxt_req_conn_link_t *rc;
nxt_app_parse_ctx_t *ar;
@@ -2663,17 +2663,16 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
return;
}
if (msg->port_msg.last != 0) {
nxt_debug(task, "router data create last buf");
last = nxt_http_request_last_buffer(task, ar->request);
if (nxt_slow_path(last == NULL)) {
if (ar->request->error) {
nxt_app_http_req_done(task, ar);
nxt_router_rc_unlink(task, rc);
return;
}
nxt_buf_chain_add(&b, last);
if (msg->port_msg.last != 0) {
nxt_debug(task, "router data create last buf");
nxt_buf_chain_add(&b, nxt_http_buf_last(ar->request));
nxt_router_rc_unlink(task, rc);

View File

@@ -382,7 +382,7 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
nxt_prefetch(b->next);
if (nxt_buf_used_size(b) != 0) {
if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
break;
}
@@ -393,3 +393,16 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
return b;
}
void
nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
{
while (b != NULL) {
nxt_prefetch(b->next);
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
b = b->next;
}
}

View File

@@ -124,6 +124,7 @@ ssize_t nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm,
nxt_buf_t *nxt_sendbuf_update(nxt_buf_t *b, size_t sent);
nxt_buf_t *nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq,
nxt_buf_t *b);
void nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b);
#endif /* _NXT_SENDBUF_H_INCLUDED_ */