Style fixes for 2 file descriptors transfer over port.

Two consecutive fd and fd2 fields replaced with array.
This commit is contained in:
Max Romanov
2020-08-11 21:48:27 +03:00
parent acb0cca49d
commit f147943f63
7 changed files with 115 additions and 110 deletions

View File

@@ -1590,9 +1590,9 @@ nxt_controller_process_cert_save(nxt_task_t *task, nxt_port_recv_msg_t *msg,
mbuf = &c->read->mem; mbuf = &c->read->mem;
nxt_fd_write(msg->fd, mbuf->pos, nxt_buf_mem_used_size(mbuf)); nxt_fd_write(msg->fd[0], mbuf->pos, nxt_buf_mem_used_size(mbuf));
nxt_fd_close(msg->fd); nxt_fd_close(msg->fd[0]);
nxt_memzero(&resp, sizeof(nxt_controller_response_t)); nxt_memzero(&resp, sizeof(nxt_controller_response_t));

View File

@@ -261,15 +261,15 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
/* TODO check b size and make plain */ /* TODO check b size and make plain */
nxt_debug(task, "new port %d received for process %PI:%d", nxt_debug(task, "new port %d received for process %PI:%d",
msg->fd, new_port_msg->pid, new_port_msg->id); msg->fd[0], new_port_msg->pid, new_port_msg->id);
port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id); port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
if (port != NULL) { if (port != NULL) {
nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid, nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
new_port_msg->id); new_port_msg->id);
nxt_fd_close(msg->fd); nxt_fd_close(msg->fd[0]);
msg->fd = -1; msg->fd[0] = -1;
return; return;
} }
@@ -280,10 +280,10 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return; return;
} }
nxt_fd_nonblocking(task, msg->fd); nxt_fd_nonblocking(task, msg->fd[0]);
port->pair[0] = -1; port->pair[0] = -1;
port->pair[1] = msg->fd; port->pair[1] = msg->fd[0];
port->max_size = new_port_msg->max_size; port->max_size = new_port_msg->max_size;
port->max_share = new_port_msg->max_share; port->max_share = new_port_msg->max_share;
@@ -319,11 +319,11 @@ nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_debug(task, "process %PI ready", msg->port_msg.pid); nxt_debug(task, "process %PI ready", msg->port_msg.pid);
if (msg->fd != -1) { if (msg->fd[0] != -1) {
port->queue_fd = msg->fd; port->queue_fd = msg->fd[0];
port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t), port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
PROT_READ | PROT_WRITE, MAP_SHARED, msg->fd, PROT_READ | PROT_WRITE, MAP_SHARED,
0); msg->fd[0], 0);
} }
nxt_port_send_new_port(task, rt, port, msg->port_msg.stream); nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
@@ -338,7 +338,7 @@ nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
rt = task->thread->runtime; rt = task->thread->runtime;
if (nxt_slow_path(msg->fd == -1)) { if (nxt_slow_path(msg->fd[0] == -1)) {
nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message"); nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
return; return;
@@ -352,11 +352,11 @@ nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
goto fail_close; goto fail_close;
} }
nxt_port_incoming_port_mmap(task, process, msg->fd); nxt_port_incoming_port_mmap(task, process, msg->fd[0]);
fail_close: fail_close:
nxt_fd_close(msg->fd); nxt_fd_close(msg->fd[0]);
} }
@@ -409,14 +409,14 @@ nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
log_file = nxt_list_elt(rt->log_files, slot); log_file = nxt_list_elt(rt->log_files, slot);
nxt_debug(task, "change log file %FD:%FD", msg->fd, log_file->fd); nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd);
/* /*
* The old log file descriptor must be closed at the moment when no * The old log file descriptor must be closed at the moment when no
* other threads use it. dup2() allows to use the old file descriptor * other threads use it. dup2() allows to use the old file descriptor
* for new log file. This change is performed atomically in the kernel. * for new log file. This change is performed atomically in the kernel.
*/ */
if (nxt_file_redirect(log_file, msg->fd) == NXT_OK) { if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) {
if (slot == 0) { if (slot == 0) {
(void) nxt_file_stderr(log_file); (void) nxt_file_stderr(log_file);
} }

View File

@@ -173,8 +173,7 @@ typedef struct {
nxt_queue_link_t link; nxt_queue_link_t link;
nxt_buf_t *buf; nxt_buf_t *buf;
size_t share; size_t share;
nxt_fd_t fd; nxt_fd_t fd[2];
nxt_fd_t fd2;
nxt_port_msg_t port_msg; nxt_port_msg_t port_msg;
uint32_t tracking_msg[2]; uint32_t tracking_msg[2];
uint8_t close_fd; /* 1 bit */ uint8_t close_fd; /* 1 bit */
@@ -183,8 +182,7 @@ typedef struct {
struct nxt_port_recv_msg_s { struct nxt_port_recv_msg_s {
nxt_fd_t fd; nxt_fd_t fd[2];
nxt_fd_t fd2;
nxt_buf_t *buf; nxt_buf_t *buf;
nxt_port_t *port; nxt_port_t *port;
nxt_port_msg_t port_msg; nxt_port_msg_t port_msg;

View File

@@ -389,7 +389,8 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
nxt_memzero(&msg, sizeof(msg)); nxt_memzero(&msg, sizeof(msg));
nxt_memzero(&buf, sizeof(buf)); nxt_memzero(&buf, sizeof(buf));
msg.fd = -1; msg.fd[0] = -1;
msg.fd[1] = -1;
msg.buf = &buf; msg.buf = &buf;
msg.port = port; msg.port = port;
@@ -500,7 +501,8 @@ nxt_port_rpc_close(nxt_task_t *task, nxt_port_t *port)
return; return;
} }
msg.fd = -1; msg.fd[0] = -1;
msg.fd[1] = -1;
msg.buf = &nxt_port_close_dummy_buf; msg.buf = &nxt_port_close_dummy_buf;
msg.port = port; msg.port = port;
msg.port_msg.stream = reg->stream; msg.port_msg.stream = reg->stream;

View File

@@ -161,8 +161,8 @@ nxt_port_socket_write2(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg.buf = b; msg.buf = b;
msg.share = 0; msg.share = 0;
msg.fd = fd; msg.fd[0] = fd;
msg.fd2 = fd2; msg.fd[1] = fd2;
msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0; msg.close_fd = (type & NXT_PORT_MSG_CLOSE_FD) != 0;
msg.allocated = 0; msg.allocated = 0;
@@ -365,7 +365,7 @@ next_fragment:
msg->port_msg.last |= sb.last; msg->port_msg.last |= sb.last;
msg->port_msg.mf = sb.limit_reached || sb.nmax_reached; msg->port_msg.mf = sb.limit_reached || sb.nmax_reached;
n = nxt_socketpair_send(&port->socket, &msg->fd, iov, sb.niov + 1); n = nxt_socketpair_send(&port->socket, msg->fd, iov, sb.niov + 1);
if (n > 0) { if (n > 0) {
if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) { if (nxt_slow_path((size_t) n != sb.size + iov[0].iov_len)) {
@@ -374,16 +374,18 @@ next_fragment:
goto fail; goto fail;
} }
if (msg->fd != -1 && msg->close_fd != 0) { if (msg->close_fd) {
nxt_fd_close(msg->fd); if (msg->fd[0] != -1) {
nxt_fd_close(msg->fd[0]);
msg->fd = -1; msg->fd[0] = -1;
} }
if (msg->fd2 != -1 && msg->close_fd != 0) { if (msg->fd[1] != -1) {
nxt_fd_close(msg->fd2); nxt_fd_close(msg->fd[1]);
msg->fd2 = -1; msg->fd[1] = -1;
}
} }
msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size, msg->buf = nxt_port_buf_completion(task, wq, msg->buf, plain_size,
@@ -397,8 +399,8 @@ next_fragment:
* A file descriptor is sent only * A file descriptor is sent only
* in the first message of a stream. * in the first message of a stream.
*/ */
msg->fd = -1; msg->fd[0] = -1;
msg->fd2 = -1; msg->fd[1] = -1;
msg->share += n; msg->share += n;
msg->port_msg.nf = 1; msg->port_msg.nf = 1;
@@ -654,7 +656,7 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
iov[1].iov_base = b->mem.pos; iov[1].iov_base = b->mem.pos;
iov[1].iov_len = port->max_size; iov[1].iov_len = port->max_size;
n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);
if (n > 0) { if (n > 0) {
@@ -750,8 +752,8 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
msg.port_msg = smsg->port_msg; msg.port_msg = smsg->port_msg;
b = smsg->buf; b = smsg->buf;
n = smsg->size; n = smsg->size;
msg.fd = smsg->fd; msg.fd[0] = smsg->fd[0];
msg.fd2 = smsg->fd2; msg.fd[1] = smsg->fd[1];
smsg->size = 0; smsg->size = 0;
@@ -793,7 +795,7 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
iov[1].iov_base = b->mem.pos; iov[1].iov_base = b->mem.pos;
iov[1].iov_len = port->max_size; iov[1].iov_len = port->max_size;
n = nxt_socketpair_recv(&port->socket, &msg.fd, iov, 2); n = nxt_socketpair_recv(&port->socket, msg.fd, iov, 2);
if (n == (ssize_t) sizeof(nxt_port_msg_t) if (n == (ssize_t) sizeof(nxt_port_msg_t)
&& msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE) && msg.port_msg.type == _NXT_PORT_MSG_READ_QUEUE)
@@ -848,8 +850,8 @@ nxt_port_queue_read_handler(nxt_task_t *task, void *obj, void *data)
smsg->port_msg = msg.port_msg; smsg->port_msg = msg.port_msg;
smsg->buf = b; smsg->buf = b;
smsg->size = n; smsg->size = n;
smsg->fd = msg.fd; smsg->fd[0] = msg.fd[0];
smsg->fd2 = msg.fd2; smsg->fd[1] = msg.fd[1];
continue; continue;
} }
@@ -1048,12 +1050,12 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
nxt_alert(task, "port %d: too small message:%uz", nxt_alert(task, "port %d: too small message:%uz",
port->socket.fd, msg->size); port->socket.fd, msg->size);
if (msg->fd != -1) { if (msg->fd[0] != -1) {
nxt_fd_close(msg->fd); nxt_fd_close(msg->fd[0]);
} }
if (msg->fd2 != -1) { if (msg->fd[1] != -1) {
nxt_fd_close(msg->fd2); nxt_fd_close(msg->fd[1]);
} }
return; return;
@@ -1094,8 +1096,8 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
port->handler(task, fmsg); port->handler(task, fmsg);
msg->buf = fmsg->buf; msg->buf = fmsg->buf;
msg->fd = fmsg->fd; msg->fd[0] = fmsg->fd[0];
msg->fd2 = fmsg->fd2; msg->fd[1] = fmsg->fd[1];
/* /*
* To disable instant completion or buffer re-usage, * To disable instant completion or buffer re-usage,
@@ -1129,17 +1131,17 @@ nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
if (nxt_fast_path(msg->cancelled == 0)) { if (nxt_fast_path(msg->cancelled == 0)) {
msg->buf = NULL; msg->buf = NULL;
msg->fd = -1; msg->fd[0] = -1;
msg->fd2 = -1; msg->fd[1] = -1;
b = NULL; b = NULL;
} else { } else {
if (msg->fd != -1) { if (msg->fd[0] != -1) {
nxt_fd_close(msg->fd); nxt_fd_close(msg->fd[0]);
} }
if (msg->fd2 != -1) { if (msg->fd[1] != -1) {
nxt_fd_close(msg->fd2); nxt_fd_close(msg->fd[1]);
} }
} }
} else { } else {
@@ -1240,16 +1242,18 @@ nxt_port_error_handler(nxt_task_t *task, void *obj, void *data)
nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) { nxt_queue_each(msg, &port->messages, nxt_port_send_msg_t, link) {
if (msg->fd != -1 && msg->close_fd != 0) { if (msg->close_fd) {
nxt_fd_close(msg->fd); if (msg->fd[0] != -1) {
nxt_fd_close(msg->fd[0]);
msg->fd = -1; msg->fd[0] = -1;
} }
if (msg->fd2 != -1 && msg->close_fd != 0) { if (msg->fd[1] != -1) {
nxt_fd_close(msg->fd2); nxt_fd_close(msg->fd[1]);
msg->fd2 = -1; msg->fd[1] = -1;
}
} }
for (b = msg->buf; b != NULL; b = next) { for (b = msg->buf; b != NULL; b = next) {

View File

@@ -607,14 +607,14 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR; msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
} else { } else {
if (msg->fd2 != -1) { if (msg->fd[1] != -1) {
res = nxt_router_port_queue_map(task, port, msg->fd2); res = nxt_router_port_queue_map(task, port, msg->fd[1]);
if (nxt_slow_path(res != NXT_OK)) { if (nxt_slow_path(res != NXT_OK)) {
return; return;
} }
nxt_fd_close(msg->fd2); nxt_fd_close(msg->fd[1]);
msg->fd2 = -1; msg->fd[1] = -1;
} }
} }
@@ -669,7 +669,7 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return; return;
} }
if (nxt_slow_path(msg->fd == -1)) { if (nxt_slow_path(msg->fd[0] == -1)) {
nxt_alert(task, "conf_data_handler: invalid file shm fd"); nxt_alert(task, "conf_data_handler: invalid file shm fd");
return; return;
} }
@@ -678,18 +678,18 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)", nxt_alert(task, "conf_data_handler: unexpected buffer size (%d)",
(int) nxt_buf_mem_used_size(&msg->buf->mem)); (int) nxt_buf_mem_used_size(&msg->buf->mem));
nxt_fd_close(msg->fd); nxt_fd_close(msg->fd[0]);
msg->fd = -1; msg->fd[0] = -1;
return; return;
} }
nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t)); nxt_memcpy(&size, msg->buf->mem.pos, sizeof(size_t));
p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd, 0); p = nxt_mem_mmap(NULL, size, PROT_READ, MAP_SHARED, msg->fd[0], 0);
nxt_fd_close(msg->fd); nxt_fd_close(msg->fd[0]);
msg->fd = -1; msg->fd[0] = -1;
if (nxt_slow_path(p == MAP_FAILED)) { if (nxt_slow_path(p == MAP_FAILED)) {
return; return;
@@ -2133,7 +2133,7 @@ nxt_router_listen_socket_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
rpc = data; rpc = data;
s = msg->fd; s = msg->fd[0];
ret = nxt_socket_nonblocking(task, s); ret = nxt_socket_nonblocking(task, s);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
@@ -2271,7 +2271,7 @@ nxt_router_tls_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
goto fail; goto fail;
} }
tlscf->chain_file = msg->fd; tlscf->chain_file = msg->fd[0];
ret = task->thread->runtime->tls->server_init(task, tlscf); ret = task->thread->runtime->tls->server_init(task, tlscf);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
@@ -3392,7 +3392,7 @@ nxt_router_access_log_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
access_log = tmcf->router_conf->access_log; access_log = tmcf->router_conf->access_log;
access_log->fd = msg->fd; access_log->fd = msg->fd[0];
nxt_work_queue_add(&task->thread->engine->fast_work_queue, nxt_work_queue_add(&task->thread->engine->fast_work_queue,
nxt_router_conf_apply, task, tmcf, NULL); nxt_router_conf_apply, task, tmcf, NULL);
@@ -3541,13 +3541,13 @@ nxt_router_access_log_reopen_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
if (access_log == nxt_router->access_log) { if (access_log == nxt_router->access_log) {
if (nxt_slow_path(dup2(msg->fd, access_log->fd) == -1)) { if (nxt_slow_path(dup2(msg->fd[0], access_log->fd) == -1)) {
nxt_alert(task, "dup2(%FD, %FD) failed %E", nxt_alert(task, "dup2(%FD, %FD) failed %E",
msg->fd, access_log->fd, nxt_errno); msg->fd[0], access_log->fd, nxt_errno);
} }
} }
nxt_fd_close(msg->fd); nxt_fd_close(msg->fd[0]);
nxt_mp_release(reopen->mem_pool); nxt_mp_release(reopen->mem_pool);
} }

View File

@@ -211,8 +211,7 @@ struct nxt_unit_recv_msg_s {
void *start; void *start;
uint32_t size; uint32_t size;
int fd; int fd[2];
int fd2;
nxt_unit_mmap_buf_t *incoming_buf; nxt_unit_mmap_buf_t *incoming_buf;
}; };
@@ -900,8 +899,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
rc = NXT_UNIT_ERROR; rc = NXT_UNIT_ERROR;
recv_msg.fd = -1; recv_msg.fd[0] = -1;
recv_msg.fd2 = -1; recv_msg.fd[1] = -1;
port_msg = (nxt_port_msg_t *) rbuf->buf; port_msg = (nxt_port_msg_t *) rbuf->buf;
cm = (struct cmsghdr *) rbuf->oob; cm = (struct cmsghdr *) rbuf->oob;
@@ -909,11 +908,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
&& cm->cmsg_type == SCM_RIGHTS) && cm->cmsg_type == SCM_RIGHTS)
{ {
if (cm->cmsg_len == CMSG_LEN(sizeof(int))) { if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int)); memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
} }
if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) { if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
memcpy(&recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2); memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
} }
} }
@@ -933,9 +932,9 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
goto fail; goto fail;
} }
nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd %d fd2 %d", nxt_unit_debug(ctx, "#%"PRIu32": process message %d fd[0] %d fd[1] %d",
port_msg->stream, (int) port_msg->type, port_msg->stream, (int) port_msg->type,
recv_msg.fd, recv_msg.fd2); recv_msg.fd[0], recv_msg.fd[1]);
recv_msg.stream = port_msg->stream; recv_msg.stream = port_msg->stream;
recv_msg.pid = port_msg->pid; recv_msg.pid = port_msg->pid;
@@ -964,8 +963,8 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
if (nxt_slow_path(rc != NXT_UNIT_OK)) { if (nxt_slow_path(rc != NXT_UNIT_OK)) {
if (rc == NXT_UNIT_AGAIN) { if (rc == NXT_UNIT_AGAIN) {
recv_msg.fd = -1; recv_msg.fd[0] = -1;
recv_msg.fd2 = -1; recv_msg.fd[1] = -1;
} }
goto fail; goto fail;
@@ -987,11 +986,11 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
case _NXT_PORT_MSG_CHANGE_FILE: case _NXT_PORT_MSG_CHANGE_FILE:
nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d", nxt_unit_debug(ctx, "#%"PRIu32": change_file: fd %d",
port_msg->stream, recv_msg.fd); port_msg->stream, recv_msg.fd[0]);
if (dup2(recv_msg.fd, lib->log_fd) == -1) { if (dup2(recv_msg.fd[0], lib->log_fd) == -1) {
nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)", nxt_unit_alert(ctx, "#%"PRIu32": dup2(%d, %d) failed: %s (%d)",
port_msg->stream, recv_msg.fd, lib->log_fd, port_msg->stream, recv_msg.fd[0], lib->log_fd,
strerror(errno), errno); strerror(errno), errno);
goto fail; goto fail;
@@ -1001,14 +1000,14 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
break; break;
case _NXT_PORT_MSG_MMAP: case _NXT_PORT_MSG_MMAP:
if (nxt_slow_path(recv_msg.fd < 0)) { if (nxt_slow_path(recv_msg.fd[0] < 0)) {
nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap", nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for mmap",
port_msg->stream, recv_msg.fd); port_msg->stream, recv_msg.fd[0]);
goto fail; goto fail;
} }
rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd); rc = nxt_unit_incoming_mmap(ctx, port_msg->pid, recv_msg.fd[0]);
break; break;
case _NXT_PORT_MSG_REQ_HEADERS: case _NXT_PORT_MSG_REQ_HEADERS:
@@ -1055,12 +1054,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
fail: fail:
if (recv_msg.fd != -1) { if (recv_msg.fd[0] != -1) {
nxt_unit_close(recv_msg.fd); nxt_unit_close(recv_msg.fd[0]);
} }
if (recv_msg.fd2 != -1) { if (recv_msg.fd[1] != -1) {
nxt_unit_close(recv_msg.fd2); nxt_unit_close(recv_msg.fd[1]);
} }
while (recv_msg.incoming_buf != NULL) { while (recv_msg.incoming_buf != NULL) {
@@ -1094,32 +1093,34 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
if (nxt_slow_path(recv_msg->fd < 0)) { if (nxt_slow_path(recv_msg->fd[0] < 0)) {
nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port", nxt_unit_alert(ctx, "#%"PRIu32": invalid fd %d for new port",
recv_msg->stream, recv_msg->fd); recv_msg->stream, recv_msg->fd[0]);
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
new_port_msg = recv_msg->start; new_port_msg = recv_msg->start;
nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd %d fd2 %d", nxt_unit_debug(ctx, "#%"PRIu32": new_port: port{%d,%d} fd[0] %d fd[1] %d",
recv_msg->stream, (int) new_port_msg->pid, recv_msg->stream, (int) new_port_msg->pid,
(int) new_port_msg->id, recv_msg->fd, recv_msg->fd2); (int) new_port_msg->id, recv_msg->fd[0], recv_msg->fd[1]);
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
if (new_port_msg->id == (nxt_port_id_t) -1) { if (new_port_msg->id == (nxt_port_id_t) -1) {
nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id); nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
new_port.in_fd = recv_msg->fd; new_port.in_fd = recv_msg->fd[0];
new_port.out_fd = -1; new_port.out_fd = -1;
mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE, mem = mmap(NULL, sizeof(nxt_app_queue_t), PROT_READ | PROT_WRITE,
MAP_SHARED, recv_msg->fd2, 0); MAP_SHARED, recv_msg->fd[1], 0);
} else { } else {
if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd) != NXT_UNIT_OK)) { if (nxt_slow_path(nxt_unit_fd_blocking(recv_msg->fd[0])
!= NXT_UNIT_OK))
{
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
@@ -1127,14 +1128,14 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
new_port_msg->id); new_port_msg->id);
new_port.in_fd = -1; new_port.in_fd = -1;
new_port.out_fd = recv_msg->fd; new_port.out_fd = recv_msg->fd[0];
mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE, mem = mmap(NULL, sizeof(nxt_port_queue_t), PROT_READ | PROT_WRITE,
MAP_SHARED, recv_msg->fd2, 0); MAP_SHARED, recv_msg->fd[1], 0);
} }
if (nxt_slow_path(mem == MAP_FAILED)) { if (nxt_slow_path(mem == MAP_FAILED)) {
nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd2, nxt_unit_alert(ctx, "mmap(%d) failed: %s (%d)", recv_msg->fd[1],
strerror(errno), errno); strerror(errno), errno);
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
@@ -1142,7 +1143,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
new_port.data = NULL; new_port.data = NULL;
recv_msg->fd = -1; recv_msg->fd[0] = -1;
port = nxt_unit_add_port(ctx, &new_port, mem); port = nxt_unit_add_port(ctx, &new_port, mem);
if (nxt_slow_path(port == NULL)) { if (nxt_slow_path(port == NULL)) {
@@ -1224,8 +1225,8 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
req_impl->incoming_buf->prev = &req_impl->incoming_buf; req_impl->incoming_buf->prev = &req_impl->incoming_buf;
recv_msg->incoming_buf = NULL; recv_msg->incoming_buf = NULL;
req->content_fd = recv_msg->fd; req->content_fd = recv_msg->fd[0];
recv_msg->fd = -1; recv_msg->fd[0] = -1;
req->response_max_fields = 0; req->response_max_fields = 0;
req_impl->state = NXT_UNIT_RS_START; req_impl->state = NXT_UNIT_RS_START;
@@ -1312,8 +1313,8 @@ nxt_unit_process_req_body(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
recv_msg->incoming_buf = NULL; recv_msg->incoming_buf = NULL;
} }
req->content_fd = recv_msg->fd; req->content_fd = recv_msg->fd[0];
recv_msg->fd = -1; recv_msg->fd[0] = -1;
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit); lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);