Fixed allocation of multiple shared memory chunks.

Previously, only the first shared memory chunk was allocated under the
mutex; any additional chunks (if required) were acquired with atomic
operations alone.  Such an allocation is not guaranteed to succeed, so
the resulting buffer could be smaller than requested.

This commit moves the allocation of all chunks under the mutex and
guarantees that the resulting buffer is large enough.
commit b1d7844449
parent 6157a599f2
Author: Max Romanov
Date:   2018-06-20 19:11:27 +03:00

2 changed files with 52 additions and 35 deletions
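
The diff below keeps the chunk bitmap walk, but performs it entirely with
the outgoing mutex held.  As a minimal, self-contained sketch of that
reservation strategy (not Unit's actual code): find `n` contiguous free
chunks in a bitmap while a mutex is held, and resume the search past any
busy chunk.  Unit's real free_map lives in memory shared with the peer
process, so nxt_port_mmap_get() keeps using the atomic
nxt_port_mmap_chk_set_chunk_busy() / nxt_port_mmap_set_chunk_free()
helpers and rolls back partially taken runs; here a single in-process lock
lets the sketch simply test the whole run before marking it busy.

#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define CHUNK_COUNT  64                      /* illustrative map size */

static uint64_t         free_map = ~0ULL;    /* bit set => chunk is free */
static pthread_mutex_t  map_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Returns the first chunk of a free run of n chunks and marks the run
 * busy, or -1 if no such run exists. */
static int
reserve_chunks(unsigned n)
{
    int       start;
    unsigned  c, taken;

    start = -1;

    pthread_mutex_lock(&map_mutex);

    for (c = 0; c + n <= CHUNK_COUNT; ) {

        for (taken = 0; taken < n; taken++) {
            if (((free_map >> (c + taken)) & 1) == 0) {
                break;                       /* chunk c + taken is busy */
            }
        }

        if (taken == n) {
            for (taken = 0; taken < n; taken++) {
                free_map &= ~(1ULL << (c + taken));
            }

            start = (int) c;
            break;
        }

        c += taken + 1;                      /* resume past the busy chunk */
    }

    pthread_mutex_unlock(&map_mutex);

    return start;
}

int
main(void)
{
    printf("run #1 starts at chunk %d\n", reserve_chunks(3));
    printf("run #2 starts at chunk %d\n", reserve_chunks(3));
    return 0;
}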


@@ -394,8 +394,9 @@ remove_fail:
 static nxt_port_mmap_handler_t *
 nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
-    nxt_bool_t tracking)
+    nxt_int_t n, nxt_bool_t tracking)
 {
+    nxt_int_t                i, res, nchunks;
     nxt_process_t            *process;
     nxt_free_map_t           *free_map;
     nxt_port_mmap_t          *port_mmap;
@@ -408,8 +409,6 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
         return NULL;
     }
 
-    *c = 0;
-
     nxt_thread_mutex_lock(&process->outgoing.mutex);
 
     end_port_mmap = process->outgoing.elts + process->outgoing.size;
@@ -425,15 +424,38 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
             continue;
         }
 
+        *c = 0;
+
         free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
 
-        if (nxt_port_mmap_get_free_chunk(free_map, c)) {
-            goto unlock_return;
+        while (nxt_port_mmap_get_free_chunk(free_map, c)) {
+            nchunks = 1;
+
+            while (nchunks < n) {
+                res = nxt_port_mmap_chk_set_chunk_busy(free_map, *c + nchunks);
+
+                if (res == 0) {
+                    for (i = 0; i < nchunks; i++) {
+                        nxt_port_mmap_set_chunk_free(free_map, *c + i);
+                    }
+
+                    *c += nchunks + 1;
+                    nchunks = 0;
+                    break;
+                }
+
+                nchunks++;
+            }
+
+            if (nchunks == n) {
+                goto unlock_return;
+            }
         }
     }
 
     /* TODO introduce port_mmap limit and release wait. */
 
+    *c = 0;
+
     mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking);
 
 unlock_return:
@@ -482,7 +504,7 @@ nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
     nxt_debug(task, "request tracking for stream #%uD", stream);
 
-    mmap_handler = nxt_port_mmap_get(task, port, &c, 1);
+    mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1);
 
     if (nxt_slow_path(mmap_handler == NULL)) {
         return NXT_ERROR;
     }
@@ -606,15 +628,23 @@ nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
 nxt_buf_t *
 nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
 {
-    size_t                   nchunks;
     nxt_mp_t                 *mp;
     nxt_buf_t                *b;
+    nxt_int_t                nchunks;
     nxt_chunk_id_t           c;
     nxt_port_mmap_header_t   *hdr;
     nxt_port_mmap_handler_t  *mmap_handler;
 
     nxt_debug(task, "request %z bytes shm buffer", size);
 
+    nchunks = (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
+
+    if (nxt_slow_path(nchunks > PORT_MMAP_CHUNK_COUNT)) {
+        nxt_alert(task, "requested buffer (%z) too big", size);
+
+        return NULL;
+    }
+
     b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
     if (nxt_slow_path(b == NULL)) {
         return NULL;
@@ -623,7 +653,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
     b->completion_handler = nxt_port_mmap_buf_completion;
     nxt_buf_set_port_mmap(b);
 
-    mmap_handler = nxt_port_mmap_get(task, port, &c, 0);
+    mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0);
     if (nxt_slow_path(mmap_handler == NULL)) {
         mp = task->thread->engine->mem_pool;
         nxt_mp_free(mp, b);
@@ -640,32 +670,12 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
     b->mem.start = nxt_port_mmap_chunk_start(hdr, c);
     b->mem.pos = b->mem.start;
     b->mem.free = b->mem.start;
-    b->mem.end = b->mem.start + PORT_MMAP_CHUNK_SIZE;
-
-    nchunks = size / PORT_MMAP_CHUNK_SIZE;
-    if ((size % PORT_MMAP_CHUNK_SIZE) != 0 || nchunks == 0) {
-        nchunks++;
-    }
+    b->mem.end = b->mem.start + nchunks * PORT_MMAP_CHUNK_SIZE;
 
     nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%uz] %PI->%PI,%d,%d",
               b, b->mem.start, b->mem.end - b->mem.start,
               hdr->src_pid, hdr->dst_pid, hdr->id, c);
 
-    c++;
-    nchunks--;
-
-    /* Try to acquire as much chunks as required. */
-    while (nchunks > 0) {
-
-        if (nxt_port_mmap_chk_set_chunk_busy(hdr->free_map, c) == 0) {
-            break;
-        }
-
-        b->mem.end += PORT_MMAP_CHUNK_SIZE;
-        c++;
-        nchunks--;
-    }
-
     return b;
 }
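
The up-front size check in nxt_port_mmap_get_buf() above replaces the old
after-the-fact chunk counting with a ceiling division.  A small
illustration of the rounding follows; the chunk size used here is an
assumption for the example only, as the real PORT_MMAP_CHUNK_SIZE value is
not shown in this diff.

#include <assert.h>
#include <stddef.h>

#define PORT_MMAP_CHUNK_SIZE  4096    /* assumed value, for illustration only */

/* Rounds a byte count up to whole chunks, as the new code does. */
static size_t
chunks_for(size_t size)
{
    return (size + PORT_MMAP_CHUNK_SIZE - 1) / PORT_MMAP_CHUNK_SIZE;
}

int
main(void)
{
    assert(chunks_for(1) == 1);
    assert(chunks_for(PORT_MMAP_CHUNK_SIZE) == 1);
    assert(chunks_for(PORT_MMAP_CHUNK_SIZE + 1) == 2);
    return 0;
}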


@@ -129,13 +129,20 @@ nxt_port_mmap_chunk_start(nxt_port_mmap_header_t *hdr, nxt_chunk_id_t c)
 static nxt_bool_t
 nxt_port_mmap_get_free_chunk(nxt_free_map_t *m, nxt_chunk_id_t *c)
 {
-    int             ffs;
-    size_t          i;
-    nxt_chunk_id_t  chunk;
-    nxt_free_map_t  bits;
+    const nxt_free_map_t  default_mask = (nxt_free_map_t) -1;
+
+    int             ffs;
+    size_t          i, start;
+    nxt_chunk_id_t  chunk;
+    nxt_free_map_t  bits, mask;
+
+    start = FREE_IDX(*c);
+    mask = default_mask << ((*c) % FREE_BITS);
+
+    for (i = start; i < MAX_FREE_IDX; i++) {
+        bits = m[i] & mask;
+        mask = default_mask;
 
-    for (i = 0; i < MAX_FREE_IDX; i++) {
-        bits = m[i];
         if (bits == 0) {
             continue;
         }
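
For reference, the masked scan introduced above can be modeled standalone
as follows.  FREE_BITS, FREE_IDX() and MAX_FREE_IDX are assumed here to
mean bits per map word, map-word index of a chunk, and number of map
words; the constants and types below are illustrative rather than Unit's.
Starting the scan at *c lets nxt_port_mmap_get() resume right after a run
that could not be completed instead of rescanning the map from chunk 0.

#include <stdint.h>
#include <stdio.h>

typedef uint64_t  free_map_t;

#define FREE_BITS     64                 /* bits per map word (assumed) */
#define FREE_IDX(c)   ((c) / FREE_BITS)  /* map word holding chunk c */
#define MAX_FREE_IDX  4                  /* 4 words -> 256 chunks, illustrative */

/* Finds the first set (free) bit at or after *c; returns 1 and updates *c. */
static int
get_free_chunk(const free_map_t *m, unsigned *c)
{
    unsigned          i, bit;
    free_map_t        bits, mask;
    const free_map_t  default_mask = (free_map_t) -1;

    /* Hide the bits below *c in the first word; scan later words in full. */
    mask = default_mask << (*c % FREE_BITS);

    for (i = FREE_IDX(*c); i < MAX_FREE_IDX; i++) {
        bits = m[i] & mask;
        mask = default_mask;

        if (bits == 0) {
            continue;
        }

        /* Portable stand-in for the ffs()-style lowest-set-bit lookup. */
        for (bit = 0; ((bits >> bit) & 1) == 0; bit++) { }

        *c = i * FREE_BITS + bit;
        return 1;
    }

    return 0;
}

int
main(void)
{
    free_map_t  map[MAX_FREE_IDX] = { 0, 0xF0, 0, 0 };
    unsigned    c = 10;

    if (get_free_chunk(map, &c)) {
        printf("first free chunk at or after 10: %u\n", c);   /* prints 68 */
    }

    return 0;
}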