Changing router to application shared memory exchange protocol.

The application process needs to request the shared memory segment from the
router instead of the latter pushing the segment before sending a request to
the application.  This is required to simplify the communication between the
router and the application and to prepare the router for using the application
shared port and then the queue.
This commit is contained in:
Max Romanov
2020-08-11 19:20:13 +03:00
parent 3cbc22a6dc
commit 6e31d6cd39
8 changed files with 374 additions and 179 deletions

View File

@@ -282,8 +282,8 @@ fail:
static nxt_port_mmap_handler_t *
nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
nxt_port_new_port_mmap(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
nxt_bool_t tracking, nxt_int_t n)
{
void *mem;
nxt_fd_t fd;
@@ -295,15 +295,14 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
mmap_handler = nxt_zalloc(sizeof(nxt_port_mmap_handler_t));
if (nxt_slow_path(mmap_handler == NULL)) {
nxt_log(task, NXT_LOG_WARN, "failed to allocate mmap_handler");
nxt_alert(task, "failed to allocate mmap_handler");
return NULL;
}
port_mmap = nxt_port_mmap_at(&process->outgoing, process->outgoing.size);
port_mmap = nxt_port_mmap_at(mmaps, mmaps->size);
if (nxt_slow_path(port_mmap == NULL)) {
nxt_log(task, NXT_LOG_WARN,
"failed to add port mmap to outgoing array");
nxt_alert(task, "failed to add port mmap to mmaps array");
nxt_free(mmap_handler);
return NULL;
@@ -322,6 +321,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
}
mmap_handler->hdr = mem;
mmap_handler->fd = fd;
port_mmap->mmap_handler = mmap_handler;
nxt_port_mmap_handler_use(mmap_handler, 1);
@@ -331,10 +331,9 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
nxt_memset(hdr->free_tracking_map, 0xFFU, sizeof(hdr->free_tracking_map));
hdr->id = process->outgoing.size - 1;
hdr->id = mmaps->size - 1;
hdr->src_pid = nxt_pid;
hdr->dst_pid = process->pid;
hdr->sent_over = port->id;
hdr->sent_over = 0xFFFFu;
/* Mark first chunk as busy */
free_map = tracking ? hdr->free_tracking_map : hdr->free_map;
@@ -347,13 +346,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_mmap_set_chunk_busy(hdr->free_map, PORT_MMAP_CHUNK_COUNT);
nxt_port_mmap_set_chunk_busy(hdr->free_tracking_map, PORT_MMAP_CHUNK_COUNT);
nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
/* TODO handle error */
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> %PI",
hdr->id, nxt_pid, process->pid);
nxt_log(task, NXT_LOG_DEBUG, "new mmap #%D created for %PI -> ...",
hdr->id, nxt_pid);
return mmap_handler;
@@ -361,7 +355,7 @@ remove_fail:
nxt_free(mmap_handler);
process->outgoing.size--;
mmaps->size--;
return NULL;
}
@@ -445,34 +439,28 @@ nxt_shm_open(nxt_task_t *task, size_t size)
static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
nxt_port_mmap_get(nxt_task_t *task, nxt_port_mmaps_t *mmaps, nxt_chunk_id_t *c,
nxt_int_t n, nxt_bool_t tracking)
{
nxt_int_t i, res, nchunks;
nxt_process_t *process;
nxt_free_map_t *free_map;
nxt_port_mmap_t *port_mmap;
nxt_port_mmap_t *end_port_mmap;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
process = port->process;
if (nxt_slow_path(process == NULL)) {
return NULL;
}
nxt_thread_mutex_lock(&mmaps->mutex);
nxt_thread_mutex_lock(&process->outgoing.mutex);
end_port_mmap = mmaps->elts + mmaps->size;
end_port_mmap = process->outgoing.elts + process->outgoing.size;
for (port_mmap = process->outgoing.elts;
for (port_mmap = mmaps->elts;
port_mmap < end_port_mmap;
port_mmap++)
{
mmap_handler = port_mmap->mmap_handler;
hdr = mmap_handler->hdr;
if (hdr->sent_over != 0xFFFFu && hdr->sent_over != port->id) {
if (hdr->sent_over != 0xFFFFu) {
continue;
}
@@ -510,11 +498,11 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
/* TODO introduce port_mmap limit and release wait. */
*c = 0;
mmap_handler = nxt_port_new_port_mmap(task, process, port, tracking, n);
mmap_handler = nxt_port_new_port_mmap(task, mmaps, tracking, n);
unlock_return:
nxt_thread_mutex_unlock(&process->outgoing.mutex);
nxt_thread_mutex_unlock(&mmaps->mutex);
return mmap_handler;
}
@@ -549,7 +537,7 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
nxt_int_t
nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_mmaps_t *mmaps,
nxt_port_mmap_tracking_t *tracking, uint32_t stream)
{
nxt_chunk_id_t c;
@@ -558,7 +546,7 @@ nxt_port_mmap_get_tracking(nxt_task_t *task, nxt_port_t *port,
nxt_debug(task, "request tracking for stream #%uD", stream);
mmap_handler = nxt_port_mmap_get(task, port, &c, 1, 1);
mmap_handler = nxt_port_mmap_get(task, mmaps, &c, 1, 1);
if (nxt_slow_path(mmap_handler == NULL)) {
return NXT_ERROR;
}
@@ -680,7 +668,7 @@ nxt_port_mmap_tracking_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_buf_t *
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_mmaps_t *mmaps, size_t size)
{
nxt_mp_t *mp;
nxt_buf_t *b;
@@ -707,7 +695,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
b->completion_handler = nxt_port_mmap_buf_completion;
nxt_buf_set_port_mmap(b);
mmap_handler = nxt_port_mmap_get(task, port, &c, nchunks, 0);
mmap_handler = nxt_port_mmap_get(task, mmaps, &c, nchunks, 0);
if (nxt_slow_path(mmap_handler == NULL)) {
mp = task->thread->engine->mem_pool;
nxt_mp_free(mp, b);
@@ -943,9 +931,7 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
{
nxt_port_method_t m;
nxt_port_mmap_header_t *hdr;
nxt_port_mmap_handler_t *mmap_handler;
nxt_port_method_t m;
m = NXT_PORT_METHOD_ANY;
@@ -956,9 +942,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
}
if (nxt_buf_is_port_mmap(b)) {
mmap_handler = b->parent;
hdr = mmap_handler->hdr;
if (m == NXT_PORT_METHOD_PLAIN) {
nxt_log_error(NXT_LOG_ERR, task->log,
"mixing plain and mmap buffers, "
@@ -967,16 +950,6 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
break;
}
if (port->pid != hdr->dst_pid) {
nxt_log_error(NXT_LOG_ERR, task->log,
"send mmap buffer for %PI to %PI, "
"using plain mode", hdr->dst_pid, port->pid);
m = NXT_PORT_METHOD_PLAIN;
break;
}
if (m == NXT_PORT_METHOD_ANY) {
nxt_debug(task, "using mmap mode");