Using plain shared memory for configuration pass.

There is no restrictions on configration size and using segmented shared memory
only doubles memory usage because to parse configration on router side,
it needs to be 'plain' e. g. located in single continous memory buffer.
This commit is contained in:
Max Romanov
2020-07-25 11:06:32 +03:00
parent 10f90f0d48
commit c617480eef
4 changed files with 152 additions and 83 deletions

View File

@@ -54,7 +54,7 @@ static nxt_int_t nxt_controller_conf_default(void);
static void nxt_controller_conf_init_handler(nxt_task_t *task, static void nxt_controller_conf_init_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data); nxt_port_recv_msg_t *msg, void *data);
static void nxt_controller_flush_requests(nxt_task_t *task); static void nxt_controller_flush_requests(nxt_task_t *task);
static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, static nxt_int_t nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp,
nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data); nxt_conf_value_t *conf, nxt_port_rpc_handler_t handler, void *data);
static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data); static void nxt_controller_conn_init(nxt_task_t *task, void *obj, void *data);
@@ -344,7 +344,7 @@ nxt_controller_send_current_conf(nxt_task_t *task)
conf = nxt_controller_conf.root; conf = nxt_controller_conf.root;
if (conf != NULL) { if (conf != NULL) {
rc = nxt_controller_conf_send(task, conf, rc = nxt_controller_conf_send(task, nxt_controller_conf.pool, conf,
nxt_controller_conf_init_handler, NULL); nxt_controller_conf_init_handler, NULL);
if (nxt_fast_path(rc == NXT_OK)) { if (nxt_fast_path(rc == NXT_OK)) {
@@ -497,11 +497,14 @@ nxt_controller_flush_requests(nxt_task_t *task)
static nxt_int_t static nxt_int_t
nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf, nxt_controller_conf_send(nxt_task_t *task, nxt_mp_t *mp, nxt_conf_value_t *conf,
nxt_port_rpc_handler_t handler, void *data) nxt_port_rpc_handler_t handler, void *data)
{ {
void *mem;
u_char *end;
size_t size; size_t size;
uint32_t stream; uint32_t stream;
nxt_fd_t fd;
nxt_int_t rc; nxt_int_t rc;
nxt_buf_t *b; nxt_buf_t *b;
nxt_port_t *router_port, *controller_port; nxt_port_t *router_port, *controller_port;
@@ -518,30 +521,53 @@ nxt_controller_conf_send(nxt_task_t *task, nxt_conf_value_t *conf,
size = nxt_conf_json_length(conf, NULL); size = nxt_conf_json_length(conf, NULL);
b = nxt_port_mmap_get_buf(task, router_port, size); b = nxt_buf_mem_alloc(mp, sizeof(size_t), 0);
if (nxt_slow_path(b == NULL)) { if (nxt_slow_path(b == NULL)) {
return NXT_ERROR; return NXT_ERROR;
} }
b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); fd = nxt_shm_open(task, size);
if (nxt_slow_path(fd == -1)) {
return NXT_ERROR;
}
mem = nxt_mem_mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
if (nxt_slow_path(mem == MAP_FAILED)) {
goto fail;
}
end = nxt_conf_json_print(mem, conf, NULL);
nxt_mem_munmap(mem, size);
size = end - (u_char *) mem;
b->mem.free = nxt_cpymem(b->mem.pos, &size, sizeof(size_t));
stream = nxt_port_rpc_register_handler(task, controller_port, stream = nxt_port_rpc_register_handler(task, controller_port,
handler, handler, handler, handler,
router_port->pid, data); router_port->pid, data);
if (nxt_slow_path(stream == 0)) { if (nxt_slow_path(stream == 0)) {
return NXT_ERROR; goto fail;
} }
rc = nxt_port_socket_write(task, router_port, NXT_PORT_MSG_DATA_LAST, -1, rc = nxt_port_socket_write(task, router_port,
stream, controller_port->id, b); NXT_PORT_MSG_DATA_LAST | NXT_PORT_MSG_CLOSE_FD,
fd, stream, controller_port->id, b);
if (nxt_slow_path(rc != NXT_OK)) { if (nxt_slow_path(rc != NXT_OK)) {
nxt_port_rpc_cancel(task, controller_port, stream); nxt_port_rpc_cancel(task, controller_port, stream);
return NXT_ERROR;
goto fail;
} }
return NXT_OK; return NXT_OK;
fail:
nxt_fd_close(fd);
return NXT_ERROR;
} }
@@ -1201,7 +1227,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
goto alloc_fail; goto alloc_fail;
} }
rc = nxt_controller_conf_send(task, value, rc = nxt_controller_conf_send(task, mp, value,
nxt_controller_conf_handler, req); nxt_controller_conf_handler, req);
if (nxt_slow_path(rc != NXT_OK)) { if (nxt_slow_path(rc != NXT_OK)) {
@@ -1282,7 +1308,7 @@ nxt_controller_process_config(nxt_task_t *task, nxt_controller_request_t *req,
goto alloc_fail; goto alloc_fail;
} }
rc = nxt_controller_conf_send(task, value, rc = nxt_controller_conf_send(task, mp, value,
nxt_controller_conf_handler, req); nxt_controller_conf_handler, req);
if (nxt_slow_path(rc != NXT_OK)) { if (nxt_slow_path(rc != NXT_OK)) {

View File

@@ -286,7 +286,6 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n) nxt_port_t *port, nxt_bool_t tracking, nxt_int_t n)
{ {
void *mem; void *mem;
u_char *p, name[64];
nxt_fd_t fd; nxt_fd_t fd;
nxt_int_t i; nxt_int_t i;
nxt_free_map_t *free_map; nxt_free_map_t *free_map;
@@ -310,63 +309,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
return NULL; return NULL;
} }
p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD", fd = nxt_shm_open(task, PORT_MMAP_SIZE);
nxt_pid, nxt_random(&task->thread->random));
*p = '\0';
#if (NXT_HAVE_MEMFD_CREATE)
fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
if (nxt_slow_path(fd == -1)) { if (nxt_slow_path(fd == -1)) {
nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
goto remove_fail;
}
nxt_debug(task, "memfd_create(%s): %FD", name, fd);
#elif (NXT_HAVE_SHM_OPEN_ANON)
fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
if (nxt_slow_path(fd == -1)) {
nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
goto remove_fail;
}
#elif (NXT_HAVE_SHM_OPEN)
/* Just in case. */
shm_unlink((char *) name);
fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
nxt_debug(task, "shm_open(%s): %FD", name, fd);
if (nxt_slow_path(fd == -1)) {
nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
goto remove_fail;
}
if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
nxt_errno);
}
#else
#error No working shared memory implementation.
#endif
if (nxt_slow_path(ftruncate(fd, PORT_MMAP_SIZE) == -1)) {
nxt_log(task, NXT_LOG_WARN, "ftruncate() failed %E", nxt_errno);
goto remove_fail; goto remove_fail;
} }
@@ -423,6 +367,83 @@ remove_fail:
} }
nxt_int_t
nxt_shm_open(nxt_task_t *task, size_t size)
{
nxt_fd_t fd;
#if (NXT_HAVE_MEMFD_CREATE || NXT_HAVE_SHM_OPEN)
u_char *p, name[64];
p = nxt_sprintf(name, name + sizeof(name), NXT_SHM_PREFIX "unit.%PI.%uxD",
nxt_pid, nxt_random(&task->thread->random));
*p = '\0';
#endif
#if (NXT_HAVE_MEMFD_CREATE)
fd = syscall(SYS_memfd_create, name, MFD_CLOEXEC);
if (nxt_slow_path(fd == -1)) {
nxt_alert(task, "memfd_create(%s) failed %E", name, nxt_errno);
return -1;
}
nxt_debug(task, "memfd_create(%s): %FD", name, fd);
#elif (NXT_HAVE_SHM_OPEN_ANON)
fd = shm_open(SHM_ANON, O_RDWR, S_IRUSR | S_IWUSR);
if (nxt_slow_path(fd == -1)) {
nxt_alert(task, "shm_open(SHM_ANON) failed %E", nxt_errno);
return -1;
}
nxt_debug(task, "shm_open(SHM_ANON): %FD", fd);
#elif (NXT_HAVE_SHM_OPEN)
/* Just in case. */
shm_unlink((char *) name);
fd = shm_open((char *) name, O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
if (nxt_slow_path(fd == -1)) {
nxt_alert(task, "shm_open(%s) failed %E", name, nxt_errno);
return -1;
}
nxt_debug(task, "shm_open(%s): %FD", name, fd);
if (nxt_slow_path(shm_unlink((char *) name) == -1)) {
nxt_log(task, NXT_LOG_WARN, "shm_unlink(%s) failed %E", name,
nxt_errno);
}
#else
#error No working shared memory implementation.
#endif
if (nxt_slow_path(ftruncate(fd, size) == -1)) {
nxt_alert(task, "ftruncate() failed %E", nxt_errno);
nxt_fd_close(fd);
return -1;
}
return fd;
}
static nxt_port_mmap_handler_t * static nxt_port_mmap_handler_t *
nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c, nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
nxt_int_t n, nxt_bool_t tracking) nxt_int_t n, nxt_bool_t tracking)

