Moving file descriptor blocking to libunit.
The default libunit behavior relies on blocking the recv() call for port file descriptors, which an application may override if needed. For external applications, port file descriptors were toggled to blocking mode before the exec() call. If the exec() call failed, descriptor remained blocked, so the process hanged while trying to read from it. This patch moves file descriptor mode switch inside libunit.
This commit is contained in:
@@ -1290,8 +1290,6 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
|
|||||||
init->ready_port.in_fd = -1;
|
init->ready_port.in_fd = -1;
|
||||||
init->ready_port.out_fd = main_port->pair[1];
|
init->ready_port.out_fd = main_port->pair[1];
|
||||||
|
|
||||||
nxt_fd_blocking(task, main_port->pair[1]);
|
|
||||||
|
|
||||||
init->ready_stream = my_port->process->stream;
|
init->ready_stream = my_port->process->stream;
|
||||||
|
|
||||||
init->router_port.id.pid = router_port->pid;
|
init->router_port.id.pid = router_port->pid;
|
||||||
@@ -1299,15 +1297,11 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
|
|||||||
init->router_port.in_fd = -1;
|
init->router_port.in_fd = -1;
|
||||||
init->router_port.out_fd = router_port->pair[1];
|
init->router_port.out_fd = router_port->pair[1];
|
||||||
|
|
||||||
nxt_fd_blocking(task, router_port->pair[1]);
|
|
||||||
|
|
||||||
init->read_port.id.pid = my_port->pid;
|
init->read_port.id.pid = my_port->pid;
|
||||||
init->read_port.id.id = my_port->id;
|
init->read_port.id.id = my_port->id;
|
||||||
init->read_port.in_fd = my_port->pair[0];
|
init->read_port.in_fd = my_port->pair[0];
|
||||||
init->read_port.out_fd = -1;
|
init->read_port.out_fd = -1;
|
||||||
|
|
||||||
nxt_fd_blocking(task, my_port->pair[0]);
|
|
||||||
|
|
||||||
init->log_fd = 2;
|
init->log_fd = 2;
|
||||||
|
|
||||||
return NXT_OK;
|
return NXT_OK;
|
||||||
|
|||||||
@@ -52,8 +52,6 @@ nxt_external_fd_no_cloexec(nxt_task_t *task, nxt_socket_t fd)
|
|||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_fd_blocking(task, fd);
|
|
||||||
|
|
||||||
return NXT_OK;
|
return NXT_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -171,6 +171,7 @@ static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
|
|||||||
static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
|
static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
|
||||||
nxt_unit_read_buf_t *rbuf);
|
nxt_unit_read_buf_t *rbuf);
|
||||||
nxt_inline int nxt_unit_close(int fd);
|
nxt_inline int nxt_unit_close(int fd);
|
||||||
|
static int nxt_unit_fd_blocking(int fd);
|
||||||
|
|
||||||
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
|
static int nxt_unit_port_hash_add(nxt_lvlhsh_t *port_hash,
|
||||||
nxt_unit_port_t *port);
|
nxt_unit_port_t *port);
|
||||||
@@ -413,6 +414,7 @@ nxt_unit_init(nxt_unit_init_t *init)
|
|||||||
}
|
}
|
||||||
|
|
||||||
queue_fd = -1;
|
queue_fd = -1;
|
||||||
|
mem = MAP_FAILED;
|
||||||
|
|
||||||
if (init->ready_port.id.pid != 0
|
if (init->ready_port.id.pid != 0
|
||||||
&& init->ready_stream != 0
|
&& init->ready_stream != 0
|
||||||
@@ -450,6 +452,11 @@ nxt_unit_init(nxt_unit_init_t *init)
|
|||||||
|
|
||||||
ctx = &lib->main_ctx.ctx;
|
ctx = &lib->main_ctx.ctx;
|
||||||
|
|
||||||
|
rc = nxt_unit_fd_blocking(router_port.out_fd);
|
||||||
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
|
goto fail;
|
||||||
|
}
|
||||||
|
|
||||||
lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
|
lib->router_port = nxt_unit_add_port(ctx, &router_port, NULL);
|
||||||
if (nxt_slow_path(lib->router_port == NULL)) {
|
if (nxt_slow_path(lib->router_port == NULL)) {
|
||||||
nxt_unit_alert(NULL, "failed to add router_port");
|
nxt_unit_alert(NULL, "failed to add router_port");
|
||||||
@@ -473,12 +480,20 @@ nxt_unit_init(nxt_unit_init_t *init)
|
|||||||
|
|
||||||
nxt_port_queue_init(mem);
|
nxt_port_queue_init(mem);
|
||||||
|
|
||||||
|
rc = nxt_unit_fd_blocking(read_port.in_fd);
|
||||||
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
|
goto fail;
|
||||||
|
}
|
||||||
|
|
||||||
lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
|
lib->main_ctx.read_port = nxt_unit_add_port(ctx, &read_port, mem);
|
||||||
if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
|
if (nxt_slow_path(lib->main_ctx.read_port == NULL)) {
|
||||||
nxt_unit_alert(NULL, "failed to add read_port");
|
nxt_unit_alert(NULL, "failed to add read_port");
|
||||||
|
|
||||||
munmap(mem, sizeof(nxt_port_queue_t));
|
goto fail;
|
||||||
|
}
|
||||||
|
|
||||||
|
rc = nxt_unit_fd_blocking(ready_port.out_fd);
|
||||||
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -486,8 +501,6 @@ nxt_unit_init(nxt_unit_init_t *init)
|
|||||||
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
nxt_unit_alert(NULL, "failed to send READY message");
|
nxt_unit_alert(NULL, "failed to send READY message");
|
||||||
|
|
||||||
munmap(mem, sizeof(nxt_port_queue_t));
|
|
||||||
|
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -498,6 +511,10 @@ nxt_unit_init(nxt_unit_init_t *init)
|
|||||||
|
|
||||||
fail:
|
fail:
|
||||||
|
|
||||||
|
if (mem != MAP_FAILED) {
|
||||||
|
munmap(mem, sizeof(nxt_port_queue_t));
|
||||||
|
}
|
||||||
|
|
||||||
if (queue_fd != -1) {
|
if (queue_fd != -1) {
|
||||||
nxt_unit_close(queue_fd);
|
nxt_unit_close(queue_fd);
|
||||||
}
|
}
|
||||||
@@ -1064,7 +1081,6 @@ fail:
|
|||||||
static int
|
static int
|
||||||
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
||||||
{
|
{
|
||||||
int nb;
|
|
||||||
void *mem;
|
void *mem;
|
||||||
nxt_unit_impl_t *lib;
|
nxt_unit_impl_t *lib;
|
||||||
nxt_unit_port_t new_port, *port;
|
nxt_unit_port_t new_port, *port;
|
||||||
@@ -1103,13 +1119,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
|||||||
MAP_SHARED, recv_msg->fd2, 0);
|
MAP_SHARED, recv_msg->fd2, 0);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
nb = 0;
|
if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd) != NXT_UNIT_OK)) {
|
||||||
|
|
||||||
if (nxt_slow_path(ioctl(recv_msg->fd, FIONBIO, &nb) == -1)) {
|
|
||||||
nxt_unit_alert(ctx, "#%"PRIu32": new_port: ioctl(%d, FIONBIO, 0) "
|
|
||||||
"failed: %s (%d)",
|
|
||||||
recv_msg->stream, recv_msg->fd, strerror(errno), errno);
|
|
||||||
|
|
||||||
return NXT_UNIT_ERROR;
|
return NXT_UNIT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -5919,6 +5929,24 @@ nxt_unit_close(int fd)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
nxt_unit_fd_blocking(int fd)
|
||||||
|
{
|
||||||
|
int nb;
|
||||||
|
|
||||||
|
nb = 0;
|
||||||
|
|
||||||
|
if (nxt_slow_path(ioctl(fd, FIONBIO, &nb) == -1)) {
|
||||||
|
nxt_unit_alert(NULL, "ioctl(%d, FIONBIO, 0) failed: %s (%d)",
|
||||||
|
fd, strerror(errno), errno);
|
||||||
|
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NXT_UNIT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static nxt_int_t
|
static nxt_int_t
|
||||||
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
|
nxt_unit_port_hash_test(nxt_lvlhsh_query_t *lhq, void *data)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user