Files
nginx-unit/src/nxt_buf.c
Max Romanov f319220a6c Redirecting buffer completion handler to specific engine.
There is a case in router where we use port in router connection thread.
Buffers are allocated within connection memory pool which can be used only in
this router thread. sendmsg() can be postponed into main router thread and
completion handler will compare current engine and post itself to correct
engine.
2017-07-07 16:01:34 +03:00

301 lines
6.0 KiB
C

/*
* Copyright (C) Igor Sysoev
* Copyright (C) NGINX, Inc.
*/
#include <nxt_main.h>
static void nxt_buf_completion(nxt_task_t *task, void *obj, void *data);
static void nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data);
typedef struct {
nxt_work_t work;
nxt_event_engine_t *engine;
} nxt_buf_ts_t;
void
nxt_buf_mem_init(nxt_buf_t *b, void *start, size_t size)
{
b->mem.start = start;
b->mem.pos = start;
b->mem.free = start;
b->mem.end = nxt_pointer_to(start, size);
}
nxt_buf_t *
nxt_buf_mem_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags)
{
nxt_buf_t *b;
b = nxt_mp_alloc(mp, NXT_BUF_MEM_SIZE + size);
if (nxt_slow_path(b == NULL)) {
return NULL;
}
nxt_memzero(b, NXT_BUF_MEM_SIZE);
b->data = mp;
b->completion_handler = nxt_buf_completion;
if (size != 0) {
b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
b->mem.end = b->mem.start + size;
}
return b;
}
nxt_buf_t *
nxt_buf_mem_ts_alloc(nxt_task_t *task, nxt_mp_t *mp, size_t size)
{
nxt_buf_t *b;
nxt_buf_ts_t *ts;
b = nxt_mp_retain(mp, NXT_BUF_MEM_SIZE + sizeof(nxt_buf_ts_t) + size);
if (nxt_slow_path(b == NULL)) {
return NULL;
}
nxt_memzero(b, NXT_BUF_MEM_SIZE + sizeof(nxt_buf_ts_t));
b->data = mp;
b->completion_handler = nxt_buf_ts_completion;
b->is_ts = 1;
if (size != 0) {
b->mem.start = nxt_pointer_to(b, NXT_BUF_MEM_SIZE +
sizeof(nxt_buf_ts_t));
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
b->mem.end = b->mem.start + size;
}
ts = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
ts->engine = task->thread->engine;
ts->work.handler = nxt_buf_ts_completion;
ts->work.task = task;
ts->work.obj = b;
ts->work.data = b->parent;
return b;
}
nxt_buf_t *
nxt_buf_file_alloc(nxt_mp_t *mp, size_t size, nxt_uint_t flags)
{
nxt_buf_t *b;
b = nxt_mp_alloc(mp, NXT_BUF_FILE_SIZE + size);
if (nxt_slow_path(b == NULL)) {
return NULL;
}
nxt_memzero(b, NXT_BUF_FILE_SIZE);
b->data = mp;
b->completion_handler = nxt_buf_completion;
nxt_buf_set_file(b);
if (size != 0) {
b->mem.start = nxt_pointer_to(b, NXT_BUF_FILE_SIZE);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start;
b->mem.end = b->mem.start + size;
}
return b;
}
nxt_buf_t *
nxt_buf_mmap_alloc(nxt_mp_t *mp, size_t size)
{
nxt_buf_t *b;
b = nxt_mp_zalloc(mp, NXT_BUF_MMAP_SIZE);
if (nxt_fast_path(b != NULL)) {
b->data = mp;
b->completion_handler = nxt_buf_completion;
nxt_buf_set_file(b);
nxt_buf_set_mmap(b);
nxt_buf_mem_set_size(&b->mem, size);
}
return b;
}
nxt_buf_t *
nxt_buf_sync_alloc(nxt_mp_t *mp, nxt_uint_t flags)
{
nxt_buf_t *b;
b = nxt_mp_zalloc(mp, NXT_BUF_SYNC_SIZE);
if (nxt_fast_path(b != NULL)) {
b->data = mp;
b->completion_handler = nxt_buf_completion;
nxt_buf_set_sync(b);
b->is_nobuf = ((flags & NXT_BUF_SYNC_NOBUF) != 0);
b->is_flush = ((flags & NXT_BUF_SYNC_FLUSH) != 0);
b->is_last = ((flags & NXT_BUF_SYNC_LAST) != 0);
}
return b;
}
void
nxt_buf_chain_add(nxt_buf_t **head, nxt_buf_t *in)
{
nxt_buf_t *b, **prev;
prev = head;
for (b = *head; b != NULL; b = b->next) {
prev = &b->next;
}
*prev = in;
}
size_t
nxt_buf_chain_length(nxt_buf_t *b)
{
size_t length;
length = 0;
while (b != NULL) {
length += b->mem.free - b->mem.pos;
b = b->next;
}
return length;
}
static void
nxt_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
nxt_buf_t *b, *parent;
b = obj;
parent = data;
nxt_debug(task, "buf completion: %p %p", b, b->mem.start);
#if (NXT_DEBUG)
if (nxt_slow_path(data != b->parent)) {
nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
data, b->parent);
nxt_abort();
}
#endif
mp = b->data;
nxt_mp_free(mp, b);
if (parent != NULL) {
nxt_debug(task, "parent retain:%uD", parent->retain);
parent->retain--;
if (parent->retain == 0) {
parent->mem.pos = parent->mem.free;
parent->completion_handler(task, parent, parent->parent);
}
}
}
nxt_int_t
nxt_buf_ts_handle(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_buf_ts_t *ts;
b = obj;
#if (NXT_DEBUG)
if (nxt_slow_path(b->is_ts == 0)) {
nxt_log_alert(task->log, "not a thread safe buf (%p) completed", b);
nxt_abort();
}
#endif
ts = nxt_pointer_to(b, NXT_BUF_MEM_SIZE);
if (ts->engine != task->thread->engine) {
nxt_debug(task, "buf ts: %p current engine is %p, expected %p",
b, task->thread->engine, ts->engine);
ts->work.handler = b->completion_handler;
ts->work.obj = obj;
ts->work.data = data;
nxt_event_engine_post(ts->engine, &ts->work);
return 1;
}
return 0;
}
static void
nxt_buf_ts_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_mp_t *mp;
nxt_buf_t *b, *parent;
b = obj;
parent = data;
if (nxt_buf_ts_handle(task, obj, data)) {
return;
}
nxt_debug(task, "buf ts completion: %p %p", b, b->mem.start);
#if (NXT_DEBUG)
if (nxt_slow_path(data != b->parent)) {
nxt_log_alert(task->log, "completion data (%p) != b->parent (%p)",
data, b->parent);
nxt_abort();
}
#endif
mp = b->data;
nxt_mp_release(mp, b);
if (parent != NULL) {
nxt_debug(task, "parent retain:%uD", parent->retain);
parent->retain--;
if (parent->retain == 0) {
parent->mem.pos = parent->mem.free;
parent->completion_handler(task, parent, parent->parent);
}
}
}