Upstream chunked transfer encoding support.
This commit is contained in:
@@ -90,6 +90,7 @@ NXT_LIB_SRCS=" \
|
||||
src/nxt_http_return.c \
|
||||
src/nxt_http_static.c \
|
||||
src/nxt_http_proxy.c \
|
||||
src/nxt_http_chunk_parse.c \
|
||||
src/nxt_application.c \
|
||||
src/nxt_external.c \
|
||||
src/nxt_port_hash.c \
|
||||
@@ -107,7 +108,6 @@ NXT_LIB_SRC0=" \
|
||||
src/nxt_stream_source.c \
|
||||
src/nxt_upstream_source.c \
|
||||
src/nxt_http_source.c \
|
||||
src/nxt_http_chunk_parse.c \
|
||||
src/nxt_fastcgi_source.c \
|
||||
src/nxt_fastcgi_record_parse.c \
|
||||
\
|
||||
|
||||
@@ -99,6 +99,7 @@ static nxt_int_t nxt_h1p_peer_header_parse(nxt_http_peer_t *peer,
|
||||
nxt_buf_mem_t *bm);
|
||||
static void nxt_h1p_peer_read(nxt_task_t *task, nxt_http_peer_t *peer);
|
||||
static void nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer, nxt_buf_t *out);
|
||||
static void nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_h1p_peer_error(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_h1p_peer_send_timeout(nxt_task_t *task, void *obj, void *data);
|
||||
@@ -106,6 +107,8 @@ static void nxt_h1p_peer_read_timeout(nxt_task_t *task, void *obj, void *data);
|
||||
static nxt_msec_t nxt_h1p_peer_timer_value(nxt_conn_t *c, uintptr_t data);
|
||||
static void nxt_h1p_peer_close(nxt_task_t *task, nxt_http_peer_t *peer);
|
||||
static void nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data);
|
||||
static nxt_int_t nxt_h1p_peer_transfer_encoding(void *ctx,
|
||||
nxt_http_field_t *field, uintptr_t data);
|
||||
|
||||
#if (NXT_TLS)
|
||||
static const nxt_conn_state_t nxt_http_idle_state;
|
||||
@@ -178,7 +181,7 @@ static nxt_lvlhsh_t nxt_h1p_peer_fields_hash;
|
||||
|
||||
static nxt_http_field_proc_t nxt_h1p_peer_fields[] = {
|
||||
{ nxt_string("Connection"), &nxt_http_proxy_skip, 0 },
|
||||
{ nxt_string("Transfer-Encoding"), &nxt_http_proxy_skip, 0 },
|
||||
{ nxt_string("Transfer-Encoding"), &nxt_h1p_peer_transfer_encoding, 0 },
|
||||
{ nxt_string("Server"), &nxt_http_proxy_skip, 0 },
|
||||
{ nxt_string("Date"), &nxt_http_proxy_date, 0 },
|
||||
{ nxt_string("Content-Length"), &nxt_http_proxy_content_length, 0 },
|
||||
@@ -2139,9 +2142,6 @@ nxt_h1p_peer_connect(nxt_task_t *task, nxt_http_peer_t *peer)
|
||||
peer->proto.h1 = h1p;
|
||||
h1p->request = r;
|
||||
|
||||
c->socket.task = task;
|
||||
c->read_timer.task = task;
|
||||
c->write_timer.task = task;
|
||||
c->socket.data = peer;
|
||||
c->remote = peer->server->sockaddr;
|
||||
|
||||
@@ -2238,7 +2238,8 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
|
||||
r = peer->request;
|
||||
|
||||
size = r->method->length + sizeof(" ") + r->target.length
|
||||
+ sizeof(" HTTP/1.0\r\n")
|
||||
+ sizeof(" HTTP/1.1\r\n")
|
||||
+ sizeof("Connection: close\r\n")
|
||||
+ sizeof("\r\n");
|
||||
|
||||
nxt_list_each(field, r->fields) {
|
||||
@@ -2261,7 +2262,8 @@ nxt_h1p_peer_header_send(nxt_task_t *task, nxt_http_peer_t *peer)
|
||||
p = nxt_cpymem(p, r->method->start, r->method->length);
|
||||
*p++ = ' ';
|
||||
p = nxt_cpymem(p, r->target.start, r->target.length);
|
||||
p = nxt_cpymem(p, " HTTP/1.0\r\n", 11);
|
||||
p = nxt_cpymem(p, " HTTP/1.1\r\n", 11);
|
||||
p = nxt_cpymem(p, "Connection: close\r\n", 19);
|
||||
|
||||
nxt_list_each(field, r->fields) {
|
||||
|
||||
@@ -2466,6 +2468,7 @@ nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
|
||||
nxt_int_t ret;
|
||||
nxt_buf_t *b;
|
||||
nxt_conn_t *c;
|
||||
nxt_h1proto_t *h1p;
|
||||
nxt_http_peer_t *peer;
|
||||
nxt_http_request_t *r;
|
||||
nxt_event_engine_t *engine;
|
||||
@@ -2503,11 +2506,26 @@ nxt_h1p_peer_header_read_done(nxt_task_t *task, void *obj, void *data)
|
||||
|
||||
c->read = NULL;
|
||||
|
||||
if (nxt_buf_mem_used_size(&b->mem) != 0) {
|
||||
peer->body = b;
|
||||
peer->header_received = 1;
|
||||
|
||||
h1p = peer->proto.h1;
|
||||
|
||||
if (h1p->chunked) {
|
||||
if (r->resp.content_length != NULL) {
|
||||
peer->status = NXT_HTTP_BAD_GATEWAY;
|
||||
break;
|
||||
}
|
||||
|
||||
peer->header_received = 1;
|
||||
h1p->chunked_parse.mem_pool = c->mem_pool;
|
||||
|
||||
} else if (r->resp.content_length_n > 0) {
|
||||
h1p->remainder = r->resp.content_length_n;
|
||||
}
|
||||
|
||||
if (nxt_buf_mem_used_size(&b->mem) != 0) {
|
||||
nxt_h1p_peer_body_process(task, peer, b);
|
||||
return;
|
||||
}
|
||||
|
||||
r->state->ready_handler(task, r, peer);
|
||||
return;
|
||||
@@ -2613,18 +2631,54 @@ static const nxt_conn_state_t nxt_h1p_peer_read_state
|
||||
static void
|
||||
nxt_h1p_peer_read_done(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_buf_t *out;
|
||||
nxt_conn_t *c;
|
||||
nxt_http_peer_t *peer;
|
||||
nxt_http_request_t *r;
|
||||
|
||||
c = obj;
|
||||
peer = data;
|
||||
|
||||
nxt_debug(task, "h1p peer read done");
|
||||
|
||||
peer->body = c->read;
|
||||
out = c->read;
|
||||
c->read = NULL;
|
||||
|
||||
nxt_h1p_peer_body_process(task, peer, out);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_h1p_peer_body_process(nxt_task_t *task, nxt_http_peer_t *peer,
|
||||
nxt_buf_t *out)
|
||||
{
|
||||
size_t length;
|
||||
nxt_h1proto_t *h1p;
|
||||
nxt_http_request_t *r;
|
||||
|
||||
h1p = peer->proto.h1;
|
||||
|
||||
if (h1p->chunked) {
|
||||
out = nxt_http_chunk_parse(task, &h1p->chunked_parse, out);
|
||||
|
||||
if (h1p->chunked_parse.chunk_error || h1p->chunked_parse.error) {
|
||||
peer->status = NXT_HTTP_BAD_GATEWAY;
|
||||
r = peer->request;
|
||||
r->state->error_handler(task, r, peer);
|
||||
return;
|
||||
}
|
||||
|
||||
if (h1p->chunked_parse.last) {
|
||||
nxt_buf_chain_add(&out, nxt_http_buf_last(peer->request));
|
||||
peer->closed = 1;
|
||||
}
|
||||
|
||||
} else if (h1p->remainder > 0) {
|
||||
length = nxt_buf_chain_length(out);
|
||||
h1p->remainder -= length;
|
||||
}
|
||||
|
||||
peer->body = out;
|
||||
|
||||
r = peer->request;
|
||||
r->state->ready_handler(task, r, peer);
|
||||
}
|
||||
@@ -2644,8 +2698,8 @@ nxt_h1p_peer_closed(nxt_task_t *task, void *obj, void *data)
|
||||
|
||||
if (peer->header_received) {
|
||||
peer->body = nxt_http_buf_last(r);
|
||||
|
||||
peer->closed = 1;
|
||||
r->inconsistent = (peer->proto.h1->remainder != 0);
|
||||
|
||||
r->state->ready_handler(task, r, peer);
|
||||
|
||||
@@ -2777,3 +2831,22 @@ nxt_h1p_peer_free(nxt_task_t *task, void *obj, void *data)
|
||||
|
||||
nxt_conn_free(task, c);
|
||||
}
|
||||
|
||||
|
||||
static nxt_int_t
|
||||
nxt_h1p_peer_transfer_encoding(void *ctx, nxt_http_field_t *field,
|
||||
uintptr_t data)
|
||||
{
|
||||
nxt_http_request_t *r;
|
||||
|
||||
r = ctx;
|
||||
field->skip = 1;
|
||||
|
||||
if (field->value_length == 7
|
||||
&& nxt_memcmp(field->value, "chunked", 7) == 0)
|
||||
{
|
||||
r->peer->proto.h1->chunked = 1;
|
||||
}
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ typedef struct nxt_h1p_websocket_timer_s nxt_h1p_websocket_timer_t;
|
||||
|
||||
struct nxt_h1proto_s {
|
||||
nxt_http_request_parse_t parser;
|
||||
nxt_http_chunk_parse_t chunked_parse;
|
||||
nxt_off_t remainder;
|
||||
|
||||
uint8_t nbuffers;
|
||||
uint8_t header_buffer_slot;
|
||||
|
||||
@@ -119,7 +119,6 @@ typedef struct {
|
||||
nxt_upstream_server_t *server;
|
||||
nxt_list_t *fields;
|
||||
nxt_buf_t *body;
|
||||
nxt_off_t remainder;
|
||||
|
||||
nxt_http_status_t status:16;
|
||||
nxt_http_protocol_t protocol:8; /* 2 bits */
|
||||
|
||||
@@ -21,13 +21,17 @@ static nxt_int_t nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp,
|
||||
nxt_buf_t ***tail, nxt_buf_t *in);
|
||||
|
||||
|
||||
static void nxt_http_chunk_buf_completion(nxt_task_t *task, void *obj,
|
||||
void *data);
|
||||
|
||||
|
||||
nxt_buf_t *
|
||||
nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
|
||||
nxt_buf_t *in)
|
||||
{
|
||||
u_char c, ch;
|
||||
nxt_int_t ret;
|
||||
nxt_buf_t *b, *out, *nb, **tail;
|
||||
nxt_buf_t *b, *out, *next, **tail;
|
||||
enum {
|
||||
sw_start = 0,
|
||||
sw_chunk_size,
|
||||
@@ -37,12 +41,13 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
|
||||
sw_chunk,
|
||||
} state;
|
||||
|
||||
next = NULL;
|
||||
out = NULL;
|
||||
tail = &out;
|
||||
|
||||
state = hcp->state;
|
||||
|
||||
for (b = in; b != NULL; b = b->next) {
|
||||
for (b = in; b != NULL; b = next) {
|
||||
|
||||
hcp->pos = b->mem.pos;
|
||||
|
||||
@@ -60,7 +65,7 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
|
||||
|
||||
if (nxt_slow_path(ret == NXT_ERROR)) {
|
||||
hcp->error = 1;
|
||||
goto done;
|
||||
return out;
|
||||
}
|
||||
|
||||
state = sw_chunk_end_newline;
|
||||
@@ -152,7 +157,7 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
|
||||
continue;
|
||||
}
|
||||
|
||||
goto done;
|
||||
return out;
|
||||
}
|
||||
|
||||
goto chunk_error;
|
||||
@@ -168,15 +173,15 @@ nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
|
||||
|
||||
if (b->retain == 0) {
|
||||
/* No chunk data was found in a buffer. */
|
||||
nxt_thread_current_work_queue_add(task->thread,
|
||||
b->completion_handler,
|
||||
task, b, b->parent);
|
||||
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
|
||||
b->completion_handler, task, b, b->parent);
|
||||
|
||||
}
|
||||
|
||||
next:
|
||||
|
||||
continue;
|
||||
next = b->next;
|
||||
b->next = NULL;
|
||||
}
|
||||
|
||||
hcp->state = state;
|
||||
@@ -187,20 +192,6 @@ chunk_error:
|
||||
|
||||
hcp->chunk_error = 1;
|
||||
|
||||
done:
|
||||
|
||||
nb = nxt_buf_sync_alloc(hcp->mem_pool, NXT_BUF_SYNC_LAST);
|
||||
|
||||
if (nxt_fast_path(nb != NULL)) {
|
||||
*tail = nb;
|
||||
|
||||
} else {
|
||||
hcp->error = 1;
|
||||
}
|
||||
|
||||
// STUB: hcp->chunk_error = 1;
|
||||
// STUB: hcp->error = 1;
|
||||
|
||||
return out;
|
||||
}
|
||||
|
||||
@@ -216,16 +207,6 @@ nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp, nxt_buf_t ***tail,
|
||||
p = hcp->pos;
|
||||
size = in->mem.free - p;
|
||||
|
||||
if (hcp->chunk_size >= size && in->retain == 0) {
|
||||
/*
|
||||
* Use original buffer if the buffer is lesser than or equal
|
||||
* to a chunk size and this is the first chunk in the buffer.
|
||||
*/
|
||||
in->mem.pos = p;
|
||||
**tail = in;
|
||||
*tail = &in->next;
|
||||
|
||||
} else {
|
||||
b = nxt_buf_mem_alloc(hcp->mem_pool, 0, 0);
|
||||
if (nxt_slow_path(b == NULL)) {
|
||||
return NXT_ERROR;
|
||||
@@ -234,6 +215,9 @@ nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp, nxt_buf_t ***tail,
|
||||
**tail = b;
|
||||
*tail = &b->next;
|
||||
|
||||
nxt_mp_retain(hcp->mem_pool);
|
||||
b->completion_handler = nxt_http_chunk_buf_completion;
|
||||
|
||||
b->parent = in;
|
||||
in->retain++;
|
||||
b->mem.pos = p;
|
||||
@@ -251,7 +235,6 @@ nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp, nxt_buf_t ***tail,
|
||||
|
||||
b->mem.free = in->mem.free;
|
||||
b->mem.end = in->mem.free;
|
||||
}
|
||||
|
||||
hcp->chunk_size -= size;
|
||||
|
||||
@@ -261,3 +244,31 @@ nxt_http_chunk_buffer(nxt_http_chunk_parse_t *hcp, nxt_buf_t ***tail,
|
||||
|
||||
return NXT_HTTP_CHUNK_MIDDLE;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_http_chunk_buf_completion(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_mp_t *mp;
|
||||
nxt_buf_t *b, *next, *parent;
|
||||
|
||||
b = obj;
|
||||
parent = data;
|
||||
|
||||
nxt_debug(task, "buf completion: %p %p", b, b->mem.start);
|
||||
|
||||
nxt_assert(data == b->parent);
|
||||
|
||||
do {
|
||||
next = b->next;
|
||||
parent = b->parent;
|
||||
mp = b->data;
|
||||
|
||||
nxt_mp_free(mp, b);
|
||||
nxt_mp_release(mp);
|
||||
|
||||
nxt_buf_parent_completion(task, parent);
|
||||
|
||||
b = next;
|
||||
} while (b != NULL);
|
||||
}
|
||||
|
||||
@@ -90,6 +90,19 @@ struct nxt_http_field_s {
|
||||
};
|
||||
|
||||
|
||||
typedef struct {
|
||||
u_char *pos;
|
||||
nxt_mp_t *mem_pool;
|
||||
|
||||
uint64_t chunk_size;
|
||||
|
||||
uint8_t state;
|
||||
uint8_t last; /* 1 bit */
|
||||
uint8_t chunk_error; /* 1 bit */
|
||||
uint8_t error; /* 1 bit */
|
||||
} nxt_http_chunk_parse_t;
|
||||
|
||||
|
||||
#define NXT_HTTP_FIELD_HASH_INIT 159406U
|
||||
#define nxt_http_field_hash_char(h, c) (((h) << 4) + (h) + (c))
|
||||
#define nxt_http_field_hash_end(h) (((h) >> 16) ^ (h))
|
||||
@@ -109,6 +122,9 @@ nxt_uint_t nxt_http_fields_hash_collisions(nxt_lvlhsh_t *hash,
|
||||
nxt_int_t nxt_http_fields_process(nxt_list_t *fields, nxt_lvlhsh_t *hash,
|
||||
void *ctx);
|
||||
|
||||
nxt_buf_t *nxt_http_chunk_parse(nxt_task_t *task, nxt_http_chunk_parse_t *hcp,
|
||||
nxt_buf_t *in);
|
||||
|
||||
|
||||
extern const nxt_lvlhsh_proto_t nxt_http_fields_hash_proto;
|
||||
|
||||
|
||||
@@ -27,8 +27,6 @@ static void nxt_http_proxy_header_send(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_http_proxy_header_sent(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_http_proxy_request_send(nxt_task_t *task,
|
||||
nxt_http_request_t *r, nxt_buf_t *out);
|
||||
static void nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data);
|
||||
static void nxt_http_proxy_buf_mem_completion(nxt_task_t *task, void *obj,
|
||||
void *data);
|
||||
@@ -253,10 +251,6 @@ nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
|
||||
|
||||
nxt_debug(task, "http proxy status: %d", peer->status);
|
||||
|
||||
if (r->resp.content_length_n > 0) {
|
||||
peer->remainder = r->resp.content_length_n;
|
||||
}
|
||||
|
||||
nxt_list_each(field, peer->fields) {
|
||||
|
||||
nxt_debug(task, "http proxy header: \"%*s: %*s\"",
|
||||
@@ -275,6 +269,8 @@ nxt_http_proxy_header_read(nxt_task_t *task, void *obj, void *data)
|
||||
|
||||
} nxt_list_loop;
|
||||
|
||||
r->state = &nxt_http_proxy_read_state;
|
||||
|
||||
nxt_http_request_header_send(task, r, nxt_http_proxy_send_body, peer);
|
||||
}
|
||||
|
||||
@@ -292,27 +288,13 @@ nxt_http_proxy_send_body(nxt_task_t *task, void *obj, void *data)
|
||||
|
||||
if (out != NULL) {
|
||||
peer->body = NULL;
|
||||
nxt_http_proxy_request_send(task, r, out);
|
||||
}
|
||||
|
||||
r->state = &nxt_http_proxy_read_state;
|
||||
|
||||
nxt_http_proto[peer->protocol].peer_read(task, peer);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_http_proxy_request_send(nxt_task_t *task, nxt_http_request_t *r,
|
||||
nxt_buf_t *out)
|
||||
{
|
||||
size_t length;
|
||||
|
||||
if (r->peer->remainder > 0) {
|
||||
length = nxt_buf_chain_length(out);
|
||||
r->peer->remainder -= length;
|
||||
}
|
||||
|
||||
nxt_http_request_send(task, r, out);
|
||||
|
||||
}
|
||||
|
||||
if (!peer->closed) {
|
||||
nxt_http_proto[peer->protocol].peer_read(task, peer);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -328,7 +310,6 @@ static void
|
||||
nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_buf_t *out;
|
||||
nxt_bool_t last;
|
||||
nxt_http_peer_t *peer;
|
||||
nxt_http_request_t *r;
|
||||
|
||||
@@ -336,16 +317,15 @@ nxt_http_proxy_read(nxt_task_t *task, void *obj, void *data)
|
||||
peer = data;
|
||||
out = peer->body;
|
||||
peer->body = NULL;
|
||||
last = nxt_buf_is_last(out);
|
||||
|
||||
nxt_http_proxy_request_send(task, r, out);
|
||||
if (out != NULL) {
|
||||
nxt_http_request_send(task, r, out);
|
||||
}
|
||||
|
||||
if (!last) {
|
||||
if (!peer->closed) {
|
||||
nxt_http_proto[peer->protocol].peer_read(task, peer);
|
||||
|
||||
} else {
|
||||
r->inconsistent = (peer->remainder != 0);
|
||||
|
||||
nxt_http_proto[peer->protocol].peer_close(task, peer);
|
||||
|
||||
nxt_mp_release(r->mem_pool);
|
||||
@@ -422,7 +402,7 @@ nxt_http_proxy_error(nxt_task_t *task, void *obj, void *data)
|
||||
|
||||
nxt_mp_release(r->mem_pool);
|
||||
|
||||
nxt_http_request_error(task, r, peer->status);
|
||||
nxt_http_request_error(&r->task, r, peer->status);
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user