216 lines
4.1 KiB
C
216 lines
4.1 KiB
C
|
|
/*
|
|
* Copyright (C) Igor Sysoev
|
|
* Copyright (C) NGINX, Inc.
|
|
*/
|
|
|
|
#include <nxt_main.h>
|
|
|
|
|
|
#if (NXT_THREADS)
|
|
static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
|
|
static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
|
|
void *data);
|
|
#endif
|
|
|
|
|
|
void *
|
|
nxt_job_create(nxt_mp_t *mp, size_t size)
|
|
{
|
|
size_t cache_size;
|
|
nxt_job_t *job;
|
|
|
|
if (mp == NULL) {
|
|
mp = nxt_mp_create(1024, 128, 256, 32);
|
|
if (nxt_slow_path(mp == NULL)) {
|
|
return NULL;
|
|
}
|
|
|
|
job = nxt_mp_zget(mp, size);
|
|
cache_size = 0;
|
|
|
|
} else {
|
|
job = nxt_mp_zalloc(mp, size);
|
|
cache_size = size;
|
|
}
|
|
|
|
if (nxt_fast_path(job != NULL)) {
|
|
job->cache_size = (uint16_t) cache_size;
|
|
job->mem_pool = mp;
|
|
nxt_job_set_name(job, "job");
|
|
}
|
|
|
|
/* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
|
|
nxt_queue_self(&job->link);
|
|
|
|
return job;
|
|
}
|
|
|
|
|
|
void
|
|
nxt_job_init(nxt_job_t *job, size_t size)
|
|
{
|
|
nxt_memzero(job, size);
|
|
|
|
nxt_job_set_name(job, "job");
|
|
|
|
nxt_queue_self(&job->link);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_job_destroy(nxt_task_t *task, void *data)
|
|
{
|
|
nxt_job_t *job;
|
|
|
|
job = data;
|
|
|
|
nxt_queue_remove(&job->link);
|
|
|
|
if (job->cache_size == 0) {
|
|
|
|
if (job->mem_pool != NULL) {
|
|
nxt_mp_destroy(job->mem_pool);
|
|
}
|
|
|
|
} else {
|
|
nxt_mp_free(job->mem_pool, job);
|
|
}
|
|
}
|
|
|
|
|
|
#if 0
|
|
|
|
nxt_int_t
|
|
nxt_job_cleanup_add(nxt_mp_t *mp, nxt_job_t *job)
|
|
{
|
|
nxt_mem_pool_cleanup_t *mpcl;
|
|
|
|
mpcl = nxt_mem_pool_cleanup(mp, 0);
|
|
|
|
if (nxt_fast_path(mpcl != NULL)) {
|
|
mpcl->handler = nxt_job_destroy;
|
|
mpcl->data = job;
|
|
return NXT_OK;
|
|
}
|
|
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
/*
|
|
* The (void *) casts in nxt_thread_pool_post() and nxt_event_engine_post()
|
|
* calls and to the "nxt_work_handler_t" are required by Sun C.
|
|
*/
|
|
|
|
void
|
|
nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
|
|
{
|
|
nxt_debug(task, "%s start", job->name);
|
|
|
|
#if (NXT_THREADS)
|
|
|
|
if (job->thread_pool != NULL) {
|
|
nxt_int_t ret;
|
|
|
|
job->engine = task->thread->engine;
|
|
|
|
nxt_work_set(&job->work, nxt_job_thread_trampoline,
|
|
job->task, job, (void *) handler);
|
|
|
|
ret = nxt_thread_pool_post(job->thread_pool, &job->work);
|
|
|
|
if (ret == NXT_OK) {
|
|
return;
|
|
}
|
|
|
|
handler = job->abort_handler;
|
|
}
|
|
|
|
#endif
|
|
|
|
handler(job->task, job, job->data);
|
|
}
|
|
|
|
|
|
#if (NXT_THREADS)
|
|
|
|
/* A trampoline function is called by a thread pool thread. */
|
|
|
|
static void
|
|
nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_job_t *job;
|
|
nxt_work_handler_t handler;
|
|
|
|
job = obj;
|
|
handler = (nxt_work_handler_t) data;
|
|
|
|
nxt_debug(task, "%s thread", job->name);
|
|
|
|
if (nxt_slow_path(job->cancel)) {
|
|
nxt_job_return(task, job, job->abort_handler);
|
|
|
|
} else {
|
|
handler(job->task, job, job->data);
|
|
}
|
|
}
|
|
|
|
#endif
|
|
|
|
|
|
void
|
|
nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
|
|
{
|
|
nxt_debug(task, "%s return", job->name);
|
|
|
|
#if (NXT_THREADS)
|
|
|
|
if (job->engine != NULL) {
|
|
/* A return function is called in thread pool thread context. */
|
|
|
|
nxt_work_set(&job->work, nxt_job_thread_return_handler,
|
|
job->task, job, (void *) handler);
|
|
|
|
nxt_event_engine_post(job->engine, &job->work);
|
|
|
|
return;
|
|
}
|
|
|
|
#endif
|
|
|
|
if (nxt_slow_path(job->cancel)) {
|
|
nxt_debug(task, "%s cancellation", job->name);
|
|
handler = job->abort_handler;
|
|
}
|
|
|
|
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
|
|
handler, job->task, job, job->data);
|
|
}
|
|
|
|
|
|
#if (NXT_THREADS)
|
|
|
|
static void
|
|
nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_job_t *job;
|
|
nxt_work_handler_t handler;
|
|
|
|
job = obj;
|
|
handler = (nxt_work_handler_t) data;
|
|
|
|
job->task->thread = task->thread;
|
|
|
|
if (nxt_slow_path(job->cancel)) {
|
|
nxt_debug(task, "%s cancellation", job->name);
|
|
handler = job->abort_handler;
|
|
}
|
|
|
|
handler(job->task, job, job->data);
|
|
}
|
|
|
|
#endif
|