View File

@@ -71,5 +71,6 @@ typedef enum nxt_port_method_e nxt_port_method_t;
nxt_port_method_t nxt_port_method_t
nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b); nxt_port_mmap_get_method(nxt_task_t *task, nxt_port_t *port, nxt_buf_t *b);
nxt_int_t nxt_shm_open(nxt_task_t *task, size_t size);
#endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */ #endif /* _NXT_PORT_MEMORY_H_INCLUDED_ */

View File

@@ -906,8 +906,9 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void void
nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{ {
void *p;
size_t size;
nxt_int_t ret; nxt_int_t ret;
nxt_buf_t *b;
nxt_router_temp_conf_t *tmcf; nxt_router_temp_conf_t *tmcf;
tmcf = nxt_router_temp_conf(task); tmcf = nxt_router_temp_conf(task);
@@ -915,9 +916,33 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return; return;
} }
nxt_debug(task, "nxt_router_conf_data_handler(%O): %*s", if (nxt_slow_path(msg->fd == -1)) {
nxt_buf_used_size(msg->buf), nxt_alert(task, "conf_data_handler: invalid file shm fd");
(size_t) nxt_buf_used_size(msg->buf), msg->buf->mem.pos); return;
}
if (nxt_buf_mem_used_size(&msg->buf->mem) != sizeof(size_t)) {
nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
(int) nxt_buf_mem_used_size(&msg->buf->mem));
nxt_fd_close(msg->fd);
msg->fd = -1;
return;
}
nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd, 0);
nxt_fd_close(msg->fd);
msg->fd = -1;
if (nxt_slow_path(p == MAP_FAILED)) {
return;
}
nxt_debug(task, "conf_data_handler(%uz): %*s", size, size, p);
tmcf->router_conf->router = nxt_router; tmcf->router_conf->router = nxt_router;
tmcf->stream = msg->port_msg.stream; tmcf->stream = msg->port_msg.stream;
@@ -928,20 +953,12 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
if (nxt_slow_path(tmcf->port == NULL)) { if (nxt_slow_path(tmcf->port == NULL)) {
nxt_alert(task, "reply port not found"); nxt_alert(task, "reply port not found");
return; goto fail;
} }
nxt_port_use(task, tmcf->port, 1); nxt_port_use(task, tmcf->port, 1);
b = nxt_buf_chk_make_plain(tmcf->router_conf->mem_pool, ret = nxt_router_conf_create(task, tmcf, p, nxt_pointer_to(p, size));
msg->buf, msg->size);
if (nxt_slow_path(b == NULL)) {
nxt_router_conf_error(task, tmcf);
return;
}
ret = nxt_router_conf_create(task, tmcf, b->mem.pos, b->mem.free);
if (nxt_fast_path(ret == NXT_OK)) { if (nxt_fast_path(ret == NXT_OK)) {
nxt_router_conf_apply(task, tmcf, NULL); nxt_router_conf_apply(task, tmcf, NULL);
@@ -949,6 +966,10 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
} else { } else {
nxt_router_conf_error(task, tmcf); nxt_router_conf_error(task, tmcf);
} }
fail:
nxt_mem_munmap(p, size);
} }