Work queues refactoring.

This commit is contained in:
Igor Sysoev
2017-01-27 11:35:11 +03:00
parent 6886b83c1f
commit ba0391577b
35 changed files with 408 additions and 771 deletions

View File

@@ -105,7 +105,6 @@ NXT_LIB_SRCS=" \
src/nxt_list.c \
src/nxt_buf.c \
src/nxt_buf_pool.c \
src/nxt_buf_filter.c \
src/nxt_recvbuf.c \
src/nxt_sendbuf.c \
src/nxt_thread_time.c \
@@ -125,10 +124,14 @@ NXT_LIB_SRCS=" \
src/nxt_event_conn_job_sendfile.c \
src/nxt_event_conn_proxy.c \
src/nxt_job.c \
src/nxt_job_file.c \
src/nxt_job_resolve.c \
src/nxt_sockaddr.c \
src/nxt_listen_socket.c \
"
NXT_LIB_SRC0=" \
src/nxt_buf_filter.c \
src/nxt_job_file.c \
src/nxt_stream_source.c \
src/nxt_upstream_source.c \
src/nxt_http_source.c \

View File

@@ -19,6 +19,7 @@ static nxt_app_request_t *nxt_app_request_create(nxt_socket_t s,
static void nxt_app_conn_update(nxt_thread_t *thr, nxt_event_conn_t *c,
nxt_log_t *log);
static nxt_int_t nxt_app_write_finish(nxt_app_request_t *r);
static void nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out);
static void nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data);
static void nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data);
@@ -29,7 +30,7 @@ static void nxt_app_delivery_timeout(nxt_task_t *task, void *obj, void *data);
static nxt_msec_t nxt_app_delivery_timer_value(nxt_event_conn_t *c,
uintptr_t data);
static void nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c);
static void nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r);
static void nxt_app_close_request(nxt_task_t *task, void *obj, void *data);
typedef struct nxt_app_http_parse_state_s nxt_app_http_parse_state_t;
@@ -40,6 +41,13 @@ struct nxt_app_http_parse_state_s {
u_char *end, nxt_app_http_parse_state_t *state);
};
typedef struct {
nxt_work_t work;
nxt_buf_t buf;
} nxt_app_buf_t;
static nxt_int_t nxt_app_http_parse_request(nxt_app_request_t *r, u_char *buf,
size_t size);
static nxt_int_t nxt_app_http_parse_request_line(nxt_app_request_header_t *h,
@@ -83,13 +91,11 @@ nxt_app_start(nxt_cycle_t *cycle)
return NXT_ERROR;
}
link = nxt_malloc(sizeof(nxt_thread_link_t));
link = nxt_zalloc(sizeof(nxt_thread_link_t));
if (nxt_fast_path(link != NULL)) {
link->start = nxt_app_thread;
link->data = cycle;
link->engine = NULL;
link->exit = NULL;
return nxt_thread_create(&handle, link);
}
@@ -136,8 +142,12 @@ nxt_app_thread(void *ctx)
ls = cycle->listen_sockets->elts;
for ( ;; ) {
nxt_log_debug(thr->log, "wait on accept");
s = accept(ls->socket, NULL, NULL);
nxt_thread_time_update(thr);
if (nxt_slow_path(s == -1)) {
err = nxt_socket_errno;
@@ -190,6 +200,8 @@ nxt_app_thread(void *ctx)
nxt_app->run(r);
nxt_log_debug(thr->log, "app request done");
if (nxt_slow_path(nxt_app_write_finish(r) == NXT_ERROR)) {
goto fail;
}
@@ -577,11 +589,12 @@ nxt_app_http_read_body(nxt_app_request_t *r, u_char *data, size_t len)
nxt_int_t
nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
{
void *start;
size_t free;
nxt_err_t err;
nxt_buf_t *b, *out, **next;
nxt_uint_t bufs;
nxt_event_conn_t *c;
nxt_app_buf_t *ab;
out = NULL;
next = &out;
@@ -619,10 +632,8 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
if (bufs == nxt_app_buf_max_number) {
bufs = 0;
*next = NULL;
c = r->event_conn;
nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
&c->task, c, out, &nxt_main_log);
nxt_app_buf_send(r->event_conn, out);
out = NULL;
next = &out;
@@ -658,11 +669,20 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
(void) nxt_thread_mutex_unlock(&nxt_app_mutex);
if (b == NULL) {
b = nxt_buf_mem_alloc(nxt_app_mem_pool, 4096, 0);
if (nxt_slow_path(b == NULL)) {
start = nxt_malloc(4096);
if (nxt_slow_path(start == NULL)) {
return NXT_ERROR;
}
ab = nxt_zalloc(sizeof(nxt_app_buf_t));
if (nxt_slow_path(ab == NULL)) {
return NXT_ERROR;
}
b = &ab->buf;
nxt_buf_mem_init(b, start, 4096);
b->completion_handler = nxt_app_buf_completion;
nxt_app_buf_current_number++;
@@ -675,10 +695,8 @@ nxt_app_write(nxt_app_request_t *r, const u_char *data, size_t len)
if (out != NULL) {
*next = NULL;
c = r->event_conn;
nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
&c->task, c, out, &nxt_main_log);
nxt_app_buf_send(r->event_conn, out);
}
return NXT_OK;
@@ -689,7 +707,6 @@ static nxt_int_t
nxt_app_write_finish(nxt_app_request_t *r)
{
nxt_buf_t *b, *out;
nxt_event_conn_t *c;
b = nxt_buf_sync_alloc(r->mem_pool, NXT_BUF_SYNC_LAST);
if (nxt_slow_path(b == NULL)) {
@@ -709,15 +726,25 @@ nxt_app_write_finish(nxt_app_request_t *r)
out = b;
}
c = r->event_conn;
nxt_event_engine_post(nxt_app_engine, nxt_app_delivery_handler,
&c->task, c, out, &nxt_main_log);
nxt_app_buf_send(r->event_conn, out);
return NXT_OK;
}
static void
nxt_app_buf_send(nxt_event_conn_t *c, nxt_buf_t *out)
{
nxt_app_buf_t *ab;
ab = nxt_container_of(out, nxt_app_buf_t, buf);
nxt_work_set(&ab->work, nxt_app_delivery_handler, &c->task, c, out);
nxt_event_engine_post(nxt_app_engine, &ab->work);
}
static void
nxt_app_buf_completion(nxt_task_t *task, void *obj, void *data)
{
@@ -762,8 +789,8 @@ nxt_app_delivery_handler(nxt_task_t *task, void *obj, void *data)
if (c->socket.timedout || c->socket.error != 0) {
nxt_buf_chain_add(&nxt_app_buf_done, b);
nxt_thread_work_queue_add(task->thread, c->write_work_queue,
nxt_app_delivery_completion, task, c, NULL);
nxt_work_queue_add(c->write_work_queue, nxt_app_delivery_completion,
task, c, NULL);
return;
}
@@ -799,7 +826,7 @@ nxt_app_delivery_ready(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "app delivery ready");
nxt_thread_work_queue_add(task->thread, c->write_work_queue,
nxt_work_queue_add(c->write_work_queue,
nxt_app_delivery_completion, task, c, NULL);
}
@@ -808,11 +835,8 @@ static void
nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b, *bn, *free;
nxt_thread_t *thread;
nxt_app_request_t *r;
thread = task->thread;
nxt_debug(task, "app delivery completion");
free = NULL;
@@ -832,7 +856,9 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
if (nxt_buf_is_last(b)) {
r = (nxt_app_request_t *) b->parent;
nxt_app_close_request(task, r);
nxt_work_queue_add(&task->thread->engine->final_work_queue,
nxt_app_close_request, task, r, NULL);
}
}
@@ -850,7 +876,7 @@ nxt_app_delivery_completion(nxt_task_t *task, void *obj, void *data)
(void) nxt_thread_mutex_unlock(&nxt_app_mutex);
nxt_thread_time_update(thread);
nxt_thread_time_update(task->thread);
(void) nxt_thread_cond_signal(&nxt_app_cond);
}
@@ -903,20 +929,22 @@ nxt_app_delivery_done(nxt_task_t *task, nxt_event_conn_t *c)
c->write = NULL;
nxt_thread_work_queue_add(task->thread, c->write_work_queue,
nxt_work_queue_add(c->write_work_queue,
nxt_app_delivery_completion, task, c, NULL);
}
static void
nxt_app_close_request(nxt_task_t *task, nxt_app_request_t *r)
nxt_app_close_request(nxt_task_t *task, void *obj, void *data)
{
nxt_event_conn_t *c;
nxt_app_request_t *r;
r = obj;
c = r->event_conn;
nxt_debug(task, "app close connection");
c = r->event_conn;
nxt_event_conn_close(task, c);
nxt_mem_pool_destroy(c->mem_pool);

View File

@@ -10,6 +10,18 @@
static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data);
void
nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size)
{
b->size = NXT_BUF_MEM_SIZE;
b->mem.start = start;
b->mem.pos = start;
b->mem.free = start;
b->mem.end = start + size;
}
nxt_buf_t *
nxt_buf_mem_alloc(nxt_mem_pool_t *mp, size_t size, nxt_uint_t flags)
{

View File

@@ -226,6 +226,7 @@ nxt_buf_used_size(b) \
nxt_buf_mem_used_size(&(b)->mem))
NXT_EXPORT void nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size);
NXT_EXPORT nxt_buf_t *nxt_buf_mem_alloc(nxt_mem_pool_t *mp, size_t size,
nxt_uint_t flags);
NXT_EXPORT nxt_buf_t *nxt_buf_file_alloc(nxt_mem_pool_t *mp, size_t size,

View File

@@ -141,7 +141,7 @@ nxt_chan_write_enable(nxt_task_t *task, nxt_chan_t *chan)
chan->socket.task = &chan->task;
chan->socket.write_work_queue = &task->thread->work_queue.main;
chan->socket.write_work_queue = &task->thread->engine->fast_work_queue;
chan->socket.write_handler = nxt_chan_write_handler;
chan->socket.error_handler = nxt_chan_error_handler;
}
@@ -290,9 +290,8 @@ nxt_chan_write_handler(nxt_task_t *task, void *obj, void *data)
fail:
nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main,
nxt_chan_error_handler, task, &chan->socket,
NULL);
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_chan_error_handler, task, &chan->socket, NULL);
}
@@ -308,7 +307,7 @@ nxt_chan_read_enable(nxt_task_t *task, nxt_chan_t *chan)
chan->socket.task = &chan->task;
chan->socket.read_work_queue = &task->thread->work_queue.main;
chan->socket.read_work_queue = &task->thread->engine->fast_work_queue;
chan->socket.read_handler = nxt_chan_read_handler;
chan->socket.error_handler = nxt_chan_error_handler;
@@ -378,9 +377,8 @@ nxt_chan_read_handler(nxt_task_t *task, void *obj, void *data)
/* n == 0 || n == NXT_ERROR */
nxt_thread_work_queue_add(task->thread, &task->thread->work_queue.main,
nxt_chan_error_handler, task,
&chan->socket, NULL);
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_chan_error_handler, task, &chan->socket, NULL);
return;
}
}

