Supporting concurrent shared memory fd receive in router.
Two different router threads may send different requests to single application worker. In this case shared memory fds from worker to router will be send over 2 different router ports. These fds will be received and processed by different threads in any order. This patch made possible to add incoming shared memory segments in arbitrary order. Additionally, array and memory pool are no longer used to store segments because of pool's single threaded nature. Custom array-like structure nxt_port_mmaps_t introduced.
This commit is contained in:
@@ -103,7 +103,6 @@ typedef struct {
|
||||
#include <nxt_service.h>
|
||||
|
||||
typedef struct nxt_buf_s nxt_buf_t;
|
||||
typedef struct nxt_port_mmap_s nxt_port_mmap_t;
|
||||
#include <nxt_buf.h>
|
||||
#include <nxt_buf_pool.h>
|
||||
#include <nxt_recvbuf.h>
|
||||
|
||||
@@ -16,7 +16,7 @@
|
||||
|
||||
#include <nxt_port_memory_int.h>
|
||||
|
||||
void
|
||||
nxt_inline void
|
||||
nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
|
||||
{
|
||||
if (port_mmap->hdr != NULL) {
|
||||
@@ -26,32 +26,51 @@ nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap)
|
||||
}
|
||||
|
||||
|
||||
static nxt_array_t *
|
||||
nxt_port_mmaps_create()
|
||||
static nxt_port_mmap_t *
|
||||
nxt_port_mmap_at(nxt_port_mmaps_t *port_mmaps, uint32_t i)
|
||||
{
|
||||
nxt_mp_t *mp;
|
||||
uint32_t cap;
|
||||
|
||||
mp = nxt_mp_create(1024, 128, 256, 32);
|
||||
cap = port_mmaps->cap;
|
||||
|
||||
if (nxt_slow_path(mp == NULL)) {
|
||||
return NULL;
|
||||
if (cap == 0) {
|
||||
cap = i + 1;
|
||||
}
|
||||
|
||||
return nxt_array_create(mp, 1, sizeof(nxt_port_mmap_t));
|
||||
}
|
||||
while (i + 1 > cap) {
|
||||
|
||||
if (cap < 16) {
|
||||
cap = cap * 2;
|
||||
|
||||
static nxt_port_mmap_t *
|
||||
nxt_port_mmap_add(nxt_array_t *port_mmaps)
|
||||
{
|
||||
nxt_mp_thread_adopt(port_mmaps->mem_pool);
|
||||
} else {
|
||||
cap = cap + cap / 2;
|
||||
}
|
||||
}
|
||||
|
||||
return nxt_array_zero_add(port_mmaps);
|
||||
if (cap != port_mmaps->cap) {
|
||||
|
||||
port_mmaps->elts = nxt_realloc(port_mmaps->elts,
|
||||
cap * sizeof(nxt_port_mmap_t));
|
||||
if (nxt_slow_path(port_mmaps->elts == NULL)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nxt_memzero(port_mmaps->elts + port_mmaps->cap,
|
||||
sizeof(nxt_port_mmap_t) * (cap - port_mmaps->cap));
|
||||
|
||||
port_mmaps->cap = cap;
|
||||
}
|
||||
|
||||
if (i + 1 > port_mmaps->size) {
|
||||
port_mmaps->size = i + 1;
|
||||
}
|
||||
|
||||
return port_mmaps->elts + i;
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool)
|
||||
nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free)
|
||||
{
|
||||
uint32_t i;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
@@ -60,18 +79,16 @@ nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool)
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_mp_thread_adopt(port_mmaps->mem_pool);
|
||||
|
||||
port_mmap = port_mmaps->elts;
|
||||
|
||||
for (i = 0; i < port_mmaps->nelts; i++) {
|
||||
nxt_port_mmap_destroy(port_mmap);
|
||||
for (i = 0; i < port_mmaps->size; i++) {
|
||||
nxt_port_mmap_destroy(port_mmap + i);
|
||||
}
|
||||
|
||||
port_mmaps->nelts = 0;
|
||||
port_mmaps->size = 0;
|
||||
|
||||
if (destroy_pool != 0) {
|
||||
nxt_mp_destroy(port_mmaps->mem_pool);
|
||||
if (free != 0) {
|
||||
nxt_free(port_mmaps->elts);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,53 +185,39 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nxt_thread_mutex_lock(&process->incoming_mutex);
|
||||
|
||||
if (process->incoming == NULL) {
|
||||
process->incoming = nxt_port_mmaps_create();
|
||||
}
|
||||
|
||||
if (nxt_slow_path(process->incoming == NULL)) {
|
||||
nxt_log(task, NXT_LOG_WARN, "failed to allocate incoming array");
|
||||
|
||||
goto fail;
|
||||
}
|
||||
|
||||
port_mmap = nxt_port_mmap_add(process->incoming);
|
||||
if (nxt_slow_path(port_mmap == NULL)) {
|
||||
nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
|
||||
|
||||
goto fail;
|
||||
}
|
||||
|
||||
mem = nxt_mem_mmap(NULL, mmap_stat.st_size,
|
||||
PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
|
||||
|
||||
if (nxt_slow_path(mem == MAP_FAILED)) {
|
||||
nxt_log(task, NXT_LOG_WARN, "mmap() failed %E", nxt_errno);
|
||||
|
||||
port_mmap = NULL;
|
||||
|
||||
goto fail;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
port_mmap->hdr = mem;
|
||||
hdr = port_mmap->hdr;
|
||||
hdr = mem;
|
||||
|
||||
if (nxt_slow_path(port_mmap->hdr->id != process->incoming->nelts - 1)) {
|
||||
nxt_log(task, NXT_LOG_WARN, "port mmap id mismatch (%d != %d)",
|
||||
port_mmap->hdr->id, process->incoming->nelts - 1);
|
||||
nxt_abort();
|
||||
nxt_thread_mutex_lock(&process->incoming.mutex);
|
||||
|
||||
port_mmap = nxt_port_mmap_at(&process->incoming, hdr->id);
|
||||
if (nxt_slow_path(port_mmap == NULL)) {
|
||||
nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
|
||||
|
||||
nxt_mem_munmap(mem, PORT_MMAP_SIZE);
|
||||
hdr = NULL;
|
||||
|
||||
goto fail;
|
||||
}
|
||||
|
||||
nxt_assert(hdr->src_pid == process->pid);
|
||||
nxt_assert(hdr->dst_pid == nxt_pid);
|
||||
|
||||
port_mmap->hdr = hdr;
|
||||
|
||||
hdr->sent_over = 0xFFFFu;
|
||||
|
||||
fail:
|
||||
|
||||
nxt_thread_mutex_unlock(&process->incoming_mutex);
|
||||
nxt_thread_mutex_unlock(&process->incoming.mutex);
|
||||
|
||||
return hdr;
|
||||
}
|
||||
@@ -230,19 +233,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
|
||||
port_mmap = NULL;
|
||||
|
||||
if (process->outgoing == NULL) {
|
||||
process->outgoing = nxt_port_mmaps_create();
|
||||
}
|
||||
|
||||
if (nxt_slow_path(process->outgoing == NULL)) {
|
||||
nxt_log(task, NXT_LOG_WARN, "failed to allocate outgoing array");
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
port_mmap = nxt_port_mmap_add(process->outgoing);
|
||||
port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
|
||||
if (nxt_slow_path(port_mmap == NULL)) {
|
||||
nxt_log(task, NXT_LOG_WARN,
|
||||
"failed to add port mmap to outgoing array");
|
||||
@@ -309,7 +300,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
|
||||
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
|
||||
|
||||
hdr->id = process->outgoing->nelts - 1;
|
||||
hdr->id = process->outgoing.size - 1;
|
||||
hdr->src_pid = nxt_pid;
|
||||
hdr->dst_pid = process->pid;
|
||||
hdr->sent_over = port->id;
|
||||
@@ -332,7 +323,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
|
||||
remove_fail:
|
||||
|
||||
nxt_array_remove(process->outgoing, port_mmap);
|
||||
process->outgoing.size--;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
@@ -342,7 +333,6 @@ static nxt_port_mmap_header_t *
|
||||
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
|
||||
size_t size)
|
||||
{
|
||||
nxt_array_t *outgoing;
|
||||
nxt_process_t *process;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_port_mmap_t *end_port_mmap;
|
||||
@@ -357,17 +347,10 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
|
||||
port_mmap = NULL;
|
||||
hdr = NULL;
|
||||
|
||||
nxt_thread_mutex_lock(&process->outgoing_mutex);
|
||||
nxt_thread_mutex_lock(&process->outgoing.mutex);
|
||||
|
||||
if (process->outgoing == NULL) {
|
||||
hdr = nxt_port_new_port_mmap(task, process, port);
|
||||
|
||||
goto unlock_return;
|
||||
}
|
||||
|
||||
outgoing = process->outgoing;
|
||||
port_mmap = outgoing->elts;
|
||||
end_port_mmap = port_mmap + outgoing->nelts;
|
||||
port_mmap = process->outgoing.elts;
|
||||
end_port_mmap = port_mmap + process->outgoing.size;
|
||||
|
||||
while (port_mmap < end_port_mmap) {
|
||||
|
||||
@@ -388,7 +371,7 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
|
||||
|
||||
unlock_return:
|
||||
|
||||
nxt_thread_mutex_unlock(&process->outgoing_mutex);
|
||||
nxt_thread_mutex_unlock(&process->outgoing.mutex);
|
||||
|
||||
return hdr;
|
||||
}
|
||||
@@ -397,9 +380,7 @@ unlock_return:
|
||||
static nxt_port_mmap_header_t *
|
||||
nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
|
||||
{
|
||||
nxt_array_t *incoming;
|
||||
nxt_process_t *process;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_port_mmap_header_t *hdr;
|
||||
|
||||
process = nxt_runtime_process_find(task->thread->runtime, spid);
|
||||
@@ -409,20 +390,17 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
|
||||
|
||||
hdr = NULL;
|
||||
|
||||
nxt_thread_mutex_lock(&process->incoming_mutex);
|
||||
nxt_thread_mutex_lock(&process->incoming.mutex);
|
||||
|
||||
incoming = process->incoming;
|
||||
|
||||
if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) {
|
||||
port_mmap = incoming->elts;
|
||||
hdr = port_mmap[id].hdr;
|
||||
if (nxt_fast_path(process->incoming.size > id)) {
|
||||
hdr = process->incoming.elts[id].hdr;
|
||||
|
||||
} else {
|
||||
nxt_log(task, NXT_LOG_WARN,
|
||||
"failed to get incoming mmap #%d for process %PI", id, spid);
|
||||
}
|
||||
|
||||
nxt_thread_mutex_unlock(&process->incoming_mutex);
|
||||
nxt_thread_mutex_unlock(&process->incoming.mutex);
|
||||
|
||||
return hdr;
|
||||
}
|
||||
|
||||
@@ -12,10 +12,7 @@
|
||||
|
||||
typedef struct nxt_port_mmap_header_s nxt_port_mmap_header_t;
|
||||
|
||||
void
|
||||
nxt_port_mmap_destroy(nxt_port_mmap_t *port_mmap);
|
||||
|
||||
void nxt_port_mmaps_destroy(nxt_array_t *port_mmaps, nxt_bool_t destroy_pool);
|
||||
void nxt_port_mmaps_destroy(nxt_port_mmaps_t *port_mmaps, nxt_bool_t free);
|
||||
|
||||
/*
|
||||
* Allocates nxt_but_t structure from port's mem_pool, assigns this buf 'mem'
|
||||
|
||||
@@ -61,8 +61,8 @@ nxt_process_create(nxt_task_t *task, nxt_process_t *process)
|
||||
nxt_process_close_ports(task, p);
|
||||
|
||||
} else {
|
||||
nxt_port_mmaps_destroy(p->incoming, 0);
|
||||
nxt_port_mmaps_destroy(p->outgoing, 0);
|
||||
nxt_port_mmaps_destroy(&p->incoming, 0);
|
||||
nxt_port_mmaps_destroy(&p->outgoing, 0);
|
||||
}
|
||||
|
||||
} nxt_runtime_process_loop;
|
||||
|
||||
@@ -42,6 +42,17 @@ struct nxt_process_init_s {
|
||||
};
|
||||
|
||||
|
||||
typedef struct nxt_port_mmap_s nxt_port_mmap_t;
|
||||
typedef struct nxt_port_mmaps_s nxt_port_mmaps_t;
|
||||
|
||||
struct nxt_port_mmaps_s {
|
||||
nxt_thread_mutex_t mutex;
|
||||
uint32_t size;
|
||||
uint32_t cap;
|
||||
nxt_port_mmap_t *elts;
|
||||
};
|
||||
|
||||
|
||||
typedef struct {
|
||||
nxt_pid_t pid;
|
||||
nxt_queue_t ports; /* of nxt_port_t */
|
||||
@@ -51,11 +62,8 @@ typedef struct {
|
||||
|
||||
nxt_process_init_t *init;
|
||||
|
||||
nxt_thread_mutex_t incoming_mutex;
|
||||
nxt_array_t *incoming; /* of nxt_port_mmap_t */
|
||||
|
||||
nxt_thread_mutex_t outgoing_mutex;
|
||||
nxt_array_t *outgoing; /* of nxt_port_mmap_t */
|
||||
nxt_port_mmaps_t incoming;
|
||||
nxt_port_mmaps_t outgoing;
|
||||
|
||||
nxt_thread_mutex_t cp_mutex;
|
||||
nxt_lvlhsh_t connected_ports; /* of nxt_port_t */
|
||||
|
||||
@@ -1576,8 +1576,8 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
|
||||
|
||||
nxt_queue_init(&process->ports);
|
||||
|
||||
nxt_thread_mutex_create(&process->incoming_mutex);
|
||||
nxt_thread_mutex_create(&process->outgoing_mutex);
|
||||
nxt_thread_mutex_create(&process->incoming.mutex);
|
||||
nxt_thread_mutex_create(&process->outgoing.mutex);
|
||||
nxt_thread_mutex_create(&process->cp_mutex);
|
||||
|
||||
process->use_count = 1;
|
||||
@@ -1595,8 +1595,8 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
nxt_assert(process->use_count == 0);
|
||||
nxt_assert(process->registered == 0);
|
||||
|
||||
nxt_port_mmaps_destroy(process->incoming, 1);
|
||||
nxt_port_mmaps_destroy(process->outgoing, 1);
|
||||
nxt_port_mmaps_destroy(&process->incoming, 1);
|
||||
nxt_port_mmaps_destroy(&process->outgoing, 1);
|
||||
|
||||
port = nxt_port_hash_first(&process->connected_ports, &lhe);
|
||||
|
||||
@@ -1606,8 +1606,8 @@ nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
port = nxt_port_hash_first(&process->connected_ports, &lhe);
|
||||
}
|
||||
|
||||
nxt_thread_mutex_destroy(&process->incoming_mutex);
|
||||
nxt_thread_mutex_destroy(&process->outgoing_mutex);
|
||||
nxt_thread_mutex_destroy(&process->incoming.mutex);
|
||||
nxt_thread_mutex_destroy(&process->outgoing.mutex);
|
||||
nxt_thread_mutex_destroy(&process->cp_mutex);
|
||||
|
||||
nxt_mp_free(rt->mem_pool, process);
|
||||
|
||||
Reference in New Issue
Block a user