Introducing mmap_handler to count references to shared memory.

"All problems in computer science can be
                           solved by another level of indirection"

                                                   Butler Lampson

Completion handlers for application response buffers executed after
sending the data to client.  Application worker can be stopped right
after send response buffers to router.  Worker stop causes removal
of all data structures for the worker.

To prevent shared memory segment unmap, need to count the number of
buffers which uses it.  So instead of direct reference to shared
memory, need to reference to intermediate 'handler' structure with
use counter and pointer to shared memory.
This commit is contained in:
Max Romanov
2017-10-19 17:37:02 +03:00
parent 6532e46465
commit 6031c63225
3 changed files with 146 additions and 87 deletions

View File

@@ -16,12 +16,21 @@
#include <nxt_port_memory_int.h>
nxt_inline void
nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
nxt_port_mmap_handler_use(nxt_port_mmap_handler_t *mmap_handler, int i)
{
if (port_mmap->hdr != NULL) {
nxt_mem_munmap(port_mmap->hdr, PORT_MMAP_SIZE);
port_mmap->hdr = NULL;
int c;
c = nxt_atomic_fetch_add(&mmap_handler->use_count, i);
if (i < 0 && c == -i) {
if (mmap_handler->hdr != NULL) {
nxt_mem_munmap(mmap_handler->hdr, PORT_MMAP_SIZE);
mmap_handler->hdr = NULL;
}
nxt_free(mmap_handler);
}
}
@@ -70,7 +79,7 @@ nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
void
nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free)
nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts)
{
uint32_t i;
nxt_port_mmap_t *port_mmap;
@@ -82,12 +91,12 @@ nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free)
port_mmap = port_mmaps->elts;
for (i = 0; i < port_mmaps->size; i++) {
nxt_port_mmap_destroy(port_mmap + i);
nxt_port_mmap_handler_use(port_mmap[i].mmap_handler, -1);
}
port_mmaps->size = 0;
if (free != 0) {
if (free_elts != 0) {
nxt_free(port_mmaps->elts);
}
}
@@ -105,6 +114,7 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_buf_t *b;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
if (nxt_buf_ts_handle(task, obj, data)) {
return;
@@ -122,7 +132,8 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
}
#endif
hdr = data;
mmap_handler = data;
hdr = mmap_handler->hdr;
if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
nxt_debug(task, "mmap buf completion: mmap for other process pair "
@@ -160,11 +171,13 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
release_buf:
nxt_port_mmap_handler_use(mmap_handler, -1);
nxt_mp_release(mp, b);
}
nxt_port_mmap_header_t *
nxt_port_mmap_handler_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd)
{
@@ -172,6 +185,7 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
struct stat mmap_stat;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
nxt_debug(task, "got new mmap fd #%FD from process %PI",
fd, process->pid);
@@ -196,6 +210,15 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
hdr = mem;
mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
if (nxt_slow_path(mmap_handler == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
return NULL;
}
mmap_handler->hdr = hdr;
nxt_thread_mutex_lock(&process->incoming.mutex);
port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
@@ -205,13 +228,17 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_mem_munmap(mem, PORT_MMAP_SIZE);
hdr = NULL;
nxt_free(mmap_handler);
mmap_handler = NULL;
goto fail;
}
nxt_assert(hdr->src_pid == process->pid);
nxt_assert(hdr->dst_pid == nxt_pid);
port_mmap->hdr = hdr;
port_mmap->mmap_handler = mmap_handler;
nxt_port_mmap_handler_use(mmap_handler, 1);
hdr->sent_over = 0xFFFFu;
@@ -219,11 +246,11 @@ fail:
nxt_thread_mutex_unlock(&process->incoming.mutex);
return hdr;
return mmap_handler;
}
static nxt_port_mmap_header_t *
static nxt_port_mmap_handler_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port)
{
@@ -232,12 +259,21 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
if (nxt_slow_path(mmap_handler == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
return NULL;
}
port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN,
"failed to add port mmap to outgoing array");
nxt_free(mmap_handler);
return NULL;
}
@@ -293,10 +329,12 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
goto remove_fail;
}
port_mmap->hdr = mem;
mmap_handler->hdr = mem;
port_mmap->mmap_handler = mmap_handler;
nxt_port_mmap_handler_use(mmap_handler, 1);
/* Init segment header. */
hdr = port_mmap->hdr;
hdr = mmap_handler->hdr;
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
@@ -319,17 +357,19 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
hdr->id, nxt_pid, process->pid);
return hdr;
return mmap_handler;
remove_fail:
nxt_free(mmap_handler);
process->outgoing.size--;
return NULL;
}
static nxt_port_mmap_header_t *
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
size_t size)
{
@@ -337,6 +377,7 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_t *end_port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
process = port->process;
if (nxt_slow_path(process == NULL)) {
@@ -344,65 +385,61 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
}
*c = 0;
port_mmap = NULL;
hdr = NULL;
nxt_thread_mutex_lock(&process->outgoing.mutex);
port_mmap = process->outgoing.elts;
end_port_mmap = port_mmap + process->outgoing.size;
end_port_mmap = process->outgoing.elts + process->outgoing.size;
while (port_mmap < end_port_mmap) {
for (port_mmap = process->outgoing.elts;
port_mmap < end_port_mmap;
port_mmap++)
{
mmap_handler = port_mmap->mmap_handler;
hdr = mmap_handler->hdr;
if ( (port_mmap->hdr->sent_over == 0xFFFFu ||
port_mmap->hdr->sent_over == port->id) &&
nxt_port_mmap_get_free_chunk(port_mmap->hdr, c)) {
hdr = port_mmap->hdr;
goto unlock_return;
if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
continue;
}
port_mmap++;
if (nxt_port_mmap_get_free_chunk(hdr, c)) {
goto unlock_return;
}
}
/* TODO introduce port_mmap limit and release wait. */
hdr = nxt_port_new_port_mmap(task, process, port);
mmap_handler = nxt_port_new_port_mmap(task, process, port);
unlock_return:
nxt_thread_mutex_unlock(&process->outgoing.mutex);
return hdr;
return mmap_handler;
}
static nxt_port_mmap_header_t *
static nxt_port_mmap_handler_t *
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
{
nxt_process_t *process;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
process = nxt_runtime_process_find(task->thread->runtime, spid);
if (nxt_slow_path(process == NULL)) {
return NULL;
}
hdr = NULL;
mmap_handler = NULL;
nxt_thread_mutex_lock(&process->incoming.mutex);
if (nxt_fast_path(process->incoming.size > id)) {
hdr = process->incoming.elts[id].hdr;
} else {
nxt_log(task, NXT_LOG_WARN,
"failed to get incoming mmap #%d for process %PI", id, spid);
mmap_handler = process->incoming.elts[id].mmap_handler;
}
nxt_thread_mutex_unlock(&process->incoming.mutex);
return hdr;
return mmap_handler;
}
@@ -413,6 +450,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nxt_buf_t *b;
nxt_chunk_id_t c;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
nxt_debug(task, "request %z bytes shm buffer", size);
@@ -424,13 +462,17 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
b->completion_handler = nxt_port_mmap_buf_completion;
nxt_buf_set_port_mmap(b);
hdr = nxt_port_mmap_get(task, port, &c, size);
if (nxt_slow_path(hdr == NULL)) {
mmap_handler = nxt_port_mmap_get(task, port, &c, size);
if (nxt_slow_path(mmap_handler == NULL)) {
nxt_mp_release(task->thread->engine->mem_pool, b);
return NULL;
}
b->parent = hdr;
b->parent = mmap_handler;
nxt_port_mmap_handler_use(mmap_handler, 1);
hdr = mmap_handler->hdr;
b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
b->mem.pos = b->mem.start;
@@ -472,6 +514,7 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
size_t nchunks, free_size;
nxt_chunk_id_t c, start;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
nxt_debug(task, "request increase %z bytes shm buffer", size);
@@ -487,7 +530,8 @@ nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b, size_t size,
return NXT_OK;
}
hdr = b->parent;
mmap_handler = b->parent;
hdr = mmap_handler->hdr;
start = nxt_port_mmap_chunk_id(hdr, b->mem.end);
@@ -539,9 +583,11 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
size_t nchunks;
nxt_buf_t *b;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
hdr = nxt_port_get_port_incoming_mmap(task, spid, mmap_msg->mmap_id);
if (nxt_slow_path(hdr == NULL)) {
mmap_handler = nxt_port_get_port_incoming_mmap(task, spid,
mmap_msg->mmap_id);
if (nxt_slow_path(mmap_handler == NULL)) {
return NULL;
}
@@ -559,12 +605,15 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
nchunks++;
}
hdr = mmap_handler->hdr;
b->mem.start = nxt_port_mmap_chunk_start(hdr, mmap_msg->chunk_id);
b->mem.pos = b->mem.start;
b->mem.free = b->mem.start + mmap_msg->size;
b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
b->parent = hdr;
b->parent = mmap_handler;
nxt_port_mmap_handler_use(mmap_handler, 1);
nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d",
b, b->mem.start, b->mem.end - b->mem.start,
@@ -583,6 +632,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
nxt_uint_t i;
nxt_port_mmap_msg_t *mmap_msg;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
nxt_debug(task, "prepare %z bytes message for transfer to process %PI "
"via shared memory", sb->size, port->pid);
@@ -606,7 +656,8 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
/* TODO clear b and exit */
}
hdr = bmem->parent;
mmap_handler = bmem->parent;
hdr = mmap_handler->hdr;
mmap_msg->mmap_id = hdr->id;
mmap_msg->chunk_id = nxt_port_mmap_chunk_id(hdr, bmem->mem.pos);
@@ -669,6 +720,7 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
nxt_port_method_t m;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
m = NXT_PORT_METHOD_ANY;
@@ -679,7 +731,8 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
}
if (nxt_buf_is_port_mmap(b)) {
hdr = b->parent;
mmap_handler = b->parent;
hdr = mmap_handler->hdr;
if (m == NXT_PORT_METHOD_PLAIN) {
nxt_log_error(NXT_LOG_ERR, task->log,

View File

@@ -11,8 +11,9 @@
#define PORT_MMAP_MIN_SIZE (3 * sizeof(uint32_t))
typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t;
typedef struct nxt_port_mmap_handler_s nxt_port_mmap_handler_t;
void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free);
void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free_elts);
/*
* Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem'
@@ -27,7 +28,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size);
nxt_int_t nxt_port_mmap_increase_buf(nxt_task_t *task, nxt_buf_t *b,
size_t size, size_t min_size);
nxt_port_mmap_header_t *
nxt_port_mmap_handler_t *
nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_fd_t fd);

View File

@@ -55,12 +55,17 @@ struct nxt_port_mmap_header_s {
};
struct nxt_port_mmap_handler_s {
nxt_port_mmap_header_t *hdr;
nxt_atomic_t use_count;
};
/*
* Element of nxt_process_t.incoming/outgoing, shared memory segment
* descriptor.
*/
struct nxt_port_mmap_s {
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
};
typedef struct nxt_port_mmap_msg_s nxt_port_mmap_msg_t;