View File

@@ -153,7 +153,7 @@ nxt_cycle_create(nxt_thread_t *thr, nxt_task_t *task, nxt_cycle_t *previous,
nxt_log_debug(thr->log, "new cycle: %p", cycle);
nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_start,
nxt_work_queue_add(&thr->engine->fast_work_queue, nxt_cycle_start,
task, cycle, NULL);
return NXT_OK;
@@ -583,7 +583,7 @@ nxt_cycle_quit(nxt_task_t *task, nxt_cycle_t *cycle)
nxt_cycle_close_idle_connections(thr, task);
if (done) {
nxt_thread_work_queue_add(thr, &thr->work_queue.main, nxt_cycle_exit,
nxt_work_queue_add(&thr->engine->fast_work_queue, nxt_cycle_exit,
task, cycle, NULL);
}
}

View File

@@ -648,8 +648,7 @@ nxt_epoll_commit_changes(nxt_task_t *task, nxt_epoll_event_set_t *es)
nxt_log(ev->task, NXT_LOG_CRIT, "epoll_ctl(%d, %d, %d) failed %E",
es->epoll, ch->op, ch->fd, nxt_errno);
nxt_thread_work_queue_add(task->thread,
&task->thread->work_queue.main,
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_epoll_error_handler,
ev->task, ev, ev->data);
@@ -719,12 +718,12 @@ nxt_epoll_add_signal(nxt_epoll_event_set_t *es, nxt_event_signals_t *signals)
nxt_main_log_debug("signalfd(): %d", fd);
thr = nxt_thread();
es->signalfd.data = signals->handler;
es->signalfd.read_work_queue = nxt_thread_main_work_queue();
es->signalfd.read_work_queue = &thr->engine->fast_work_queue;
es->signalfd.read_handler = nxt_epoll_signalfd_handler;
es->signalfd.log = &nxt_main_log;
thr = nxt_thread();
es->signalfd.task = &thr->engine->task;
ee.events = EPOLLIN;
@@ -805,12 +804,12 @@ nxt_epoll_enable_post(nxt_event_set_t *event_set, nxt_work_handler_t handler)
nxt_main_log_debug("eventfd(): %d", es->eventfd.fd);
es->eventfd.read_work_queue = nxt_thread_main_work_queue();
thr = nxt_thread();
es->eventfd.read_work_queue = &thr->engine->fast_work_queue;
es->eventfd.read_handler = nxt_epoll_eventfd_handler;
es->eventfd.data = es;
es->eventfd.log = &nxt_main_log;
thr = nxt_thread();
es->eventfd.task = &thr->engine->task;
ee.events = EPOLLIN | EPOLLET;
@@ -960,8 +959,7 @@ nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set,
error = 0;
nxt_thread_work_queue_add(task->thread, ev->read_work_queue,
ev->read_handler,
nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
ev->task, ev, ev->data);
} else if (event_set->epoll.mode == 0) {
@@ -981,8 +979,7 @@ nxt_epoll_poll(nxt_task_t *task, nxt_event_set_t *event_set,
error = 0;
nxt_thread_work_queue_add(task->thread, ev->write_work_queue,
ev->write_handler,
nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
ev->task, ev, ev->data);
} else if (event_set->epoll.mode == 0) {

View File

@@ -85,8 +85,8 @@ nxt_event_conn_create(nxt_mem_pool_t *mp, nxt_log_t *log)
c->max_chunk = NXT_INT32_T_MAX;
c->sendfile = NXT_CONN_SENDFILE_UNSET;
c->socket.read_work_queue = &thr->work_queue.main;
c->socket.write_work_queue = &thr->work_queue.main;
c->socket.read_work_queue = &thr->engine->fast_work_queue;
c->socket.write_work_queue = &thr->engine->fast_work_queue;
nxt_event_conn_timer_init(&c->read_timer, c, c->socket.read_work_queue);
nxt_event_conn_timer_init(&c->write_timer, c, c->socket.write_work_queue);
@@ -143,10 +143,6 @@ nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c)
thr = task->thread;
nxt_thread_work_queue_drop(thr, c);
nxt_thread_work_queue_drop(thr, &c->read_timer);
nxt_thread_work_queue_drop(thr, &c->write_timer);
engine = thr->engine;
nxt_event_timer_delete(engine, &c->read_timer);
@@ -168,7 +164,7 @@ nxt_event_conn_close(nxt_task_t *task, nxt_event_conn_t *c)
handler = nxt_event_conn_shutdown_socket;
}
nxt_thread_work_queue_add(thr, wq, handler, task,
nxt_work_queue_add(wq, handler, task,
(void *) (uintptr_t) c->socket.fd, NULL);
} else {
@@ -188,8 +184,7 @@ nxt_event_conn_shutdown_socket(nxt_task_t *task, void *obj, void *data)
nxt_socket_shutdown(s, SHUT_RDWR);
nxt_thread_work_queue_add(task->thread,
&task->thread->engine->close_work_queue,
nxt_work_queue_add(&task->thread->engine->close_work_queue,
nxt_event_conn_close_socket, task,
(void *) (uintptr_t) s, NULL);
}

View File

@@ -192,7 +192,7 @@ typedef struct {
nxt_event_conn_io_handle(thr, wq, handler, task, c, data) \
do { \
if (thr->engine->batch != 0) { \
nxt_thread_work_queue_add(thr, wq, handler, task, c, data); \
nxt_work_queue_add(wq, handler, task, c, data); \
\
} else { \
handler(task, c, data); \
@@ -301,9 +301,8 @@ NXT_EXPORT void nxt_event_conn_job_sendfile(nxt_task_t *task,
#define \
nxt_event_conn_connect_enqueue(thr, task, c) \
nxt_thread_work_queue_add(thr, &thr->engine->socket_work_queue, \
nxt_event_conn_batch_socket, \
task, c, c->socket.data)
nxt_work_queue_add(&thr->engine->socket_work_queue, \
nxt_event_conn_batch_socket, task, c, c->socket.data)
#define \
@@ -311,7 +310,7 @@ nxt_event_conn_read_enqueue(thr, task, c) \
do { \
c->socket.read_work_queue = &thr->engine->read_work_queue; \
\
nxt_thread_work_queue_add(thr, &thr->engine->read_work_queue, \
nxt_work_queue_add(&thr->engine->read_work_queue, \
c->io->read, task, c, c->socket.data); \
} while (0)
@@ -321,7 +320,7 @@ nxt_event_conn_write_enqueue(thr, task, c) \
do { \
c->socket.write_work_queue = &thr->engine->write_work_queue; \
\
nxt_thread_work_queue_add(thr, &thr->engine->write_work_queue, \
nxt_work_queue_add(&thr->engine->write_work_queue, \
c->io->write, task, c, c->socket.data); \
} while (0)

View File

@@ -50,7 +50,7 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
cls->socket.read_work_queue = &engine->accept_work_queue;
} else {
cls->socket.read_work_queue = &task->thread->work_queue.main;
cls->socket.read_work_queue = &engine->fast_work_queue;
cls->batch = 1;
}
@@ -62,7 +62,7 @@ nxt_event_conn_listen(nxt_task_t *task, nxt_listen_socket_t *ls)
cls->listen = ls;
cls->timer.work_queue = &task->thread->work_queue.main;
cls->timer.work_queue = &engine->fast_work_queue;
cls->timer.handler = nxt_event_conn_listen_timer_handler;
cls->timer.log = &nxt_main_log;
@@ -221,14 +221,14 @@ nxt_event_conn_accept(nxt_task_t *task, nxt_event_conn_listen_t *cls,
c->listen->handler(task, c, NULL);
} else {
nxt_thread_work_queue_add(task->thread, c->write_work_queue,
c->listen->handler, task, c, NULL);
nxt_work_queue_add(c->write_work_queue, c->listen->handler,
task, c, NULL);
}
next = nxt_event_conn_accept_next(task, cls);
if (next != NULL && cls->socket.read_ready) {
nxt_thread_work_queue_add(task->thread, cls->socket.read_work_queue,
nxt_work_queue_add(cls->socket.read_work_queue,
cls->accept, task, cls, next);
}
}

View File

@@ -17,7 +17,7 @@ nxt_event_conn_connect(nxt_task_t *task, nxt_event_conn_t *c)
engine = task->thread->engine;
if (engine->batch != 0) {
nxt_thread_work_queue_add(task->thread, &engine->socket_work_queue,
nxt_work_queue_add(&engine->socket_work_queue,
nxt_event_conn_batch_socket, task, c, data);
return;
}
@@ -47,8 +47,7 @@ nxt_event_conn_batch_socket(nxt_task_t *task, void *obj, void *data)
handler = c->write_state->error_handler;
}
nxt_thread_work_queue_add(task->thread,
&task->thread->engine->connect_work_queue,
nxt_work_queue_add(&task->thread->engine->connect_work_queue,
handler, task, c, data);
}

View File

@@ -157,9 +157,10 @@ done:
fast:
nxt_thread_pool_post(task->thread->thread_pool,
nxt_event_conn_job_sendfile_handler,
&jbs->job.task, jbs, c);
nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler,
jbs->job.task, jbs, c);
nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work);
}
@@ -257,7 +258,7 @@ nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_event_conn_t *c,
break;
}
nxt_thread_work_queue_add(task->thread, c->write_work_queue,
nxt_work_queue_add(c->write_work_queue,
b->completion_handler, task, b, b->parent);
b = b->next;

View File

@@ -456,7 +456,7 @@ nxt_event_conn_proxy_read_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
}
if (rb->mem.start != rb->mem.end) {
nxt_thread_work_queue_push(task->thread, source->read_work_queue,
nxt_work_queue_add(source->read_work_queue,
nxt_event_conn_proxy_read,
task, source, source->socket.data);
break;
@@ -665,7 +665,7 @@ nxt_event_conn_proxy_write_process(nxt_task_t *task, nxt_event_conn_proxy_t *p,
nxt_buf_free(sink->mem_pool, wb);
}
nxt_thread_work_queue_push(task->thread, source->read_work_queue,
nxt_work_queue_add(source->read_work_queue,
nxt_event_conn_proxy_read, task, source,
source->socket.data);
}
@@ -1008,8 +1008,6 @@ nxt_event_conn_proxy_complete(nxt_task_t *task, nxt_event_conn_proxy_t *p)
nxt_event_conn_close(task, p->peer);
} else if (p->delayed) {
nxt_thread_work_queue_drop(task->thread, &p->peer->write_timer);
nxt_queue_remove(&p->peer->link);
nxt_event_timer_delete(task->thread->engine, &p->peer->write_timer);
}

View File

@@ -20,8 +20,7 @@ nxt_event_conn_read(nxt_task_t *task, nxt_event_conn_t *c)
wq = &task->thread->engine->read_work_queue;
c->socket.read_work_queue = wq;
nxt_thread_work_queue_add(task->thread, wq, handler, task, c,
c->socket.data);
nxt_work_queue_add( wq, handler, task, c, c->socket.data);
return;
}
@@ -134,8 +133,8 @@ ready:
done:
if (batch) {
nxt_thread_work_queue_add(task->thread, c->read_work_queue, handler,
task, c, data);
nxt_work_queue_add(c->read_work_queue, handler, task, c, data);
} else {
handler(task, c, data);
}

View File

@@ -20,8 +20,8 @@ static void nxt_event_engine_signal_pipe_error(nxt_task_t *task, void *obj,
void *data);
static void nxt_event_engine_signal_handler(nxt_task_t *task, void *obj,
void *data);
static const nxt_event_sig_t *nxt_event_engine_signal_find(nxt_task_t *task,
nxt_uint_t signo);
static nxt_work_handler_t nxt_event_engine_queue_pop(nxt_event_engine_t *engine,
nxt_task_t **task, void **obj, void **data);
nxt_event_engine_t *
@@ -52,8 +52,21 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
}
}
nxt_thread_work_queue_create(thr, 0);
engine->current_work_queue = &engine->fast_work_queue;
nxt_work_queue_cache_create(&engine->work_queue_cache, 0);
engine->fast_work_queue.cache = &engine->work_queue_cache;
engine->accept_work_queue.cache = &engine->work_queue_cache;
engine->read_work_queue.cache = &engine->work_queue_cache;
engine->socket_work_queue.cache = &engine->work_queue_cache;
engine->connect_work_queue.cache = &engine->work_queue_cache;
engine->write_work_queue.cache = &engine->work_queue_cache;
engine->shutdown_work_queue.cache = &engine->work_queue_cache;
engine->close_work_queue.cache = &engine->work_queue_cache;
engine->final_work_queue.cache = &engine->work_queue_cache;
nxt_work_queue_name(&engine->fast_work_queue, "fast");
nxt_work_queue_name(&engine->accept_work_queue, "accept");
nxt_work_queue_name(&engine->read_work_queue, "read");
nxt_work_queue_name(&engine->socket_work_queue, "socket");
@@ -61,12 +74,7 @@ nxt_event_engine_create(nxt_thread_t *thr, const nxt_event_set_ops_t *event_set,
nxt_work_queue_name(&engine->write_work_queue, "write");
nxt_work_queue_name(&engine->shutdown_work_queue, "shutdown");
nxt_work_queue_name(&engine->close_work_queue, "close");
#if (NXT_THREADS)
nxt_locked_work_queue_create(&engine->work_queue, 7);
#endif
nxt_work_queue_name(&engine->final_work_queue, "final");
if (signals != NULL) {
engine->signals = nxt_event_engine_signals(signals);
@@ -134,7 +142,7 @@ event_set_fail:
signals_fail:
nxt_free(engine->signals);
nxt_thread_work_queue_destroy(thr);
nxt_work_queue_cache_destroy(&engine->work_queue_cache);
nxt_free(engine->fibers);
fibers_fail:
@@ -193,9 +201,9 @@ nxt_event_engine_signal_pipe_create(nxt_event_engine_t *engine)
}
pipe->event.fd = pipe->fds[0];
pipe->event.read_work_queue = &engine->task.thread->work_queue.main;
pipe->event.read_work_queue = &engine->fast_work_queue;
pipe->event.read_handler = nxt_event_engine_signal_pipe;
pipe->event.write_work_queue = &engine->task.thread->work_queue.main;
pipe->event.write_work_queue = &engine->fast_work_queue;
pipe->event.error_handler = nxt_event_engine_signal_pipe_error;
pipe->event.log = &nxt_main_log;
@@ -237,12 +245,11 @@ nxt_event_engine_signal_pipe_close(nxt_task_t *task, void *obj, void *data)
void
nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_handler_t handler,
nxt_task_t *task, void *obj, void *data, nxt_log_t *log)
nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
{
nxt_thread_log_debug("event engine post");
nxt_locked_work_queue_add(&engine->work_queue, handler, task, obj, data);
nxt_locked_work_queue_add(&engine->locked_work_queue, work);
nxt_event_engine_signal(engine, 0);
}
@@ -277,7 +284,6 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
u_char signo;
nxt_bool_t post;
nxt_event_fd_t *ev;
const nxt_event_sig_t *sigev;
u_char buf[128];
ev = obj;
@@ -299,12 +305,8 @@ nxt_event_engine_signal_pipe(nxt_task_t *task, void *obj, void *data)
post = 1;
} else {
sigev = nxt_event_engine_signal_find(task, signo);
if (nxt_fast_path(sigev != NULL)) {
sigev->handler(task, (void *) (uintptr_t) signo,
(void *) sigev->name);
}
nxt_event_engine_signal_handler(task,
(void *) (uintptr_t) signo, NULL);
}
}
@@ -320,11 +322,13 @@ static void
nxt_event_engine_post_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_thread_t *thread;
nxt_event_engine_t *engine;
thread = task->thread;
engine = thread->engine;
nxt_locked_work_queue_move(thread, &thread->engine->work_queue,
&thread->work_queue.main);
nxt_locked_work_queue_move(thread, &engine->locked_work_queue,
&engine->fast_work_queue);
}
@@ -351,31 +355,17 @@ nxt_event_engine_signal_handler(nxt_task_t *task, void *obj, void *data)
signo = (uintptr_t) obj;
sigev = nxt_event_engine_signal_find(task, signo);
if (nxt_fast_path(sigev != NULL)) {
sigev->handler(task, (void *) (uintptr_t) signo, (void *) sigev->name);
}
}
static const nxt_event_sig_t *
nxt_event_engine_signal_find(nxt_task_t *task, nxt_uint_t signo)
{
const nxt_event_sig_t *sigev;
for (sigev = task->thread->engine->signals->sigev;
sigev->signo != 0;
sigev++)
{
if (signo == (nxt_uint_t) sigev->signo) {
return sigev;
sigev->handler(task, (void *) signo, (void *) sigev->name);
return;
}
}
nxt_log(task, NXT_LOG_CRIT, "signal %ui handler not found", signo);
return NULL;
}
@@ -397,7 +387,7 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
nxt_event_engine_signals_stop(engine);
/*
* Add to thread main work queue the signal events possibly
* Add to engine fast work queue the signal events possibly
* received before the blocking signal processing.
*/
nxt_event_engine_signal_pipe(task, &engine->pipe->event, NULL);
@@ -406,9 +396,9 @@ nxt_event_engine_change(nxt_thread_t *thr, nxt_task_t *task,
if (engine->pipe != NULL && event_set->enable_post != NULL) {
/*
* An engine pipe must be closed after all signal events
* added above to thread main work queue will be processed.
* added above to engine fast work queue will be processed.
*/
nxt_thread_work_queue_add(thr, &thr->work_queue.main,
nxt_work_queue_add(&engine->final_work_queue,
nxt_event_engine_signal_pipe_close,
&engine->task, engine->pipe, NULL);
@@ -455,8 +445,7 @@ nxt_event_engine_free(nxt_event_engine_t *engine)
nxt_event_engine_signal_pipe_free(engine);
nxt_free(engine->signals);
nxt_locked_work_queue_destroy(&engine->work_queue);
nxt_thread_work_queue_destroy(nxt_thread());
nxt_work_queue_cache_destroy(&engine->work_queue_cache);
engine->event->free(engine->event_set);
@@ -466,6 +455,35 @@ nxt_event_engine_free(nxt_event_engine_t *engine)
}
static nxt_work_handler_t
nxt_event_engine_queue_pop(nxt_event_engine_t *engine, nxt_task_t **task,
void **obj, void **data)
{
nxt_work_queue_t *wq;
wq = engine->current_work_queue;
if (wq->head == NULL) {
wq = &engine->fast_work_queue;
while (wq->head == NULL) {
engine->current_work_queue++;
wq = engine->current_work_queue;
if (wq > &engine->final_work_queue) {
engine->current_work_queue = &engine->fast_work_queue;
return NULL;
}
}
}
nxt_debug(&engine->task, "work queue: %s", wq->name);
return nxt_work_queue_pop(wq, task, obj, data);
}
void
nxt_event_engine_start(nxt_event_engine_t *engine)
{
@@ -487,40 +505,25 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
/* A return point from fibers. */
}
thr->log = &nxt_main_log;
for ( ;; ) {
for ( ;; ) {
handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data);
handler = nxt_event_engine_queue_pop(engine, &task, &obj, &data);
if (handler == NULL) {
break;
}
handler(task, obj, data);
thr->log = &nxt_main_log;
}
for ( ;; ) {
handler = nxt_thread_last_work_queue_pop(thr, &task, &obj, &data);
if (handler == NULL) {
break;
}
handler(task, obj, data);
thr->log = &nxt_main_log;
}
/* Attach some event engine work queues in preferred order. */
nxt_work_queue_attach(thr, &engine->accept_work_queue);
nxt_work_queue_attach(thr, &engine->read_work_queue);
timeout = nxt_event_timer_find(engine);
engine->event->poll(task, engine->event_set, timeout);
engine->event->poll(&engine->task, engine->event_set, timeout);
/*
* Look up expired timers only if a new zero timer has been

View File

@@ -33,6 +33,9 @@ struct nxt_event_engine_s {
*/
nxt_event_engine_pipe_t *pipe;
nxt_work_queue_cache_t work_queue_cache;
nxt_work_queue_t *current_work_queue;
nxt_work_queue_t fast_work_queue;
nxt_work_queue_t accept_work_queue;
nxt_work_queue_t read_work_queue;
nxt_work_queue_t socket_work_queue;
@@ -40,8 +43,9 @@ struct nxt_event_engine_s {
nxt_work_queue_t write_work_queue;
nxt_work_queue_t shutdown_work_queue;
nxt_work_queue_t close_work_queue;
nxt_work_queue_t final_work_queue;
nxt_locked_work_queue_t work_queue;
nxt_locked_work_queue_t locked_work_queue;
nxt_event_signals_t *signals;
@@ -68,8 +72,7 @@ NXT_EXPORT void nxt_event_engine_free(nxt_event_engine_t *engine);
NXT_EXPORT void nxt_event_engine_start(nxt_event_engine_t *engine);
NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine,
nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data,
nxt_log_t *log);
nxt_work_t *work);
NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine,
nxt_uint_t signo);
@@ -84,14 +87,4 @@ nxt_thread_event_engine(void)
}
nxt_inline nxt_work_queue_t *
nxt_thread_main_work_queue(void)
{
nxt_thread_t *thr;
thr = nxt_thread();
return &thr->work_queue.main;
}
#endif /* _NXT_EVENT_ENGINE_H_INCLUDED_ */

