Initial version.
src/nxt_work_queue.c (new file, 610 lines)
@@ -0,0 +1,610 @@
/*
 * Copyright (C) Igor Sysoev
 * Copyright (C) NGINX, Inc.
 */

#include <nxt_main.h>


/*
 * Available work items are crucial for overall engine operation, so
 * the items are preallocated in two chunks: a cache chunk and a spare
 * chunk.  By default each chunk preallocates 409 work items, which fit
 * in two or four CPU pages depending on the platform.  If all items in
 * the cache chunk are exhausted, then the spare chunk becomes the cache
 * chunk and a new spare chunk is allocated.  This two-step allocation
 * mitigates the impact of low memory conditions on work queue
 * operation.  However, if both chunks are exhausted, then a thread
 * sleeps waiting for another thread to free some memory.  This may lead
 * to a deadlock, in which case the process should probably be aborted;
 * such behaviour is comparable to aborting on program stack exhaustion.
 *
 * The cache and spare chunks are initially allocated in the same two
 * steps: a spare chunk is allocated first, then it becomes the cache
 * chunk and a new spare chunk is allocated.
 */

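/*
 * A chunk is a single allocation: an nxt_work_queue_chunk_t header,
 * which already embeds one work item, followed by cache->chunk_size
 * additional items (see the size computation in
 * nxt_work_queue_allocate() below):
 *
 *     +------+--------+--------+-- ... --+--------+
 *     | next | work 0 | work 1 |         | work n |
 *     +------+--------+--------+-- ... --+--------+
 */
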
static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
    nxt_thread_spinlock_t *lock);
static void nxt_work_queue_sleep(nxt_thread_spinlock_t *lock);
static nxt_work_queue_t *nxt_thread_current_work_queue(nxt_thread_t *thr);
static nxt_work_handler_t nxt_locked_work_queue_pop_work(
    nxt_locked_work_queue_t *lwq, void **obj, void **data, nxt_log_t **log);


/* It can be adjusted with the "work_queue_bucket_items" directive. */
static nxt_uint_t  nxt_work_queue_bucket_items = 409;

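/*
 * The default of 409 items appears to be page-size arithmetic: if
 * nxt_work_t is five pointers (next, handler, obj, data, log), then
 * 409 items take 409 * 40 = 16360 bytes on 64-bit platforms (four 4K
 * CPU pages) and 409 * 20 = 8180 bytes on 32-bit platforms (two
 * pages), which matches the comment at the top of this file.
 */
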
void
nxt_thread_work_queue_create(nxt_thread_t *thr, size_t chunk_size)
{
    nxt_memzero(&thr->work_queue, sizeof(nxt_thread_work_queue_t));

    nxt_work_queue_name(&thr->work_queue.main, "main");
    nxt_work_queue_name(&thr->work_queue.last, "last");

    if (chunk_size == 0) {
        chunk_size = nxt_work_queue_bucket_items;
    }

    /* nxt_work_queue_chunk_t already has one work item. */
    thr->work_queue.cache.chunk_size = chunk_size - 1;

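    /*
     * The initial two-step allocation described above: the first
     * nxt_work_queue_allocate() call only sets up a spare chain and
     * leaves cache.next NULL, so the loop runs once more to promote
     * the spare chain to the cache and allocate a fresh spare.
     */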
    while (thr->work_queue.cache.next == NULL) {
        nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
    }
}


void
nxt_thread_work_queue_destroy(nxt_thread_t *thr)
{
    nxt_work_queue_chunk_t  *chunk, *next;

    for (chunk = thr->work_queue.cache.chunk; chunk; chunk = next) {
        next = chunk->next;
        nxt_free(chunk);
    }
}


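/*
 * Replenish a cache.  On successful allocation the old spare chain
 * becomes the cache chain and the new chunk's items become the spare
 * chain.  If the allocation fails but a spare chain still exists, the
 * spare chain is consumed instead.  If nothing is left at all, the
 * thread sleeps for a while in the hope that other threads will return
 * items, as described in the comment at the top of this file.
 */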
static void
nxt_work_queue_allocate(nxt_work_queue_cache_t *cache,
    nxt_thread_spinlock_t *lock)
{
    size_t                  size;
    nxt_uint_t              i, n;
    nxt_work_t              *work;
    nxt_work_queue_chunk_t  *chunk;

    n = cache->chunk_size;
    size = sizeof(nxt_work_queue_chunk_t) + n * sizeof(nxt_work_t);

    chunk = nxt_malloc(size);

    if (nxt_fast_path(chunk != NULL)) {

        chunk->next = cache->chunk;
        cache->chunk = chunk;
        work = &chunk->work;

        for (i = 0; i < n; i++) {
            work[i].next = &work[i + 1];
        }

        work[i].next = NULL;
        work++;

    } else if (cache->spare != NULL) {

        work = NULL;

    } else {
        nxt_work_queue_sleep(lock);
        return;
    }

    cache->next = cache->spare;
    cache->spare = work;
}


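/*
 * Sleep for 100ms to give other threads a chance to release work
 * items.  The caller's spinlock, if any, is dropped for the duration
 * of the sleep so that other threads can make progress meanwhile.
 */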
static void
nxt_work_queue_sleep(nxt_thread_spinlock_t *lock)
{
    if (lock != NULL) {
        nxt_thread_spin_unlock(lock);
    }

    nxt_nanosleep(100 * 1000000);  /* 100ms */

    if (lock != NULL) {
        nxt_thread_spin_lock(lock);
    }
}


/* Add a work to a work queue tail. */

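/*
 * The loop below (and the identical loops in the other add functions)
 * takes a preallocated item from the thread's cache chain; when the
 * chain is empty, nxt_work_queue_allocate() refills it and the loop
 * retries.
 */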
void
nxt_thread_work_queue_add(nxt_thread_t *thr, nxt_work_queue_t *wq,
    nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
{
    nxt_work_t  *work;

    nxt_work_queue_attach(thr, wq);

    for ( ;; ) {
        work = thr->work_queue.cache.next;

        if (nxt_fast_path(work != NULL)) {
            thr->work_queue.cache.next = work->next;
            work->next = NULL;

            work->handler = handler;
            work->obj = obj;
            work->data = data;
            work->log = log;

            if (wq->tail != NULL) {
                wq->tail->next = work;

            } else {
                wq->head = work;
            }

            wq->tail = work;

            return;
        }

        nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
    }
}


/* Push a work to a work queue head. */

void
nxt_thread_work_queue_push(nxt_thread_t *thr, nxt_work_queue_t *wq,
    nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
{
    nxt_work_t  *work;

    nxt_work_queue_attach(thr, wq);

    for ( ;; ) {
        work = thr->work_queue.cache.next;

        if (nxt_fast_path(work != NULL)) {
            thr->work_queue.cache.next = work->next;
            work->next = wq->head;

            work->handler = handler;
            work->obj = obj;
            work->data = data;
            work->log = log;

            wq->head = work;

            if (wq->tail == NULL) {
                wq->tail = work;
            }

            return;
        }

        nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
    }
}


/* Attach a work queue to a thread work queue. */

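/*
 * Attached queues form a singly linked list anchored at
 * thr->work_queue.head/tail.  A queue is appended only if it is not
 * already linked: wq->next != NULL means it is linked somewhere in the
 * middle of the list, and wq == thr->work_queue.tail covers the last
 * element, whose next pointer is NULL.
 */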
void
nxt_work_queue_attach(nxt_thread_t *thr, nxt_work_queue_t *wq)
{
    if (wq->next == NULL && wq != thr->work_queue.tail) {

        if (thr->work_queue.tail != NULL) {
            thr->work_queue.tail->next = wq;

        } else {
            thr->work_queue.head = wq;
        }

        thr->work_queue.tail = wq;
    }
}


/* Pop a work from a thread work queue head. */

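/*
 * Returns the handler of the first work in the first non-empty
 * attached queue, filling in *obj, *data and *log, or NULL if all
 * attached queues are empty.  The popped item is returned to the
 * thread's cache chain for reuse.
 */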
nxt_work_handler_t
nxt_thread_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
    nxt_log_t **log)
{
    nxt_work_t        *work;
    nxt_work_queue_t  *wq;

    wq = nxt_thread_current_work_queue(thr);

    if (wq != NULL) {

        work = wq->head;

        if (work != NULL) {
            wq->head = work->next;

            if (work->next == NULL) {
                wq->tail = NULL;
            }

            *obj = work->obj;
            nxt_prefetch(*obj);
            *data = work->data;
            nxt_prefetch(*data);

            work->next = thr->work_queue.cache.next;
            thr->work_queue.cache.next = work;

            *log = work->log;

#if (NXT_DEBUG)

            if (work->handler == NULL) {
                nxt_log_alert(thr->log, "null work handler");
                nxt_abort();
            }

#endif

            return work->handler;
        }
    }

    return NULL;
}


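/*
 * Returns the first attached queue that has pending works.  Queues
 * found empty along the way are detached; they will be re-attached by
 * the next nxt_thread_work_queue_add() or push() targeting them.
 */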
static nxt_work_queue_t *
nxt_thread_current_work_queue(nxt_thread_t *thr)
{
    nxt_work_queue_t  *wq, *next;

    for (wq = thr->work_queue.head; wq != NULL; wq = next) {

        if (wq->head != NULL) {
            nxt_log_debug(thr->log, "work queue: %s", wq->name);
            return wq;
        }

        /* Detach empty work queue. */
        next = wq->next;
        wq->next = NULL;
        thr->work_queue.head = next;
    }

    thr->work_queue.tail = NULL;

    return NULL;
}


/* Drop a work with specified data from a thread work queue. */

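/*
 * Every matching work in every attached queue is unlinked and returned
 * to the cache chain.  The "link" pointer always refers to the
 * previous link's next field (initially &wq->head), which allows
 * unlinking a matched item without re-walking the list.  Note that the
 * match is against work->obj, although the parameter is named "data".
 */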
void
nxt_thread_work_queue_drop(nxt_thread_t *thr, void *data)
{
    nxt_work_t        *work, *prev, *next, **link;
    nxt_work_queue_t  *wq;

    for (wq = thr->work_queue.head; wq != NULL; wq = wq->next) {

        prev = NULL;
        link = &wq->head;

        for (work = wq->head; work != NULL; work = next) {

            next = work->next;

            if (data != work->obj) {
                prev = work;
                link = &work->next;

            } else {
                if (next == NULL) {
                    wq->tail = prev;
                }

                nxt_log_debug(thr->log, "work queue drop");

                *link = next;

                work->next = thr->work_queue.cache.next;
                thr->work_queue.cache.next = work;
            }
        }
    }
}


/* Add a work to the thread last work queue's tail. */

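/*
 * The "last" work queue is kept outside the attached queue list, so
 * nxt_thread_work_queue_pop() never drains it; it has its own add/pop
 * pair below, presumably so that the engine can run these works only
 * after all attached queues are empty.
 */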
void
nxt_thread_last_work_queue_add(nxt_thread_t *thr, nxt_work_handler_t handler,
    void *obj, void *data, nxt_log_t *log)
{
    nxt_work_t  *work;

    for ( ;; ) {
        work = thr->work_queue.cache.next;

        if (nxt_fast_path(work != NULL)) {
            thr->work_queue.cache.next = work->next;
            work->next = NULL;

            work->handler = handler;
            work->obj = obj;
            work->data = data;
            work->log = log;

            if (thr->work_queue.last.tail != NULL) {
                thr->work_queue.last.tail->next = work;

            } else {
                thr->work_queue.last.head = work;
            }

            thr->work_queue.last.tail = work;

            return;
        }

        nxt_work_queue_allocate(&thr->work_queue.cache, NULL);
    }
}


/* Pop a work from the thread last work queue's head. */

nxt_work_handler_t
nxt_thread_last_work_queue_pop(nxt_thread_t *thr, void **obj, void **data,
    nxt_log_t **log)
{
    nxt_work_t  *work;

    work = thr->work_queue.last.head;

    if (work != NULL) {
        nxt_log_debug(thr->log, "work queue: %s", thr->work_queue.last.name);

        thr->work_queue.last.head = work->next;

        if (work->next == NULL) {
            thr->work_queue.last.tail = NULL;
        }

        *obj = work->obj;
        nxt_prefetch(*obj);
        *data = work->data;
        nxt_prefetch(*data);

        work->next = thr->work_queue.cache.next;
        thr->work_queue.cache.next = work;

        *log = work->log;

#if (NXT_DEBUG)

        if (work->handler == NULL) {
            nxt_log_alert(thr->log, "null work handler");
            nxt_abort();
        }

#endif

        return work->handler;
    }

    return NULL;
}


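/*
 * Detach a work queue from the thread and recycle its pending works
 * into the thread's cache chain.  If the queue is not found in the
 * attached list, nothing is done.
 */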
void
nxt_work_queue_destroy(nxt_work_queue_t *wq)
{
    nxt_thread_t      *thr;
    nxt_work_queue_t  *q;

    thr = nxt_thread();

    /* Detach from a thread work queue. */

    if (thr->work_queue.head == wq) {
        thr->work_queue.head = wq->next;
        q = NULL;
        goto found;
    }

    for (q = thr->work_queue.head; q != NULL; q = q->next) {
        if (q->next == wq) {
            q->next = wq->next;
            goto found;
        }
    }

    return;

found:

    if (thr->work_queue.tail == wq) {
        thr->work_queue.tail = q;
    }

    /* Move all of the queue's works to the thread work queue cache. */

    if (wq->tail != NULL) {
        wq->tail->next = thr->work_queue.cache.next;
    }

    if (wq->head != NULL) {
        thr->work_queue.cache.next = wq->head;
    }
}


/* Locked work queue operations. */

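/*
 * A locked work queue is protected by a spinlock, so works can be
 * added from other threads.  Note that, unlike
 * nxt_thread_work_queue_create(), the chunk_size here is not
 * decremented for the item embedded in the chunk header, so a locked
 * queue chunk holds one more item than requested.
 */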
void
nxt_locked_work_queue_create(nxt_locked_work_queue_t *lwq, size_t chunk_size)
{
    nxt_memzero(lwq, sizeof(nxt_locked_work_queue_t));

    if (chunk_size == 0) {
        chunk_size = nxt_work_queue_bucket_items;
    }

    lwq->cache.chunk_size = chunk_size;

    while (lwq->cache.next == NULL) {
        nxt_work_queue_allocate(&lwq->cache, NULL);
    }
}


void
nxt_locked_work_queue_destroy(nxt_locked_work_queue_t *lwq)
{
    nxt_work_queue_chunk_t  *chunk, *next;

    for (chunk = lwq->cache.chunk; chunk; chunk = next) {
        next = chunk->next;
        nxt_free(chunk);
    }
}


/* Add a work to a locked work queue tail. */

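/*
 * The queue's spinlock is passed down to nxt_work_queue_allocate() so
 * that, if memory is completely exhausted, the lock is released while
 * the thread sleeps in nxt_work_queue_sleep().
 */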
void
nxt_locked_work_queue_add(nxt_locked_work_queue_t *lwq,
    nxt_work_handler_t handler, void *obj, void *data, nxt_log_t *log)
{
    nxt_work_t  *work;

    nxt_thread_spin_lock(&lwq->lock);

    for ( ;; ) {
        work = lwq->cache.next;

        if (nxt_fast_path(work != NULL)) {
            lwq->cache.next = work->next;

            work->next = NULL;
            work->handler = handler;
            work->obj = obj;
            work->data = data;
            work->log = log;

            if (lwq->tail != NULL) {
                lwq->tail->next = work;

            } else {
                lwq->head = work;
            }

            lwq->tail = work;

            break;
        }

        nxt_work_queue_allocate(&lwq->cache, &lwq->lock);
    }

    nxt_thread_spin_unlock(&lwq->lock);
}


/* Pop a work from a locked work queue head. */

nxt_work_handler_t
nxt_locked_work_queue_pop(nxt_locked_work_queue_t *lwq, void **obj,
    void **data, nxt_log_t **log)
{
    nxt_work_handler_t  handler;

    nxt_thread_spin_lock(&lwq->lock);

    handler = nxt_locked_work_queue_pop_work(lwq, obj, data, log);

    nxt_thread_spin_unlock(&lwq->lock);

    return handler;
}


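/*
 * The workhorse of nxt_locked_work_queue_pop() and
 * nxt_locked_work_queue_move(); it must be called with lwq->lock held.
 */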
static nxt_work_handler_t
nxt_locked_work_queue_pop_work(nxt_locked_work_queue_t *lwq, void **obj,
    void **data, nxt_log_t **log)
{
    nxt_work_t  *work;

    work = lwq->head;

    if (work == NULL) {
        return NULL;
    }

    *obj = work->obj;
    nxt_prefetch(*obj);
    *data = work->data;
    nxt_prefetch(*data);

    lwq->head = work->next;

    if (work->next == NULL) {
        lwq->tail = NULL;
    }

    work->next = lwq->cache.next;
    lwq->cache.next = work;

    *log = work->log;

    return work->handler;
}


/* Move all works from a locked work queue to a usual work queue. */

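/*
 * Testing lwq->head without the lock is a benign race: a work added
 * concurrently may be missed on this pass, but it will be seen by a
 * subsequent call.
 */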
void
nxt_locked_work_queue_move(nxt_thread_t *thr, nxt_locked_work_queue_t *lwq,
    nxt_work_queue_t *wq)
{
    void                *obj, *data;
    nxt_log_t           *log;
    nxt_work_handler_t  handler;

    /* Locked work queue head can be tested without a lock. */

    if (nxt_fast_path(lwq->head == NULL)) {
        return;
    }

    nxt_thread_spin_lock(&lwq->lock);

    for ( ;; ) {
        handler = nxt_locked_work_queue_pop_work(lwq, &obj, &data, &log);

        if (handler == NULL) {
            break;
        }

        nxt_thread_work_queue_add(thr, wq, handler, obj, data, log);
    }

    nxt_thread_spin_unlock(&lwq->lock);
}