Introducing src_pid for mmap header for accurate buf completion.
This allows to use shared memory to communicate with main process. This patch changes shared memory segment format and breaks compatibility with older modules.
This commit is contained in:
@@ -105,7 +105,8 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id)
|
|||||||
memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
|
memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
|
||||||
|
|
||||||
hdr->id = process->outgoing.nelts - 1;
|
hdr->id = process->outgoing.nelts - 1;
|
||||||
hdr->pid = process->pid;
|
hdr->src_pid = getpid();
|
||||||
|
hdr->dst_pid = process->pid;
|
||||||
hdr->sent_over = id;
|
hdr->sent_over = id;
|
||||||
|
|
||||||
/* Mark first chunk as busy */
|
/* Mark first chunk as busy */
|
||||||
@@ -136,7 +137,7 @@ nxt_go_new_port_mmap(nxt_go_process_t *process, nxt_port_id_t id)
|
|||||||
*/
|
*/
|
||||||
memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
|
memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
|
||||||
|
|
||||||
rc = nxt_go_port_send(hdr->pid, id, &port_msg, sizeof(port_msg),
|
rc = nxt_go_port_send(hdr->dst_pid, id, &port_msg, sizeof(port_msg),
|
||||||
&cmsg, sizeof(cmsg));
|
&cmsg, sizeof(cmsg));
|
||||||
|
|
||||||
nxt_go_debug("new mmap #%d created for %d -> %d",
|
nxt_go_debug("new mmap #%d created for %d -> %d",
|
||||||
|
|||||||
@@ -107,6 +107,13 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
hdr = data;
|
hdr = data;
|
||||||
|
|
||||||
|
if (nxt_slow_path(hdr->src_pid != nxt_pid && hdr->dst_pid != nxt_pid)) {
|
||||||
|
nxt_debug(task, "mmap buf completion: mmap for other process pair "
|
||||||
|
"%PI->%PI", hdr->src_pid, hdr->dst_pid);
|
||||||
|
|
||||||
|
goto release_buf;
|
||||||
|
}
|
||||||
|
|
||||||
if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
|
if (b->is_port_mmap_sent && b->mem.pos > b->mem.start) {
|
||||||
/*
|
/*
|
||||||
* Chunks until b->mem.pos has been sent to other side,
|
* Chunks until b->mem.pos has been sent to other side,
|
||||||
@@ -123,9 +130,9 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
nxt_port_mmap_free_junk(p, b->mem.end - p);
|
nxt_port_mmap_free_junk(p, b->mem.end - p);
|
||||||
|
|
||||||
nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), %PI,%d,%d", b,
|
nxt_debug(task, "mmap buf completion: %p [%p,%d] (sent=%d), "
|
||||||
b->mem.start, b->mem.end - b->mem.start, b->is_port_mmap_sent,
|
"%PI->%PI,%d,%d", b, b->mem.start, b->mem.end - b->mem.start,
|
||||||
hdr->pid, hdr->id, c);
|
b->is_port_mmap_sent, hdr->src_pid, hdr->dst_pid, hdr->id, c);
|
||||||
|
|
||||||
while (p < b->mem.end) {
|
while (p < b->mem.end) {
|
||||||
nxt_port_mmap_set_chunk_free(hdr, c);
|
nxt_port_mmap_set_chunk_free(hdr, c);
|
||||||
@@ -134,6 +141,8 @@ nxt_port_mmap_buf_completion(nxt_task_t *task, void *obj, void *data)
|
|||||||
c++;
|
c++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
release_buf:
|
||||||
|
|
||||||
nxt_mp_release(mp, b);
|
nxt_mp_release(mp, b);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -198,6 +207,9 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
|||||||
nxt_abort();
|
nxt_abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_assert(hdr->src_pid == process->pid);
|
||||||
|
nxt_assert(hdr->dst_pid == nxt_pid);
|
||||||
|
|
||||||
hdr->sent_over = 0xFFFFu;
|
hdr->sent_over = 0xFFFFu;
|
||||||
|
|
||||||
fail:
|
fail:
|
||||||
@@ -298,7 +310,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
|||||||
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
|
nxt_memset(hdr->free_map, 0xFFU, sizeof(hdr->free_map));
|
||||||
|
|
||||||
hdr->id = process->outgoing->nelts - 1;
|
hdr->id = process->outgoing->nelts - 1;
|
||||||
hdr->pid = process->pid;
|
hdr->src_pid = nxt_pid;
|
||||||
|
hdr->dst_pid = process->pid;
|
||||||
hdr->sent_over = port->id;
|
hdr->sent_over = port->id;
|
||||||
|
|
||||||
/* Mark first chunk as busy */
|
/* Mark first chunk as busy */
|
||||||
@@ -307,8 +320,7 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
|||||||
/* Mark as busy chunk followed the last available chunk. */
|
/* Mark as busy chunk followed the last available chunk. */
|
||||||
nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
|
nxt_port_mmap_set_chunk_busy(hdr, PORT_MMAP_CHUNK_COUNT);
|
||||||
|
|
||||||
nxt_debug(task, "send mmap fd %FD to process %PI", fd,
|
nxt_debug(task, "send mmap fd %FD to process %PI", fd, port->pid);
|
||||||
port->pid);
|
|
||||||
|
|
||||||
/* TODO handle error */
|
/* TODO handle error */
|
||||||
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
|
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_MMAP, fd, 0, 0, NULL);
|
||||||
@@ -452,9 +464,9 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
|
|||||||
nchunks++;
|
nchunks++;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI,%d,%d", b,
|
nxt_debug(task, "outgoing mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d",
|
||||||
b->mem.start, b->mem.end - b->mem.start,
|
b, b->mem.start, b->mem.end - b->mem.start,
|
||||||
hdr->pid, hdr->id, c);
|
hdr->src_pid, hdr->dst_pid, hdr->id, c);
|
||||||
|
|
||||||
c++;
|
c++;
|
||||||
nchunks--;
|
nchunks--;
|
||||||
@@ -576,9 +588,9 @@ nxt_port_mmap_get_incoming_buf(nxt_task_t *task, nxt_port_t *port,
|
|||||||
|
|
||||||
b->parent = hdr;
|
b->parent = hdr;
|
||||||
|
|
||||||
nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI,%d,%d",
|
nxt_debug(task, "incoming mmap buf allocation: %p [%p,%d] %PI->%PI,%d,%d",
|
||||||
b, b->mem.start, b->mem.end - b->mem.start,
|
b, b->mem.start, b->mem.end - b->mem.start,
|
||||||
hdr->pid, hdr->id, mmap_msg->chunk_id);
|
hdr->src_pid, hdr->dst_pid, hdr->id, mmap_msg->chunk_id);
|
||||||
|
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
@@ -699,10 +711,10 @@ nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (port->pid != hdr->pid) {
|
if (port->pid != hdr->dst_pid) {
|
||||||
nxt_log_error(NXT_LOG_ERR, task->log,
|
nxt_log_error(NXT_LOG_ERR, task->log,
|
||||||
"send mmap buffer for %PI to %PI, "
|
"send mmap buffer for %PI to %PI, "
|
||||||
"using plain mode", hdr->pid, port->pid);
|
"using plain mode", hdr->dst_pid, port->pid);
|
||||||
|
|
||||||
m = NXT_PORT_METHOD_PLAIN;
|
m = NXT_PORT_METHOD_PLAIN;
|
||||||
|
|
||||||
|
|||||||
@@ -48,7 +48,8 @@ typedef nxt_atomic_uint_t nxt_free_map_t;
|
|||||||
/* Mapped at the start of shared memory segment. */
|
/* Mapped at the start of shared memory segment. */
|
||||||
struct nxt_port_mmap_header_s {
|
struct nxt_port_mmap_header_s {
|
||||||
uint32_t id;
|
uint32_t id;
|
||||||
nxt_pid_t pid; /* For sanity check. */
|
nxt_pid_t src_pid; /* For sanity check. */
|
||||||
|
nxt_pid_t dst_pid; /* For sanity check. */
|
||||||
nxt_port_id_t sent_over;
|
nxt_port_id_t sent_over;
|
||||||
nxt_free_map_t free_map[MAX_FREE_IDX];
|
nxt_free_map_t free_map[MAX_FREE_IDX];
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user