View File

@@ -313,8 +313,7 @@ nxt_event_timer_expire(nxt_thread_t *thr, nxt_msec_t now)
if (ev->state != NXT_EVENT_TIMER_DISABLED) {
ev->state = NXT_EVENT_TIMER_DISABLED;
nxt_thread_work_queue_add(thr, ev->work_queue, ev->handler,
ev->task, ev, NULL);
nxt_work_queue_add(ev->work_queue, ev->handler, ev->task, ev, NULL);
}
}
}

View File

@@ -16,7 +16,7 @@ static void nxt_fiber_timer_handler(nxt_task_t *task, void *obj, void *data);
#define \
nxt_fiber_enqueue(thr, task, fib) \
nxt_thread_work_queue_add(thr, &(thr)->work_queue.main, \
nxt_work_queue_add(&(thr)->engine->fast_work_queue, \
nxt_fiber_switch_handler, task, fib, NULL)
@@ -392,7 +392,7 @@ nxt_fiber_sleep(nxt_task_t *task, nxt_msec_t timeout)
fib = task->thread->fiber;
fib->timer.work_queue = &task->thread->work_queue.main;
fib->timer.work_queue = &task->thread->engine->fast_work_queue;
fib->timer.handler = nxt_fiber_timer_handler;
fib->timer.log = &nxt_main_log;

View File

@@ -44,8 +44,6 @@ nxt_job_create(nxt_mem_pool_t *mp, size_t size)
/* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
nxt_queue_self(&job->link);
job->task.ident = nxt_task_next_ident();
return job;
}
@@ -58,8 +56,6 @@ nxt_job_init(nxt_job_t *job, size_t size)
nxt_job_set_name(job, "job");
nxt_queue_self(&job->link);
job->task.ident = nxt_task_next_ident();
}
@@ -118,8 +114,11 @@ nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
job->engine = task->thread->engine;
ret = nxt_thread_pool_post(job->thread_pool, nxt_job_thread_trampoline,
&job->task, job, (void *) handler);
nxt_work_set(&job->work, nxt_job_thread_trampoline,
job->task, job, (void *) handler);
ret = nxt_thread_pool_post(job->thread_pool, &job->work);
if (ret == NXT_OK) {
return;
}
@@ -129,7 +128,7 @@ nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
#endif
handler(&job->task, job, job->data);
handler(job->task, job, job->data);
}
@@ -146,15 +145,13 @@ nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
job = obj;
handler = (nxt_work_handler_t) data;
job->task.log = job->log;
nxt_debug(task, "%s thread", job->name);
if (nxt_slow_path(job->cancel)) {
nxt_job_return(task, job, job->abort_handler);
} else {
handler(&job->task, job, job->data);
handler(job->task, job, job->data);
}
}
@@ -170,8 +167,12 @@ nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
if (job->engine != NULL) {
/* A return function is called in thread pool thread context. */
nxt_event_engine_post(job->engine, nxt_job_thread_return_handler,
&job->task, job, (void *) handler, job->log);
nxt_work_set(&job->work, nxt_job_thread_return_handler,
job->task, job, (void *) handler);
nxt_event_engine_post(job->engine, &job->work);
return;
}
@@ -182,8 +183,8 @@ nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
handler = job->abort_handler;
}
nxt_thread_work_queue_push(task->thread, &task->thread->work_queue.main,
handler, &job->task, job, job->data);
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
handler, job->task, job, job->data);
}
@@ -198,14 +199,14 @@ nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
job = obj;
handler = (nxt_work_handler_t) data;
job->task.thread = task->thread;
job->task->thread = task->thread;
if (nxt_slow_path(job->cancel)) {
nxt_debug(task, "%s cancellation", job->name);
handler = job->abort_handler;
}
handler(&job->task, job, job->data);
handler(job->task, job, job->data);
}
#endif

