Introducing tasks.

This commit is contained in:
Igor Sysoev
2017-01-23 19:56:03 +03:00
parent 16cbf3c076
commit de532922d9
71 changed files with 1694 additions and 1499 deletions

View File

@@ -49,16 +49,17 @@ static nxt_buf_t *nxt_fastcgi_request_create(nxt_fastcgi_source_t *fs);
static nxt_int_t nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs,
nxt_fastcgi_param_t *param);
static void nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj,
static void nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj,
void *data);
static void nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj,
static void nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj,
void *data);
static void nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj,
static void nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj,
void *data);
static void nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr,
static void nxt_fastcgi_source_sync_buffer(nxt_task_t *task,
nxt_fastcgi_source_t *fs, nxt_buf_t *b);
static nxt_int_t nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs);
static nxt_int_t nxt_fastcgi_source_header_process(nxt_task_t *task,
nxt_fastcgi_source_t *fs);
static nxt_int_t nxt_fastcgi_source_status(nxt_upstream_source_t *us,
nxt_name_value_t *nv);
static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
@@ -66,11 +67,12 @@ static nxt_int_t nxt_fastcgi_source_content_length(nxt_upstream_source_t *us,
static void nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs,
nxt_buf_t *b);
static void nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj,
static void nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj,
void *data);
static nxt_buf_t *nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp);
static void nxt_fastcgi_source_error(nxt_stream_source_t *stream);
static void nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs);
static void nxt_fastcgi_source_error(nxt_task_t *task,
nxt_stream_source_t *stream);
static void nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs);
/*
@@ -117,7 +119,7 @@ static const uint8_t nxt_fastcgi_stdin_record[] = {
void
nxt_fastcgi_source_handler(nxt_upstream_source_t *us,
nxt_fastcgi_source_handler(nxt_task_t *task, nxt_upstream_source_t *us,
nxt_fastcgi_source_request_create_t request_create)
{
nxt_stream_source_t *stream;
@@ -181,13 +183,13 @@ nxt_fastcgi_source_handler(nxt_upstream_source_t *us,
nxt_memzero(&fs->u.header, sizeof(nxt_http_split_header_parse_t));
fs->u.header.mem_pool = fs->upstream->buffers.mem_pool;
nxt_stream_source_connect(stream);
nxt_stream_source_connect(task, stream);
return;
}
fail:
nxt_fastcgi_source_fail(fs);
nxt_fastcgi_source_fail(task, fs);
}
@@ -383,7 +385,7 @@ nxt_fastcgi_next_param(nxt_fastcgi_source_t *fs, nxt_fastcgi_param_t *param)
static void
nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data)
nxt_fastcgi_source_record_filter(nxt_task_t *task, void *obj, void *data)
{
size_t size;
u_char *p;
@@ -394,18 +396,18 @@ nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data)
fsr = obj;
in = data;
nxt_log_debug(thr->log, "fastcgi source record filter");
nxt_debug(task, "fastcgi source record filter");
if (nxt_slow_path(fsr->parse.done)) {
return;
}
nxt_fastcgi_record_parse(&fsr->parse, in);
nxt_fastcgi_record_parse(task, &fsr->parse, in);
fs = nxt_container_of(fsr, nxt_fastcgi_source_t, record);
if (fsr->parse.error) {
nxt_fastcgi_source_fail(fs);
nxt_fastcgi_source_fail(task, fs);
return;
}
@@ -413,8 +415,9 @@ nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data)
/*
* Output all parsed before a FastCGI record error and close upstream.
*/
nxt_thread_current_work_queue_add(thr, nxt_fastcgi_source_record_error,
fs, NULL, thr->log);
nxt_thread_current_work_queue_add(task->thread,
nxt_fastcgi_source_record_error,
task, fs, NULL);
}
/* Log FastCGI stderr output. */
@@ -430,36 +433,36 @@ nxt_fastcgi_source_record_filter(nxt_thread_t *thr, void *obj, void *data)
size = (p + 1) - b->mem.pos;
if (size != 0) {
nxt_log_error(NXT_LOG_ERR, thr->log,
"upstream sent in FastCGI stderr: \"%*s\"",
size, b->mem.pos);
nxt_log(task, NXT_LOG_ERR,
"upstream sent in FastCGI stderr: \"%*s\"",
size, b->mem.pos);
}
b->completion_handler(thr, b, b->parent);
b->completion_handler(task, b, b->parent);
}
/* Process FastCGI stdout output. */
if (fsr->parse.out[0] != NULL) {
nxt_source_filter(thr, fs->upstream->work_queue, &fsr->next,
fsr->parse.out[0]);
nxt_source_filter(task->thread, fs->upstream->work_queue, task,
&fsr->next, fsr->parse.out[0]);
}
}
static void
nxt_fastcgi_source_record_error(nxt_thread_t *thr, void *obj, void *data)
nxt_fastcgi_source_record_error(nxt_task_t *task, void *obj, void *data)
{
nxt_fastcgi_source_t *fs;
fs = obj;
nxt_fastcgi_source_fail(fs);
nxt_fastcgi_source_fail(task, fs);
}
static void
nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
nxt_fastcgi_source_header_filter(nxt_task_t *task, void *obj, void *data)
{
nxt_int_t ret;
nxt_buf_t *b;
@@ -469,10 +472,10 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
b = data;
do {
nxt_log_debug(thr->log, "fastcgi source header filter");
nxt_debug(task, "fastcgi source header filter");
if (nxt_slow_path(nxt_buf_is_sync(b))) {
nxt_fastcgi_source_sync_buffer(thr, fs, b);
nxt_fastcgi_source_sync_buffer(task, fs, b);
return;
}
@@ -483,7 +486,7 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
break;
}
ret = nxt_fastcgi_source_header_process(fs);
ret = nxt_fastcgi_source_header_process(task, fs);
if (nxt_slow_path(ret != NXT_OK)) {
break;
@@ -491,7 +494,7 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
}
if (nxt_fast_path(ret == NXT_DONE)) {
nxt_log_debug(thr->log, "fastcgi source header done");
nxt_debug(task, "fastcgi source header done");
nxt_fastcgi_source_header_ready(fs, b);
return;
}
@@ -500,16 +503,16 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
if (ret != NXT_ERROR) {
/* n == NXT_DECLINED: "\r" is not followed by "\n" */
nxt_log_error(NXT_LOG_ERR, thr->log,
"upstream sent invalid header line: \"%*s\\r...\"",
fs->u.header.parse.header_end
- fs->u.header.parse.header_name_start,
fs->u.header.parse.header_name_start);
nxt_log(task, NXT_LOG_ERR,
"upstream sent invalid header line: \"%*s\\r...\"",
fs->u.header.parse.header_end
- fs->u.header.parse.header_name_start,
fs->u.header.parse.header_name_start);
}
/* ret == NXT_ERROR */
nxt_fastcgi_source_fail(fs);
nxt_fastcgi_source_fail(task, fs);
return;
}
@@ -520,45 +523,41 @@ nxt_fastcgi_source_header_filter(nxt_thread_t *thr, void *obj, void *data)
static void
nxt_fastcgi_source_sync_buffer(nxt_thread_t *thr, nxt_fastcgi_source_t *fs,
nxt_fastcgi_source_sync_buffer(nxt_task_t *task, nxt_fastcgi_source_t *fs,
nxt_buf_t *b)
{
if (nxt_buf_is_last(b)) {
nxt_log_error(NXT_LOG_ERR, thr->log,
"upstream closed prematurely connection");
nxt_log(task, NXT_LOG_ERR, "upstream closed prematurely connection");
} else {
nxt_log_error(NXT_LOG_ERR, thr->log, "%ui buffers %uz each are not "
"enough to process upstream response header",
fs->upstream->buffers.max,
fs->upstream->buffers.size);
nxt_log(task, NXT_LOG_ERR, "%ui buffers %uz each are not "
"enough to process upstream response header",
fs->upstream->buffers.max, fs->upstream->buffers.size);
}
/* The stream source sends only the last and the nobuf sync buffer. */
nxt_fastcgi_source_fail(fs);
nxt_fastcgi_source_fail(task, fs);
}
static nxt_int_t
nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs)
nxt_fastcgi_source_header_process(nxt_task_t *task, nxt_fastcgi_source_t *fs)
{
size_t len;
nxt_thread_t *thr;
nxt_name_value_t *nv;
nxt_lvlhsh_query_t lhq;
nxt_http_header_parse_t *hp;
nxt_upstream_name_value_t *unv;
thr = nxt_thread();
hp = &fs->u.header.parse;
len = hp->header_name_end - hp->header_name_start;
if (len > 255) {
nxt_log_error(NXT_LOG_INFO, thr->log,
"upstream sent too long header field name: \"%*s\"",
len, hp->header_name_start);
nxt_log(task, NXT_LOG_INFO,
"upstream sent too long header field name: \"%*s\"",
len, hp->header_name_start);
return NXT_ERROR;
}
@@ -574,8 +573,8 @@ nxt_fastcgi_source_header_process(nxt_fastcgi_source_t *fs)
nv->value_len = hp->header_end - hp->header_start;
nv->value_start = hp->header_start;
nxt_log_debug(thr->log, "http header: \"%*s: %*s\"",
nv->name_len, nv->name_start, nv->value_len, nv->value_start);
nxt_debug(task, "http header: \"%*s: %*s\"",
nv->name_len, nv->name_start, nv->value_len, nv->value_start);
lhq.key_hash = nv->hash;
lhq.key.len = nv->name_len;
@@ -682,7 +681,7 @@ nxt_fastcgi_source_header_ready(nxt_fastcgi_source_t *fs, nxt_buf_t *b)
*/
static void
nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
nxt_fastcgi_source_body_filter(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b, *in;
nxt_fastcgi_source_t *fs;
@@ -690,7 +689,7 @@ nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
fs = obj;
in = data;
nxt_log_debug(thr->log, "fastcgi source body filter");
nxt_debug(task, "fastcgi source body filter");
for (b = in; b != NULL; b = b->next) {
@@ -701,7 +700,8 @@ nxt_fastcgi_source_body_filter(nxt_thread_t *thr, void *obj, void *data)
}
if (fs->next != NULL) {
nxt_source_filter(thr, fs->upstream->work_queue, fs->next, in);
nxt_source_filter(task->thread, fs->upstream->work_queue, task,
fs->next, in);
return;
}
@@ -729,7 +729,7 @@ nxt_fastcgi_source_last_buf(nxt_fastcgi_parse_t *fp)
static void
nxt_fastcgi_source_error(nxt_stream_source_t *stream)
nxt_fastcgi_source_error(nxt_task_t *task, nxt_stream_source_t *stream)
{
nxt_fastcgi_source_t *fs;
@@ -737,20 +737,16 @@ nxt_fastcgi_source_error(nxt_stream_source_t *stream)
fs = stream->upstream->protocol_source;
nxt_fastcgi_source_fail(fs);
nxt_fastcgi_source_fail(task, fs);
}
static void
nxt_fastcgi_source_fail(nxt_fastcgi_source_t *fs)
nxt_fastcgi_source_fail(nxt_task_t *task, nxt_fastcgi_source_t *fs)
{
nxt_thread_t *thr;
thr = nxt_thread();
nxt_log_debug(thr->log, "fastcgi source fail");
nxt_debug(task, "fastcgi source fail");
/* TODO: fail, next upstream, or bad gateway */
fs->upstream->state->error_handler(thr, fs, NULL);
fs->upstream->state->error_handler(task, fs, NULL);
}