Sending shared port to application prototype.
Application process started with shared port (and queue) already configured. But still waits for PORT_ACK message from router to start request processing (so-called "ready state"). Waiting for router confirmation is necessary. Otherwise, the application may produce response and send it to router before the router have the information about the application process. This is a subject of further optimizations.
This commit is contained in:
@@ -1052,6 +1052,9 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init,
|
|||||||
init->read_port.in_fd = my_port->pair[0];
|
init->read_port.in_fd = my_port->pair[0];
|
||||||
init->read_port.out_fd = my_port->pair[1];
|
init->read_port.out_fd = my_port->pair[1];
|
||||||
|
|
||||||
|
init->shared_port_fd = conf->shared_port_fd;
|
||||||
|
init->shared_queue_fd = conf->shared_queue_fd;
|
||||||
|
|
||||||
init->log_fd = 2;
|
init->log_fd = 2;
|
||||||
|
|
||||||
init->shm_limit = conf->shm_limit;
|
init->shm_limit = conf->shm_limit;
|
||||||
|
|||||||
@@ -103,6 +103,9 @@ struct nxt_common_app_conf_s {
|
|||||||
size_t shm_limit;
|
size_t shm_limit;
|
||||||
uint32_t request_limit;
|
uint32_t request_limit;
|
||||||
|
|
||||||
|
nxt_fd_t shared_port_fd;
|
||||||
|
nxt_fd_t shared_queue_fd;
|
||||||
|
|
||||||
union {
|
union {
|
||||||
nxt_external_app_conf_t external;
|
nxt_external_app_conf_t external;
|
||||||
nxt_python_app_conf_t python;
|
nxt_python_app_conf_t python;
|
||||||
|
|||||||
@@ -106,6 +106,16 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
|
|||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rc = nxt_external_fd_no_cloexec(task, conf->shared_port_fd);
|
||||||
|
if (nxt_slow_path(rc != NXT_OK)) {
|
||||||
|
return NXT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
rc = nxt_external_fd_no_cloexec(task, conf->shared_queue_fd);
|
||||||
|
if (nxt_slow_path(rc != NXT_OK)) {
|
||||||
|
return NXT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
end = buf + sizeof(buf);
|
end = buf + sizeof(buf);
|
||||||
|
|
||||||
p = nxt_sprintf(buf, end,
|
p = nxt_sprintf(buf, end,
|
||||||
@@ -113,12 +123,14 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
|
|||||||
"%PI,%ud,%d;"
|
"%PI,%ud,%d;"
|
||||||
"%PI,%ud,%d;"
|
"%PI,%ud,%d;"
|
||||||
"%PI,%ud,%d,%d;"
|
"%PI,%ud,%d,%d;"
|
||||||
|
"%d,%d;"
|
||||||
"%d,%z,%uD,%Z",
|
"%d,%z,%uD,%Z",
|
||||||
NXT_VERSION, my_port->process->stream,
|
NXT_VERSION, my_port->process->stream,
|
||||||
proto_port->pid, proto_port->id, proto_port->pair[1],
|
proto_port->pid, proto_port->id, proto_port->pair[1],
|
||||||
router_port->pid, router_port->id, router_port->pair[1],
|
router_port->pid, router_port->id, router_port->pair[1],
|
||||||
my_port->pid, my_port->id, my_port->pair[0],
|
my_port->pid, my_port->id, my_port->pair[0],
|
||||||
my_port->pair[1],
|
my_port->pair[1],
|
||||||
|
conf->shared_port_fd, conf->shared_queue_fd,
|
||||||
2, conf->shm_limit, conf->request_limit);
|
2, conf->shm_limit, conf->request_limit);
|
||||||
|
|
||||||
if (nxt_slow_path(p == end)) {
|
if (nxt_slow_path(p == end)) {
|
||||||
|
|||||||
@@ -382,25 +382,25 @@ nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
port = rt->port_by_type[NXT_PROCESS_ROUTER];
|
port = rt->port_by_type[NXT_PROCESS_ROUTER];
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
nxt_alert(task, "router port not found");
|
nxt_alert(task, "router port not found");
|
||||||
return;
|
goto close_fds;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) {
|
if (nxt_slow_path(port->pid != nxt_recv_msg_cmsg_pid(msg))) {
|
||||||
nxt_alert(task, "process %PI cannot start processes",
|
nxt_alert(task, "process %PI cannot start processes",
|
||||||
nxt_recv_msg_cmsg_pid(msg));
|
nxt_recv_msg_cmsg_pid(msg));
|
||||||
|
|
||||||
return;
|
goto close_fds;
|
||||||
}
|
}
|
||||||
|
|
||||||
process = nxt_process_new(rt);
|
process = nxt_process_new(rt);
|
||||||
if (nxt_slow_path(process == NULL)) {
|
if (nxt_slow_path(process == NULL)) {
|
||||||
return;
|
goto close_fds;
|
||||||
}
|
}
|
||||||
|
|
||||||
process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
|
process->mem_pool = nxt_mp_create(1024, 128, 256, 32);
|
||||||
if (process->mem_pool == NULL) {
|
if (process->mem_pool == NULL) {
|
||||||
nxt_process_use(task, process, -1);
|
nxt_process_use(task, process, -1);
|
||||||
return;
|
goto close_fds;
|
||||||
}
|
}
|
||||||
|
|
||||||
process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN];
|
process->parent_port = rt->port_by_type[NXT_PROCESS_MAIN];
|
||||||
@@ -422,6 +422,9 @@ nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
goto failed;
|
goto failed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
app_conf->shared_port_fd = msg->fd[0];
|
||||||
|
app_conf->shared_queue_fd = msg->fd[1];
|
||||||
|
|
||||||
start = b->mem.pos;
|
start = b->mem.pos;
|
||||||
|
|
||||||
app_conf->name.start = start;
|
app_conf->name.start = start;
|
||||||
@@ -509,6 +512,17 @@ nxt_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
|
|
||||||
ret = nxt_process_start(task, process);
|
ret = nxt_process_start(task, process);
|
||||||
if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
|
if (nxt_fast_path(ret == NXT_OK || ret == NXT_AGAIN)) {
|
||||||
|
|
||||||
|
/* Close shared port fds only in main process. */
|
||||||
|
if (ret == NXT_OK) {
|
||||||
|
nxt_fd_close(app_conf->shared_port_fd);
|
||||||
|
nxt_fd_close(app_conf->shared_queue_fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Avoid fds close in caller. */
|
||||||
|
msg->fd[0] = -1;
|
||||||
|
msg->fd[1] = -1;
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -523,6 +537,14 @@ failed:
|
|||||||
nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
|
nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
|
||||||
-1, msg->port_msg.stream, 0, NULL);
|
-1, msg->port_msg.stream, 0, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
close_fds:
|
||||||
|
|
||||||
|
nxt_fd_close(msg->fd[0]);
|
||||||
|
msg->fd[0] = -1;
|
||||||
|
|
||||||
|
nxt_fd_close(msg->fd[1]);
|
||||||
|
msg->fd[1] = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -176,8 +176,9 @@ nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
|
|
||||||
if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
|
if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
|
||||||
|
|
||||||
nxt_debug(task, "port %d: message type:%uD",
|
nxt_debug(task, "port %d: message type:%uD fds:%d,%d",
|
||||||
msg->port->socket.fd, msg->port_msg.type);
|
msg->port->socket.fd, msg->port_msg.type,
|
||||||
|
msg->fd[0], msg->fd[1]);
|
||||||
|
|
||||||
handlers = msg->port->data;
|
handlers = msg->port->data;
|
||||||
handlers[msg->port_msg.type](task, msg);
|
handlers[msg->port_msg.type](task, msg);
|
||||||
|
|||||||
@@ -221,8 +221,6 @@ static void nxt_router_access_log_reopen_error(nxt_task_t *task,
|
|||||||
|
|
||||||
static void nxt_router_app_port_ready(nxt_task_t *task,
|
static void nxt_router_app_port_ready(nxt_task_t *task,
|
||||||
nxt_port_recv_msg_t *msg, void *data);
|
nxt_port_recv_msg_t *msg, void *data);
|
||||||
static nxt_int_t nxt_router_app_shared_port_send(nxt_task_t *task,
|
|
||||||
nxt_port_t *app_port);
|
|
||||||
static void nxt_router_app_port_error(nxt_task_t *task,
|
static void nxt_router_app_port_error(nxt_task_t *task,
|
||||||
nxt_port_recv_msg_t *msg, void *data);
|
nxt_port_recv_msg_t *msg, void *data);
|
||||||
|
|
||||||
@@ -394,6 +392,7 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
|
|||||||
{
|
{
|
||||||
size_t size;
|
size_t size;
|
||||||
uint32_t stream;
|
uint32_t stream;
|
||||||
|
nxt_fd_t port_fd, queue_fd;
|
||||||
nxt_int_t ret;
|
nxt_int_t ret;
|
||||||
nxt_app_t *app;
|
nxt_app_t *app;
|
||||||
nxt_buf_t *b;
|
nxt_buf_t *b;
|
||||||
@@ -413,6 +412,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
|
|||||||
nxt_debug(task, "app '%V' %p start process", &app->name, app);
|
nxt_debug(task, "app '%V' %p start process", &app->name, app);
|
||||||
|
|
||||||
b = NULL;
|
b = NULL;
|
||||||
|
port_fd = -1;
|
||||||
|
queue_fd = -1;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (app->proto_port_requests > 0) {
|
if (app->proto_port_requests > 0) {
|
||||||
@@ -439,6 +440,9 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
|
|||||||
nxt_buf_cpystr(b, &app->name);
|
nxt_buf_cpystr(b, &app->name);
|
||||||
*b->mem.free++ = '\0';
|
*b->mem.free++ = '\0';
|
||||||
nxt_buf_cpystr(b, &app->conf);
|
nxt_buf_cpystr(b, &app->conf);
|
||||||
|
|
||||||
|
port_fd = app->shared_port->pair[0];
|
||||||
|
queue_fd = app->shared_port->queue_fd;
|
||||||
}
|
}
|
||||||
|
|
||||||
app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
|
app_joint_rpc = nxt_port_rpc_register_handler_ex(task, port,
|
||||||
@@ -451,8 +455,8 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
|
|||||||
|
|
||||||
stream = nxt_port_rpc_ex_stream(app_joint_rpc);
|
stream = nxt_port_rpc_ex_stream(app_joint_rpc);
|
||||||
|
|
||||||
ret = nxt_port_socket_write(task, dport, NXT_PORT_MSG_START_PROCESS,
|
ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
|
||||||
-1, stream, port->id, b);
|
port_fd, queue_fd, stream, port->id, b);
|
||||||
if (nxt_slow_path(ret != NXT_OK)) {
|
if (nxt_slow_path(ret != NXT_OK)) {
|
||||||
nxt_port_rpc_cancel(task, port, stream);
|
nxt_port_rpc_cancel(task, port, stream);
|
||||||
|
|
||||||
@@ -2773,6 +2777,7 @@ nxt_router_app_rpc_create(nxt_task_t *task,
|
|||||||
{
|
{
|
||||||
size_t size;
|
size_t size;
|
||||||
uint32_t stream;
|
uint32_t stream;
|
||||||
|
nxt_fd_t port_fd, queue_fd;
|
||||||
nxt_int_t ret;
|
nxt_int_t ret;
|
||||||
nxt_buf_t *b;
|
nxt_buf_t *b;
|
||||||
nxt_port_t *router_port, *dport;
|
nxt_port_t *router_port, *dport;
|
||||||
@@ -2801,10 +2806,15 @@ nxt_router_app_rpc_create(nxt_task_t *task,
|
|||||||
|
|
||||||
dport = rt->port_by_type[NXT_PROCESS_MAIN];
|
dport = rt->port_by_type[NXT_PROCESS_MAIN];
|
||||||
|
|
||||||
|
port_fd = app->shared_port->pair[0];
|
||||||
|
queue_fd = app->shared_port->queue_fd;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
nxt_debug(task, "app '%V' prefork", &app->name);
|
nxt_debug(task, "app '%V' prefork", &app->name);
|
||||||
|
|
||||||
b = NULL;
|
b = NULL;
|
||||||
|
port_fd = -1;
|
||||||
|
queue_fd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
|
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
|
||||||
@@ -2823,9 +2833,8 @@ nxt_router_app_rpc_create(nxt_task_t *task,
|
|||||||
|
|
||||||
stream = nxt_port_rpc_ex_stream(rpc);
|
stream = nxt_port_rpc_ex_stream(rpc);
|
||||||
|
|
||||||
ret = nxt_port_socket_write(task, dport,
|
ret = nxt_port_socket_write2(task, dport, NXT_PORT_MSG_START_PROCESS,
|
||||||
NXT_PORT_MSG_START_PROCESS,
|
port_fd, queue_fd, stream, router_port->id, b);
|
||||||
-1, stream, router_port->id, b);
|
|
||||||
if (nxt_slow_path(ret != NXT_OK)) {
|
if (nxt_slow_path(ret != NXT_OK)) {
|
||||||
nxt_port_rpc_cancel(task, router_port, stream);
|
nxt_port_rpc_cancel(task, router_port, stream);
|
||||||
goto fail;
|
goto fail;
|
||||||
@@ -2900,7 +2909,7 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
|||||||
|
|
||||||
nxt_port_inc_use(port);
|
nxt_port_inc_use(port);
|
||||||
|
|
||||||
nxt_router_app_shared_port_send(task, port);
|
nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
|
||||||
|
|
||||||
nxt_work_queue_add(&engine->fast_work_queue,
|
nxt_work_queue_add(&engine->fast_work_queue,
|
||||||
nxt_router_conf_apply, task, rpc->temp_conf, NULL);
|
nxt_router_conf_apply, task, rpc->temp_conf, NULL);
|
||||||
@@ -4596,46 +4605,12 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
|||||||
nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
|
nxt_debug(task, "app '%V' new port ready, pid %PI, %d/%d",
|
||||||
&app->name, port->pid, app->processes, app->pending_processes);
|
&app->name, port->pid, app->processes, app->pending_processes);
|
||||||
|
|
||||||
nxt_router_app_shared_port_send(task, port);
|
nxt_port_socket_write(task, port, NXT_PORT_MSG_PORT_ACK, -1, 0, 0, NULL);
|
||||||
|
|
||||||
nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
|
nxt_router_app_port_release(task, app, port, NXT_APR_NEW_PORT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static nxt_int_t
|
|
||||||
nxt_router_app_shared_port_send(nxt_task_t *task, nxt_port_t *app_port)
|
|
||||||
{
|
|
||||||
nxt_buf_t *b;
|
|
||||||
nxt_port_t *port;
|
|
||||||
nxt_port_msg_new_port_t *msg;
|
|
||||||
|
|
||||||
b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
|
|
||||||
sizeof(nxt_port_data_t));
|
|
||||||
if (nxt_slow_path(b == NULL)) {
|
|
||||||
return NXT_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
port = app_port->app->shared_port;
|
|
||||||
|
|
||||||
nxt_debug(task, "send port %FD to process %PI",
|
|
||||||
port->pair[0], app_port->pid);
|
|
||||||
|
|
||||||
b->mem.free += sizeof(nxt_port_msg_new_port_t);
|
|
||||||
msg = (nxt_port_msg_new_port_t *) b->mem.pos;
|
|
||||||
|
|
||||||
msg->id = port->id;
|
|
||||||
msg->pid = port->pid;
|
|
||||||
msg->max_size = port->max_size;
|
|
||||||
msg->max_share = port->max_share;
|
|
||||||
msg->type = port->type;
|
|
||||||
|
|
||||||
return nxt_port_socket_write2(task, app_port,
|
|
||||||
NXT_PORT_MSG_NEW_PORT,
|
|
||||||
port->pair[0], port->queue_fd,
|
|
||||||
0, 0, b);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||||
void *data)
|
void *data)
|
||||||
|
|||||||
@@ -55,6 +55,7 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
|
|||||||
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
|
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
|
||||||
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
|
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
|
||||||
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
|
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
|
||||||
|
int *shared_port_fd, int *shared_queue_fd,
|
||||||
int *log_fd, uint32_t *stream, uint32_t *shm_limit,
|
int *log_fd, uint32_t *stream, uint32_t *shm_limit,
|
||||||
uint32_t *request_limit);
|
uint32_t *request_limit);
|
||||||
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
|
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
|
||||||
@@ -424,12 +425,12 @@ static pid_t nxt_unit_pid;
|
|||||||
nxt_unit_ctx_t *
|
nxt_unit_ctx_t *
|
||||||
nxt_unit_init(nxt_unit_init_t *init)
|
nxt_unit_init(nxt_unit_init_t *init)
|
||||||
{
|
{
|
||||||
int rc, queue_fd;
|
int rc, queue_fd, shared_queue_fd;
|
||||||
void *mem;
|
void *mem;
|
||||||
uint32_t ready_stream, shm_limit, request_limit;
|
uint32_t ready_stream, shm_limit, request_limit;
|
||||||
nxt_unit_ctx_t *ctx;
|
nxt_unit_ctx_t *ctx;
|
||||||
nxt_unit_impl_t *lib;
|
nxt_unit_impl_t *lib;
|
||||||
nxt_unit_port_t ready_port, router_port, read_port;
|
nxt_unit_port_t ready_port, router_port, read_port, shared_port;
|
||||||
|
|
||||||
nxt_unit_pid = getpid();
|
nxt_unit_pid = getpid();
|
||||||
|
|
||||||
@@ -440,6 +441,7 @@ nxt_unit_init(nxt_unit_init_t *init)
|
|||||||
|
|
||||||
queue_fd = -1;
|
queue_fd = -1;
|
||||||
mem = MAP_FAILED;
|
mem = MAP_FAILED;
|
||||||
|
shared_port.out_fd = -1;
|
||||||
|
|
||||||
if (init->ready_port.id.pid != 0
|
if (init->ready_port.id.pid != 0
|
||||||
&& init->ready_stream != 0
|
&& init->ready_stream != 0
|
||||||
@@ -458,8 +460,12 @@ nxt_unit_init(nxt_unit_init_t *init)
|
|||||||
nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
|
nxt_unit_port_id_init(&read_port.id, read_port.id.pid,
|
||||||
read_port.id.id);
|
read_port.id.id);
|
||||||
|
|
||||||
|
shared_port.in_fd = init->shared_port_fd;
|
||||||
|
shared_queue_fd = init->shared_queue_fd;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
|
rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
|
||||||
|
&shared_port.in_fd, &shared_queue_fd,
|
||||||
&lib->log_fd, &ready_stream, &shm_limit,
|
&lib->log_fd, &ready_stream, &shm_limit,
|
||||||
&request_limit);
|
&request_limit);
|
||||||
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
@@ -525,6 +531,27 @@ nxt_unit_init(nxt_unit_init_t *init)
|
|||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_unit_port_id_init(&shared_port.id, read_port.id.pid,
|
||||||
|
NXT_UNIT_SHARED_PORT_ID);
|
||||||
|
|
||||||
|
mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
|
||||||
|
MAP_SHARED, shared_queue_fd, 0);
|
||||||
|
if (nxt_slow_path(mem == MAP_FAILED)) {
|
||||||
|
nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", shared_queue_fd,
|
||||||
|
strerror(errno), errno);
|
||||||
|
|
||||||
|
goto fail;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_unit_close(shared_queue_fd);
|
||||||
|
|
||||||
|
lib->shared_port = nxt_unit_add_port(ctx, &shared_port, mem);
|
||||||
|
if (nxt_slow_path(lib->shared_port == NULL)) {
|
||||||
|
nxt_unit_alert(NULL, "failed to add shared_port");
|
||||||
|
|
||||||
|
goto fail;
|
||||||
|
}
|
||||||
|
|
||||||
rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
|
rc = nxt_unit_ready(ctx, ready_port.out_fd, ready_stream, queue_fd);
|
||||||
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
nxt_unit_alert(NULL, "failed to send READY message");
|
nxt_unit_alert(NULL, "failed to send READY message");
|
||||||
@@ -799,7 +826,8 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
|
|||||||
|
|
||||||
static int
|
static int
|
||||||
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
|
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
|
||||||
nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
|
nxt_unit_port_t *read_port, int *shared_port_fd, int *shared_queue_fd,
|
||||||
|
int *log_fd, uint32_t *stream,
|
||||||
uint32_t *shm_limit, uint32_t *request_limit)
|
uint32_t *shm_limit, uint32_t *request_limit)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
@@ -845,11 +873,13 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
|
|||||||
"%"PRId64",%"PRIu32",%d;"
|
"%"PRId64",%"PRIu32",%d;"
|
||||||
"%"PRId64",%"PRIu32",%d;"
|
"%"PRId64",%"PRIu32",%d;"
|
||||||
"%"PRId64",%"PRIu32",%d,%d;"
|
"%"PRId64",%"PRIu32",%d,%d;"
|
||||||
|
"%d,%d;"
|
||||||
"%d,%"PRIu32",%"PRIu32,
|
"%d,%"PRIu32",%"PRIu32,
|
||||||
&ready_stream,
|
&ready_stream,
|
||||||
&ready_pid, &ready_id, &ready_fd,
|
&ready_pid, &ready_id, &ready_fd,
|
||||||
&router_pid, &router_id, &router_fd,
|
&router_pid, &router_id, &router_fd,
|
||||||
&read_pid, &read_id, &read_in_fd, &read_out_fd,
|
&read_pid, &read_id, &read_in_fd, &read_out_fd,
|
||||||
|
shared_port_fd, shared_queue_fd,
|
||||||
log_fd, shm_limit, request_limit);
|
log_fd, shm_limit, request_limit);
|
||||||
|
|
||||||
if (nxt_slow_path(rc == EOF)) {
|
if (nxt_slow_path(rc == EOF)) {
|
||||||
@@ -859,9 +889,9 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
|
|||||||
return NXT_UNIT_ERROR;
|
return NXT_UNIT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nxt_slow_path(rc != 14)) {
|
if (nxt_slow_path(rc != 16)) {
|
||||||
nxt_unit_alert(NULL, "invalid number of variables in %s env: "
|
nxt_unit_alert(NULL, "invalid number of variables in %s env: "
|
||||||
"found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
|
"found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 16, vars);
|
||||||
|
|
||||||
return NXT_UNIT_ERROR;
|
return NXT_UNIT_ERROR;
|
||||||
}
|
}
|
||||||
@@ -1137,7 +1167,6 @@ static int
|
|||||||
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
||||||
{
|
{
|
||||||
void *mem;
|
void *mem;
|
||||||
nxt_unit_impl_t *lib;
|
|
||||||
nxt_unit_port_t new_port, *port;
|
nxt_unit_port_t new_port, *port;
|
||||||
nxt_port_msg_new_port_t *new_port_msg;
|
nxt_port_msg_new_port_t *new_port_msg;
|
||||||
|
|
||||||
@@ -1162,34 +1191,18 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
|||||||
recv_msg->stream, (int) new_port_msg->pid,
|
recv_msg->stream, (int) new_port_msg->pid,
|
||||||
(int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
|
(int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
|
||||||
|
|
||||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0]) != NXT_UNIT_OK)) {
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
|
|
||||||
nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
|
|
||||||
|
|
||||||
new_port.in_fd = recv_msg->fd[0];
|
|
||||||
new_port.out_fd = -1;
|
|
||||||
|
|
||||||
mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
|
|
||||||
MAP_SHARED, recv_msg->fd[1], 0);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
|
|
||||||
!= NXT_UNIT_OK))
|
|
||||||
{
|
|
||||||
return NXT_UNIT_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
nxt_unit_port_id_init(&new_port.id, new_port_msg->pid,
|
|
||||||
new_port_msg->id);
|
|
||||||
|
|
||||||
new_port.in_fd = -1;
|
|
||||||
new_port.out_fd = recv_msg->fd[0];
|
|
||||||
|
|
||||||
mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
|
|
||||||
MAP_SHARED, recv_msg->fd[1], 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_unit_port_id_init(&new_port.id, new_port_msg->pid, new_port_msg->id);
|
||||||
|
|
||||||
|
new_port.in_fd = -1;
|
||||||
|
new_port.out_fd = recv_msg->fd[0];
|
||||||
|
|
||||||
|
mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
|
||||||
|
MAP_SHARED, recv_msg->fd[1], 0);
|
||||||
|
|
||||||
if (nxt_slow_path(mem == MAP_FAILED)) {
|
if (nxt_slow_path(mem == MAP_FAILED)) {
|
||||||
nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
|
nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
|
||||||
strerror(errno), errno);
|
strerror(errno), errno);
|
||||||
@@ -1206,12 +1219,6 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
|||||||
return NXT_UNIT_ERROR;
|
return NXT_UNIT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
|
|
||||||
lib->shared_port = port;
|
|
||||||
|
|
||||||
return nxt_unit_ctx_ready(ctx);
|
|
||||||
}
|
|
||||||
|
|
||||||
nxt_unit_port_release(port);
|
nxt_unit_port_release(port);
|
||||||
|
|
||||||
return NXT_UNIT_OK;
|
return NXT_UNIT_OK;
|
||||||
|
|||||||
@@ -176,6 +176,8 @@ struct nxt_unit_init_s {
|
|||||||
uint32_t ready_stream;
|
uint32_t ready_stream;
|
||||||
nxt_unit_port_t router_port;
|
nxt_unit_port_t router_port;
|
||||||
nxt_unit_port_t read_port;
|
nxt_unit_port_t read_port;
|
||||||
|
int shared_port_fd;
|
||||||
|
int shared_queue_fd;
|
||||||
int log_fd;
|
int log_fd;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user