View File

@@ -33,7 +33,7 @@
typedef struct {
void *data;
nxt_task_t task;
nxt_task_t *task;
nxt_work_handler_t abort_handler;
@@ -49,6 +49,8 @@ typedef struct {
nxt_log_t *log;
#endif
nxt_work_t work;
#if (NXT_DEBUG)
const char *name;
#endif

View File

@@ -121,5 +121,5 @@ fail:
freeaddrinfo(res);
}
nxt_job_return(&jbr->job.task, &jbr->job, handler);
nxt_job_return(jbr->job.task, &jbr->job, handler);
}

View File

@@ -531,11 +531,13 @@ static void
nxt_kqueue_error(nxt_kqueue_event_set_t *ks)
{
struct kevent *kev, *end;
nxt_thread_t *thr;
nxt_thread_t *thread;
nxt_event_fd_t *ev;
nxt_event_file_t *fev;
nxt_work_queue_t *wq;
thr = nxt_thread();
thread = nxt_thread();
wq = &thread->engine->fast_work_queue;
end = &ks->changes[ks->nchanges];
for (kev = ks->changes; kev < end; kev++) {
@@ -545,15 +547,13 @@ nxt_kqueue_error(nxt_kqueue_event_set_t *ks)
case EVFILT_READ:
case EVFILT_WRITE:
ev = nxt_kevent_get_udata(kev->udata);
nxt_thread_work_queue_add(thr, &thr->work_queue.main,
nxt_kqueue_fd_error_handler,
nxt_work_queue_add(wq, nxt_kqueue_fd_error_handler,
ev->task, ev, ev->data);
break;
case EVFILT_VNODE:
fev = nxt_kevent_get_udata(kev->udata);
nxt_thread_work_queue_add(thr, &thr->work_queue.main,
nxt_kqueue_file_error_handler,
nxt_work_queue_add(wq, nxt_kqueue_file_error_handler,
fev->task, fev, fev->data);
break;
}
@@ -768,7 +768,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
}
event_task = task;
wq = &task->thread->work_queue.main;
wq = &task->thread->engine->fast_work_queue;
handler = nxt_kqueue_fd_error_handler;
obj = nxt_kevent_get_udata(kev->udata);
@@ -871,8 +871,7 @@ nxt_kqueue_poll(nxt_task_t *task, nxt_event_set_t *event_set,
continue;
}
nxt_thread_work_queue_add(task->thread, wq, handler,
event_task, obj, data);
nxt_work_queue_add(wq, handler, event_task, obj, data);
}
}
@@ -938,8 +937,8 @@ nxt_kqueue_event_conn_connected(nxt_task_t *task, void *obj, void *data)
nxt_event_timer_disable(&c->write_timer);
}
nxt_thread_work_queue_add(task->thread, c->write_work_queue,
c->write_state->ready_handler, task, c, data);
nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
task, c, data);
}
@@ -1020,8 +1019,8 @@ nxt_kqueue_event_conn_io_read(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "kevent fd:%d eof", c->socket.fd);
c->socket.closed = 1;
nxt_thread_work_queue_add(task->thread, c->read_work_queue,
c->read_state->close_handler, task, c, data);
nxt_work_queue_add(c->read_work_queue, c->read_state->close_handler,
task, c, data);
return;
}

View File

@@ -123,7 +123,7 @@ nxt_log_debug(_log, ...) \
#else
#define nxt_log_debug(...)
#define nxt_debug(...)
#define \
nxt_log_debug(...)

View File

@@ -58,7 +58,7 @@ nxt_log_moderate_allow(nxt_log_moderation_t *mod)
nxt_thread_spin_unlock(&mod->lock);
if (timer) {
mod->timer.work_queue = &thr->work_queue.main;
mod->timer.work_queue = &thr->engine->fast_work_queue;
mod->timer.handler = nxt_log_moderate_timer_handler;
mod->timer.log = &nxt_main_log;

View File

@@ -228,7 +228,7 @@ nxt_master_process_new_cycle(nxt_task_t *task, nxt_cycle_t *cycle)
cycle->timer.log = &nxt_main_log;
nxt_event_timer_ident(&cycle->timer, -1);
cycle->timer.work_queue = &thr->work_queue.main;
cycle->timer.work_queue = &thr->engine->fast_work_queue;
nxt_event_timer_add(thr->engine, &cycle->timer, 500);

View File

@@ -397,8 +397,8 @@ nxt_poll_commit_changes(nxt_thread_t *thr, nxt_poll_event_set_t *ps)
break;
}
nxt_thread_work_queue_add(thr, &thr->work_queue.main,
ev->error_handler, ev->task, ev, ev->data);
nxt_work_queue_add(&thr->engine->fast_work_queue, ev->error_handler,
ev->task, ev, ev->data);
ret = NXT_ERROR;
@@ -608,8 +608,7 @@ nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set,
/* Mark the poll entry to ignore it by the kernel. */
pfd->fd = -1;
nxt_thread_work_queue_add(task->thread,
&task->thread->work_queue.main,
nxt_work_queue_add(&ev->task->thread->engine->fast_work_queue,
ev->error_handler,
ev->task, ev, ev->data);
goto next;
@@ -653,8 +652,8 @@ nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0);
}
nxt_thread_work_queue_add(task->thread, ev->read_work_queue,
ev->read_handler, ev->task, ev, ev->data);
nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
ev->task, ev, ev->data);
}
if ((events & POLLOUT) || (error && ev->write_handler != NULL)) {
@@ -665,8 +664,7 @@ nxt_poll_set_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_poll_change(event_set, ev, NXT_POLL_DELETE, 0);
}
nxt_thread_work_queue_add(task->thread, ev->write_work_queue,
ev->write_handler,
nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
ev->task, ev, ev->data);
}

View File

@@ -141,9 +141,8 @@ nxt_select_enable_read(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
thr = nxt_thread();
nxt_thread_work_queue_add(thr, &thr->work_queue.main,
nxt_select_error_handler,
ev->task, ev, ev->data);
nxt_work_queue_add(&thr->engine->fast_work_queue,
nxt_select_error_handler, ev->task, ev, ev->data);
return;
}
@@ -174,9 +173,8 @@ nxt_select_enable_write(nxt_event_set_t *event_set, nxt_event_fd_t *ev)
if (fd < 0 || fd >= (nxt_fd_t) FD_SETSIZE) {
thr = nxt_thread();
nxt_thread_work_queue_add(thr, &thr->work_queue.main,
nxt_select_error_handler,
ev->task, ev, ev->data);
nxt_work_queue_add(&thr->engine->fast_work_queue,
nxt_select_error_handler, ev->task, ev, ev->data);
return;
}
@@ -365,8 +363,8 @@ nxt_select_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_select_disable_read(event_set, ev);
}
nxt_thread_work_queue_add(task->thread, ev->read_work_queue,
ev->read_handler, ev->task, ev, ev->data);
nxt_work_queue_add(ev->read_work_queue, ev->read_handler,
ev->task, ev, ev->data);
found = 1;
}
@@ -382,8 +380,7 @@ nxt_select_poll(nxt_task_t *task, nxt_event_set_t *event_set,
nxt_select_disable_write(event_set, ev);
}
nxt_thread_work_queue_add(task->thread, ev->write_work_queue,
ev->write_handler,
nxt_work_queue_add(ev->write_work_queue, ev->write_handler,
ev->task, ev, ev->data);
found = 1;
}

View File

@@ -343,8 +343,7 @@ nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b,
}
}
nxt_thread_work_queue_add(task->thread, wq, b->completion_handler, task,
b, b->parent);
nxt_work_queue_add(wq, b->completion_handler, task, b, b->parent);
b = b->next;
}

View File

@@ -495,7 +495,7 @@ nxt_job_sockaddr_parse(nxt_job_sockaddr_parse_t *jbs)
return;
}
nxt_job_return(&jbs->resolve.job.task, &jbs->resolve.job, handler);
nxt_job_return(jbs->resolve.job.task, &jbs->resolve.job, handler);
}

View File

@@ -180,16 +180,22 @@ nxt_thread_time_cleanup(void *data)
void
nxt_thread_exit(nxt_thread_t *thr)
{
nxt_thread_link_t *link;
nxt_log_debug(thr->log, "thread exit");
if (thr->link != NULL) {
nxt_event_engine_post(thr->link->engine, thr->link->exit,
&thr->link->engine->task,
(void *) (uintptr_t) thr->handle,
NULL, &nxt_main_log);
nxt_free(thr->link);
link = thr->link;
thr->link = NULL;
if (link != NULL) {
/*
* link->handler is already set to an exit handler,
* and link->task is already set to engine->task.
* The link should be freed by the exit handler.
*/
link->work.obj = thr->handle;
nxt_event_engine_post(link->engine, &link->work);
}
nxt_thread_time_free(thr);

View File

@@ -92,7 +92,7 @@ typedef struct {
nxt_thread_start_t start;
void *data;
nxt_event_engine_t *engine;
nxt_work_handler_t exit;
nxt_work_t work;
} nxt_thread_link_t;
@@ -179,7 +179,6 @@ struct nxt_thread_s {
nxt_thread_time_t time;
nxt_event_engine_t *engine;
nxt_thread_work_queue_t work_queue;
/*
* Although pointer to a current fiber should be a property of

View File

@@ -38,8 +38,7 @@ nxt_thread_pool_create(nxt_uint_t max_threads, nxt_nsec_t timeout,
nxt_int_t
nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
nxt_task_t *task, void *obj, void *data)
nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_t *work)
{
nxt_thread_log_debug("thread pool post");
@@ -47,7 +46,7 @@ nxt_thread_pool_post(nxt_thread_pool_t *tp, nxt_work_handler_t handler,
return NXT_ERROR;
}
nxt_locked_work_queue_add(&tp->work_queue, handler, task, obj, data);
nxt_locked_work_queue_add(&tp->work_queue, work);
(void) nxt_sem_post(&tp->sem);
@@ -66,6 +65,11 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp)
return NXT_OK;
}
if (tp->max_threads == 0) {
/* The pool is being destroyed. */
return NXT_ERROR;
}
nxt_thread_spin_lock(&tp->work_queue.lock);
ret = NXT_OK;
@@ -78,8 +82,6 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp)
if (nxt_fast_path(nxt_sem_init(&tp->sem, 0) == NXT_OK)) {
nxt_locked_work_queue_create(&tp->work_queue, 0);
link = nxt_malloc(sizeof(nxt_thread_link_t));
if (nxt_fast_path(link != NULL)) {
@@ -102,8 +104,6 @@ nxt_thread_pool_init(nxt_thread_pool_t *tp)
(void) nxt_atomic_fetch_add(&tp->threads, -1);
nxt_locked_work_queue_destroy(&tp->work_queue);
ret = NXT_ERROR;
}
@@ -142,8 +142,6 @@ nxt_thread_pool_start(void *ctx)
tp->init();
}
nxt_thread_work_queue_create(thr, 8);
for ( ;; ) {
nxt_thread_pool_wait(tp);
@@ -152,18 +150,8 @@ nxt_thread_pool_start(void *ctx)
if (nxt_fast_path(handler != NULL)) {
task->thread = thr;
nxt_log_debug(thr->log, "locked work queue");
handler(task, obj, data);
}
for ( ;; ) {
thr->log = &nxt_main_log;
handler = nxt_thread_work_queue_pop(thr, &task, &obj, &data);
if (handler == NULL) {
break;
}
handler(task, obj, data);
}
@@ -245,7 +233,7 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
thr = nxt_thread();
if (!tp->ready) {
nxt_thread_work_queue_add(thr, &thr->work_queue.main, tp->exit,
nxt_work_queue_add(&thr->engine->fast_work_queue, tp->exit,
&tp->task, tp, NULL);
return;
}
@@ -254,7 +242,9 @@ nxt_thread_pool_destroy(nxt_thread_pool_t *tp)
/* Disable new threads creation and mark a pool as being destroyed. */
tp->max_threads = 0;
nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp, NULL);
nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp, NULL);
nxt_thread_pool_post(tp, &tp->work);
}
}
@@ -293,25 +283,24 @@ nxt_thread_pool_exit(nxt_task_t *task, void *obj, void *data)
nxt_debug(task, "thread pool threads: %A", threads);
if (threads > 1) {
nxt_thread_pool_post(tp, nxt_thread_pool_exit, &tp->task, tp,
nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
(void *) (uintptr_t) thread->handle);
nxt_thread_pool_post(tp, &tp->work);
} else {
nxt_debug(task, "thread pool destroy");
nxt_event_engine_post(tp->engine, tp->exit, &tp->task, tp,
(void *) (uintptr_t) thread->handle,
&nxt_main_log);
nxt_sem_destroy(&tp->sem);
nxt_locked_work_queue_destroy(&tp->work_queue);
nxt_work_set(&tp->work, nxt_thread_pool_exit, &tp->task, tp,
(void *) (uintptr_t) thread->handle);
nxt_free(tp);
nxt_event_engine_post(tp->engine, &tp->work);
/* The "tp" memory should be freed by tp->exit handler. */
}
nxt_thread_work_queue_destroy(thread);
nxt_thread_exit(thread);
nxt_unreachable();

View File

@@ -20,6 +20,7 @@ struct nxt_thread_pool_s {
nxt_sem_t sem;
nxt_nsec_t timeout;
nxt_work_t work;
nxt_task_t task;
nxt_locked_work_queue_t work_queue;
@@ -37,7 +38,7 @@ NXT_EXPORT nxt_thread_pool_t *nxt_thread_pool_create(nxt_uint_t max_threads,
nxt_event_engine_t *engine, nxt_work_handler_t exit);
NXT_EXPORT void nxt_thread_pool_destroy(nxt_thread_pool_t *tp);
NXT_EXPORT nxt_int_t nxt_thread_pool_post(nxt_thread_pool_t *tp,
nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data);
nxt_work_t *work);
#endif /* _NXT_UNIX_THREAD_POOL_H_INCLUDED_ */

View File

@@ -25,12 +25,7 @@
* a new spare chunk is allocated again.
*/
static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
nxt_thread_spinlock_t *lock);
static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock);
static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr);
static nxt_work_handler_t nxt_locked_work_queue_pop_work(
nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data);
static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache);
/* It should be adjusted with the "work_queue_bucket_items" directive. */
@@ -38,32 +33,29 @@ static nxt_uint_t nxt_work_queue_bucket_items = 409;
void
nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size)
nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size)
{
nxt_memzero(&thr->work_queue, sizeof(nxt_thread_work_queue_t));
nxt_work_queue_name(&thr->work_queue.main, "main");
nxt_work_queue_name(&thr->work_queue.last, "last");
nxt_memzero(cache, sizeof(nxt_work_queue_cache_t));
if (chunk_size == 0) {
chunk_size = nxt_work_queue_bucket_items;
}
/* nxt_work_queue_chunk_t already has one work item. */
thr->work_queue.cache.chunk_size = chunk_size - 1;
cache->chunk_size = chunk_size - 1;
while (thr->work_queue.cache.next == NULL) {
nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
while (cache->next == NULL) {
nxt_work_queue_allocate(cache);
}
}
void
nxt_thread_work_queue_destroy(nxt_thread_t *thr)
nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache)
{
nxt_work_queue_chunk_t *chunk, *next;
for (chunk = thr->work_queue.cache.chunk; chunk; chunk = next) {
for (chunk = cache->chunk; chunk; chunk = next) {
next = chunk->next;
nxt_free(chunk);
}
@@ -71,8 +63,7 @@ nxt_thread_work_queue_destroy(nxt_thread_t *thr)
static void
nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
nxt_thread_spinlock_t *lock)
nxt_work_queue_allocate(nxt_work_queue_cache_t *cache)
{
size_t size;
nxt_uint_t i, n;
@@ -102,7 +93,6 @@ nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
work = NULL;
} else {
nxt_work_queue_sleep(lock);
return;
}
@@ -111,36 +101,19 @@ nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
}
static void
nxt_work_queue_sleep(nxt_thread_spinlock_t *lock)
{
if (lock != NULL) {
nxt_thread_spin_unlock(lock);
}
nxt_nanosleep(100 * 1000000); /* 100ms */
if (lock != NULL) {
nxt_thread_spin_lock(lock);
}
}
/* Add a work to a work queue tail. */
void
nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler,
nxt_task_t *task, void *obj, void *data)
{
nxt_work_t *work;
nxt_work_queue_attach(thr, wq);
for ( ;; ) {
work = thr->work_queue.cache.next;
work = wq->cache->next;
if (nxt_fast_path(work != NULL)) {
thr->work_queue.cache.next = work->next;
wq->cache->next = work->next;
work->next = NULL;
work->handler = handler;
@@ -160,81 +133,19 @@ nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
return;
}
nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
nxt_work_queue_allocate(wq->cache);
}
}
/* Push a work to a work queue head. */
void
nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq,
nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
{
nxt_work_t *work;
nxt_work_queue_attach(thr, wq);
for ( ;; ) {
work = thr->work_queue.cache.next;
if (nxt_fast_path(work != NULL)) {
thr->work_queue.cache.next = work->next;
work->next = wq->head;
work->handler = handler;
work->obj = obj;
work->data = data;
wq->head = work;
if (wq->tail == NULL) {
wq->tail = work;
}
return;
}
nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
}
}
/* Attach a work queue to a thread work queue. */
void
nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq)
{
if (wq->next == NULL && wq != thr->work_queue.tail) {
if (thr->work_queue.tail != NULL) {
thr->work_queue.tail->next = wq;
} else {
thr->work_queue.head = wq;
}
thr->work_queue.tail = wq;
}
}
/* Pop a work from a thread work queue head. */
nxt_work_handler_t
nxt_thread_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj,
void **data)
{
nxt_work_t *work;
nxt_work_queue_t *wq;
wq = nxt_thread_current_work_queue(thr);
if (wq != NULL) {
work = wq->head;
if (work != NULL) {
wq->head = work->next;
if (work->next == NULL) {
@@ -242,269 +153,27 @@ nxt_thread_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
}
*task = work->task;
*obj = work->obj;
nxt_prefetch(*obj);
*data = work->data;
nxt_prefetch(*data);
work->next = thr->work_queue.cache.next;
thr->work_queue.cache.next = work;
#if (NXT_DEBUG)
if (work->handler == NULL) {
nxt_log_alert(thr->log, "null work handler");
nxt_abort();
}
#endif
work->next = wq->cache->next;
wq->cache->next = work;
return work->handler;
}
}
return NULL;
}
static nxt_work_queue_t *
nxt_thread_current_work_queue(nxt_thread_t *thr)
{
nxt_work_queue_t *wq, *next;
for (wq = thr->work_queue.head; wq != NULL; wq = next) {
if (wq->head != NULL) {
nxt_log_debug(thr->log, "work queue: %s", wq->name);
return wq;
}
/* Detach empty work queue. */
next = wq->next;
wq->next = NULL;
thr->work_queue.head = next;
}
thr->work_queue.tail = NULL;
return NULL;
}
/* Drop a work with specified data from a thread work queue. */
void
nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data)
{
nxt_work_t *work, *prev, *next, **link;
nxt_work_queue_t *wq;
for (wq = thr->work_queue.head; wq != NULL; wq = wq->next) {
prev = NULL;
link = &wq->head;
for (work = wq->head; work != NULL; work = next) {
next = work->next;
if (data != work->obj) {
prev = work;
link = &work->next;
} else {
if (next == NULL) {
wq->tail = prev;
}
nxt_log_debug(thr->log, "work queue drop");
*link = next;
work->next = thr->work_queue.cache.next;
thr->work_queue.cache.next = work;
}
}
}
}
/* Add a work to the thread last work queue's tail. */
void
nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
void *obj, void *data)
{
nxt_work_t *work;
for ( ;; ) {
work = thr->work_queue.cache.next;
if (nxt_fast_path(work != NULL)) {
thr->work_queue.cache.next = work->next;
work->next = NULL;
work->handler = handler;
work->obj = obj;
work->data = data;
if (thr->work_queue.last.tail != NULL) {
thr->work_queue.last.tail->next = work;
} else {
thr->work_queue.last.head = work;
}
thr->work_queue.last.tail = work;
return;
}
nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
}
}
/* Pop a work from the thread last work queue's head. */
nxt_work_handler_t
nxt_thread_last_work_queue_pop(nxt_thread_t *thr, nxt_task_t **task, void **obj,
void **data)
{
nxt_work_t *work;
work = thr->work_queue.last.head;
if (work != NULL) {
nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name);
thr->work_queue.last.head = work->next;
if (work->next == NULL) {
thr->work_queue.last.tail = NULL;
}
*task = work->task;
*obj = work->obj;
nxt_prefetch(*obj);
*data = work->data;
nxt_prefetch(*data);
work->next = thr->work_queue.cache.next;
thr->work_queue.cache.next = work;
#if (NXT_DEBUG)
if (work->handler == NULL) {
nxt_log_alert(thr->log, "null work handler");
nxt_abort();
}
#endif
return work->handler;
}
return NULL;
}
void
nxt_work_queue_destroy(nxt_work_queue_t *wq)
{
nxt_thread_t *thr;
nxt_work_queue_t *q;
thr = nxt_thread();
/* Detach from a thread work queue. */
if (thr->work_queue.head == wq) {
thr->work_queue.head = wq->next;
q = NULL;
goto found;
}
for (q = thr->work_queue.head; q != NULL; q = q->next) {
if (q->next == wq) {
q->next = wq->next;
goto found;
}
}
return;
found:
if (thr->work_queue.tail == wq) {
thr->work_queue.tail = q;
}
/* Move all queue's works to a thread work queue cache. */
if (wq->tail != NULL) {
wq->tail->next = thr->work_queue.cache.next;
}
if (wq->head != NULL) {
thr->work_queue.cache.next = wq->head;
}
}
/* Locked work queue operations. */
void
nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq, size_t chunk_size)
{
nxt_memzero(lwq, sizeof(nxt_locked_work_queue_t));
if (chunk_size == 0) {
chunk_size = nxt_work_queue_bucket_items;
}
lwq->cache.chunk_size = chunk_size;
while (lwq->cache.next == NULL) {
nxt_work_queue_allocate(&lwq->cache, NULL);
}
}
void
nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq)
{
nxt_work_queue_chunk_t *chunk, *next;
for (chunk = lwq->cache.chunk; chunk; chunk = next) {
next = chunk->next;
nxt_free(chunk);
}
}
/* Add a work to a locked work queue tail. */
void
nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data)
nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq, nxt_work_t *work)
{
nxt_work_t *work;
nxt_thread_spin_lock(&lwq->lock);
for ( ;; ) {
work = lwq->cache.next;
if (nxt_fast_path(work != NULL)) {
lwq->cache.next = work->next;
work->next = NULL;
work->handler = handler;
work->task = task;
work->obj = obj;
work->data = data;
if (lwq->tail != NULL) {
lwq->tail->next = work;
@@ -514,12 +183,6 @@ nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
lwq->tail = work;
break;
}
nxt_work_queue_allocate(&lwq->cache, &lwq->lock);
}
nxt_thread_spin_unlock(&lwq->lock);
}
@@ -530,33 +193,21 @@ nxt_work_handler_t
nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
void **obj, void **data)
{
nxt_work_t *work;
nxt_work_handler_t handler;
handler = NULL;
nxt_thread_spin_lock(&lwq->lock);
handler = nxt_locked_work_queue_pop_work(lwq, task, obj, data);
nxt_thread_spin_unlock(&lwq->lock);
return handler;
}
static nxt_work_handler_t
nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
void **obj, void **data)
{
nxt_work_t *work;
work = lwq->head;
if (work == NULL) {
return NULL;
}
if (work != NULL) {
*task = work->task;
*obj = work->obj;
nxt_prefetch(*obj);
*data = work->data;
nxt_prefetch(*data);
@@ -566,10 +217,12 @@ nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, nxt_task_t **task,
lwq->tail = NULL;
}
work->next = lwq->cache.next;
lwq->cache.next = work;
handler = work->handler;
}
return work->handler;
nxt_thread_spin_unlock(&lwq->lock);
return handler;
}
@@ -579,29 +232,23 @@ void
nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
nxt_work_queue_t *wq)
{
void *obj, *data;
nxt_task_t *task;
nxt_work_handler_t handler;
/* Locked work queue head can be tested without a lock. */
if (nxt_fast_path(lwq->head == NULL)) {
return;
}
nxt_work_t *work;
nxt_thread_spin_lock(&lwq->lock);
for ( ;; ) {
handler = nxt_locked_work_queue_pop_work(lwq, &task, &obj, &data);
work = lwq->head;
if (handler == NULL) {
break;
}
task->thread = thr;
nxt_thread_work_queue_add(thr, wq, handler, task, obj, data);
}
lwq->head = NULL;
lwq->tail = NULL;
nxt_thread_spin_unlock(&lwq->lock);
while (work != NULL) {
work->task->thread = thr;
nxt_work_queue_add(wq, work->handler, work->task,
work->obj, work->data);
work = work->next;
}
}

View File

@@ -65,22 +65,13 @@ typedef struct nxt_work_queue_s nxt_work_queue_t;
struct nxt_work_queue_s {
nxt_work_t *head;
nxt_work_t *tail;
nxt_work_queue_t *next;
nxt_work_queue_cache_t *cache;
#if (NXT_DEBUG)
const char *name;
#endif
};
typedef struct {
nxt_work_queue_t *head;
nxt_work_queue_t *tail;
nxt_work_queue_t main;
nxt_work_queue_t last;
nxt_work_queue_cache_t cache;
} nxt_thread_work_queue_t;
typedef struct {
nxt_thread_spinlock_t lock;
nxt_work_t *head;
@@ -89,34 +80,26 @@ typedef struct {
} nxt_locked_work_queue_t;
NXT_EXPORT void nxt_thread_work_queue_create(nxt_thread_t *thr,
NXT_EXPORT void nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache,
size_t chunk_size);
NXT_EXPORT void nxt_thread_work_queue_destroy(nxt_thread_t *thr);
NXT_EXPORT void nxt_thread_work_queue_add(nxt_thread_t *thr,
nxt_work_queue_t *wq, nxt_work_handler_t handler, nxt_task_t *task,
void *obj, void *data);
NXT_EXPORT void nxt_thread_work_queue_push(nxt_thread_t *thr,
nxt_work_queue_t *wq, nxt_work_handler_t handler, nxt_task_t *task,
void *obj, void *data);
NXT_EXPORT void nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq);
NXT_EXPORT nxt_work_handler_t nxt_thread_work_queue_pop(nxt_thread_t *thr,
NXT_EXPORT void nxt_work_queue_cache_destroy(nxt_work_queue_cache_t *cache);
NXT_EXPORT void nxt_work_queue_add(nxt_work_queue_t *wq,
nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data);
NXT_EXPORT nxt_work_handler_t nxt_work_queue_pop(nxt_work_queue_t *wq,
nxt_task_t **task, void **obj, void **data);
NXT_EXPORT void nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data);
#define \
nxt_thread_current_work_queue_add(thr, handler, task, obj, data) \
#define nxt_work_set(_work, _handler, _task, _obj, _data) \
do { \
nxt_thread_t *_thr = thr; \
nxt_work_t *work = _work; \
\
nxt_thread_work_queue_add(_thr, _thr->work_queue.head, \
handler, task, obj, data); \
work->handler = _handler; \
work->task = _task; \
work->obj = _obj; \
work->data = _data; \
} while (0)
NXT_EXPORT void nxt_work_queue_destroy(nxt_work_queue_t *wq);
#if (NXT_DEBUG)
#define \
@@ -131,17 +114,8 @@ nxt_work_queue_name(_wq, _name)
#endif
NXT_EXPORT void nxt_thread_last_work_queue_add(nxt_thread_t *thr,
nxt_work_handler_t handler, void *obj, void *data);
NXT_EXPORT nxt_work_handler_t nxt_thread_last_work_queue_pop(nxt_thread_t *thr,
nxt_task_t **task, void **obj, void **data);
NXT_EXPORT void nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq,
size_t chunk_size);
NXT_EXPORT void nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq);
NXT_EXPORT void nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
nxt_work_handler_t handler, nxt_task_t *task, void *obj, void *data);
nxt_work_t *work);
NXT_EXPORT nxt_work_handler_t nxt_locked_work_queue_pop(
nxt_locked_work_queue_t *lwq, nxt_task_t **task, void **obj, void **data);
NXT_EXPORT void nxt_locked_work_queue_move(nxt_thread_t *thr,