Introduced SCM_CREDENTIALS / SCM_CREDS in the socket control msgs.
This commit is contained in:
212
src/nxt_unit.c
212
src/nxt_unit.c
@@ -3,10 +3,9 @@
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#include <stdlib.h>
|
||||
|
||||
#include "nxt_main.h"
|
||||
#include "nxt_port_memory_int.h"
|
||||
#include "nxt_socket_msg.h"
|
||||
#include "nxt_port_queue.h"
|
||||
#include "nxt_app_queue.h"
|
||||
|
||||
@@ -168,9 +167,9 @@ static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
|
||||
static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
|
||||
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
|
||||
nxt_unit_port_t *port, const void *buf, size_t buf_size,
|
||||
const void *oob, size_t oob_size);
|
||||
const nxt_send_oob_t *oob);
|
||||
static ssize_t nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
|
||||
const void *buf, size_t buf_size, const void *oob, size_t oob_size);
|
||||
const void *buf, size_t buf_size, const nxt_send_oob_t *oob);
|
||||
static int nxt_unit_ctx_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
nxt_unit_read_buf_t *rbuf);
|
||||
nxt_inline void nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst,
|
||||
@@ -278,8 +277,8 @@ struct nxt_unit_read_buf_s {
|
||||
nxt_queue_link_t link;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
ssize_t size;
|
||||
nxt_recv_oob_t oob;
|
||||
char buf[16384];
|
||||
char oob[256];
|
||||
};
|
||||
|
||||
|
||||
@@ -891,13 +890,10 @@ static int
|
||||
nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
|
||||
{
|
||||
ssize_t res;
|
||||
nxt_send_oob_t oob;
|
||||
nxt_port_msg_t msg;
|
||||
nxt_unit_impl_t *lib;
|
||||
|
||||
union {
|
||||
struct cmsghdr cm;
|
||||
char space[CMSG_SPACE(sizeof(int))];
|
||||
} cmsg;
|
||||
int fds[2] = {queue_fd, -1};
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
@@ -911,25 +907,9 @@ nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream, int queue_fd)
|
||||
msg.mf = 0;
|
||||
msg.tracking = 0;
|
||||
|
||||
memset(&cmsg, 0, sizeof(cmsg));
|
||||
nxt_socket_msg_oob_init(&oob, fds);
|
||||
|
||||
cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
|
||||
cmsg.cm.cmsg_level = SOL_SOCKET;
|
||||
cmsg.cm.cmsg_type = SCM_RIGHTS;
|
||||
|
||||
/*
|
||||
* memcpy() is used instead of simple
|
||||
* *(int *) CMSG_DATA(&cmsg.cm) = fd;
|
||||
* because GCC 4.4 with -O2/3/s optimization may issue a warning:
|
||||
* dereferencing type-punned pointer will break strict-aliasing rules
|
||||
*
|
||||
* Fortunately, GCC with -O1 compiles this nxt_memcpy()
|
||||
* in the same simple assignment as in the code above.
|
||||
*/
|
||||
memcpy(CMSG_DATA(&cmsg.cm), &queue_fd, sizeof(int));
|
||||
|
||||
res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg),
|
||||
&cmsg, sizeof(cmsg));
|
||||
res = nxt_unit_sendmsg(ctx, ready_fd, &msg, sizeof(msg), &oob);
|
||||
if (res != sizeof(msg)) {
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
@@ -945,7 +925,6 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
|
||||
int rc;
|
||||
pid_t pid;
|
||||
uint8_t quit_param;
|
||||
struct cmsghdr *cm;
|
||||
nxt_port_msg_t *port_msg;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_recv_msg_t recv_msg;
|
||||
@@ -955,18 +934,12 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
|
||||
recv_msg.fd[0] = -1;
|
||||
recv_msg.fd[1] = -1;
|
||||
port_msg = (nxt_port_msg_t *) rbuf->buf;
|
||||
cm = (struct cmsghdr *) rbuf->oob;
|
||||
|
||||
if (cm->cmsg_level == SOL_SOCKET
|
||||
&& cm->cmsg_type == SCM_RIGHTS)
|
||||
{
|
||||
if (cm->cmsg_len == CMSG_LEN(sizeof(int))) {
|
||||
memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int));
|
||||
}
|
||||
|
||||
if (cm->cmsg_len == CMSG_LEN(sizeof(int) * 2)) {
|
||||
memcpy(recv_msg.fd, CMSG_DATA(cm), sizeof(int) * 2);
|
||||
}
|
||||
rc = nxt_socket_msg_oob_get_fds(&rbuf->oob, recv_msg.fd);
|
||||
if (nxt_slow_path(rc != NXT_OK)) {
|
||||
nxt_unit_alert(ctx, "failed to receive file descriptor over cmsg");
|
||||
rc = NXT_UNIT_ERROR;
|
||||
goto done;
|
||||
}
|
||||
|
||||
recv_msg.incoming_buf = NULL;
|
||||
@@ -1607,7 +1580,7 @@ nxt_unit_send_req_headers_ack(nxt_unit_request_info_t *req)
|
||||
msg.type = _NXT_PORT_MSG_REQ_HEADERS_ACK;
|
||||
|
||||
res = nxt_unit_port_send(req->ctx, req->response_port,
|
||||
&msg, sizeof(msg), NULL, 0);
|
||||
&msg, sizeof(msg), NULL);
|
||||
if (nxt_slow_path(res != sizeof(msg))) {
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
@@ -2673,7 +2646,7 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
|
||||
(int) m.mmap_msg.size);
|
||||
|
||||
res = nxt_unit_port_send(req->ctx, req->response_port, &m, sizeof(m),
|
||||
NULL, 0);
|
||||
NULL);
|
||||
if (nxt_slow_path(res != sizeof(m))) {
|
||||
goto free_buf;
|
||||
}
|
||||
@@ -2725,8 +2698,8 @@ nxt_unit_mmap_buf_send(nxt_unit_request_info_t *req,
|
||||
|
||||
res = nxt_unit_port_send(req->ctx, req->response_port,
|
||||
buf->start - sizeof(m.msg),
|
||||
m.mmap_msg.size + sizeof(m.msg),
|
||||
NULL, 0);
|
||||
m.mmap_msg.size + sizeof(m.msg), NULL);
|
||||
|
||||
if (nxt_slow_path(res != (ssize_t) (m.mmap_msg.size + sizeof(m.msg)))) {
|
||||
goto free_buf;
|
||||
}
|
||||
@@ -2793,7 +2766,7 @@ nxt_unit_read_buf_get(nxt_unit_ctx_t *ctx)
|
||||
|
||||
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||
|
||||
memset(rbuf->oob, 0, sizeof(struct cmsghdr));
|
||||
rbuf->oob.size = 0;
|
||||
|
||||
return rbuf;
|
||||
}
|
||||
@@ -3312,7 +3285,7 @@ skip_response_send:
|
||||
msg.tracking = 0;
|
||||
|
||||
(void) nxt_unit_port_send(req->ctx, req->response_port,
|
||||
&msg, sizeof(msg), NULL, 0);
|
||||
&msg, sizeof(msg), NULL);
|
||||
|
||||
nxt_unit_request_info_release(req);
|
||||
}
|
||||
@@ -3634,7 +3607,7 @@ nxt_unit_send_oosm(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||
msg.mf = 0;
|
||||
msg.tracking = 0;
|
||||
|
||||
res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
|
||||
res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
|
||||
if (nxt_slow_path(res != sizeof(msg))) {
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
@@ -3903,12 +3876,10 @@ static int
|
||||
nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
|
||||
{
|
||||
ssize_t res;
|
||||
nxt_send_oob_t oob;
|
||||
nxt_port_msg_t msg;
|
||||
nxt_unit_impl_t *lib;
|
||||
union {
|
||||
struct cmsghdr cm;
|
||||
char space[CMSG_SPACE(sizeof(int))];
|
||||
} cmsg;
|
||||
int fds[2] = {fd, -1};
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
@@ -3922,30 +3893,9 @@ nxt_unit_send_mmap(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, int fd)
|
||||
msg.mf = 0;
|
||||
msg.tracking = 0;
|
||||
|
||||
/*
|
||||
* Fill all padding fields with 0.
|
||||
* Code in Go 1.11 validate cmsghdr using padding field as part of len.
|
||||
* See Cmsghdr definition and socketControlMessageHeaderAndData function.
|
||||
*/
|
||||
memset(&cmsg, 0, sizeof(cmsg));
|
||||
nxt_socket_msg_oob_init(&oob, fds);
|
||||
|
||||
cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int));
|
||||
cmsg.cm.cmsg_level = SOL_SOCKET;
|
||||
cmsg.cm.cmsg_type = SCM_RIGHTS;
|
||||
|
||||
/*
|
||||
* memcpy() is used instead of simple
|
||||
* *(int *) CMSG_DATA(&cmsg.cm) = fd;
|
||||
* because GCC 4.4 with -O2/3/s optimization may issue a warning:
|
||||
* dereferencing type-punned pointer will break strict-aliasing rules
|
||||
*
|
||||
* Fortunately, GCC with -O1 compiles this nxt_memcpy()
|
||||
* in the same simple assignment as in the code above.
|
||||
*/
|
||||
memcpy(CMSG_DATA(&cmsg.cm), &fd, sizeof(int));
|
||||
|
||||
res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg),
|
||||
&cmsg, sizeof(cmsg));
|
||||
res = nxt_unit_port_send(ctx, port, &msg, sizeof(msg), &oob);
|
||||
if (nxt_slow_path(res != sizeof(msg))) {
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
@@ -4135,7 +4085,7 @@ nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
|
||||
msg.type = _NXT_PORT_MSG_RPC_READY;
|
||||
|
||||
(void) nxt_unit_port_send(ctx, ctx_impl->read_port,
|
||||
&msg, sizeof(msg), NULL, 0);
|
||||
&msg, sizeof(msg), NULL);
|
||||
}
|
||||
|
||||
|
||||
@@ -4358,7 +4308,7 @@ nxt_unit_get_mmap(nxt_unit_ctx_t *ctx, pid_t pid, uint32_t id)
|
||||
|
||||
nxt_unit_debug(ctx, "get_mmap: %d %d", (int) pid, (int) id);
|
||||
|
||||
res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
|
||||
res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
|
||||
if (nxt_slow_path(res != sizeof(m))) {
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
@@ -4428,7 +4378,7 @@ nxt_unit_send_shm_ack(nxt_unit_ctx_t *ctx, pid_t pid)
|
||||
msg.mf = 0;
|
||||
msg.tracking = 0;
|
||||
|
||||
res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL, 0);
|
||||
res = nxt_unit_port_send(ctx, lib->router_port, &msg, sizeof(msg), NULL);
|
||||
if (nxt_slow_path(res != sizeof(msg))) {
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
@@ -4716,7 +4666,7 @@ retry:
|
||||
return (err == EAGAIN) ? NXT_UNIT_AGAIN : NXT_UNIT_ERROR;
|
||||
}
|
||||
|
||||
nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04uXi, %04uXi]",
|
||||
nxt_unit_debug(ctx, "poll(%d,%d): %d, revents [%04X, %04X]",
|
||||
fds[0].fd, fds[1].fd, nevents, fds[0].revents,
|
||||
fds[1].revents);
|
||||
|
||||
@@ -5315,6 +5265,24 @@ nxt_unit_create_port(nxt_unit_ctx_t *ctx)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
#if (NXT_HAVE_SOCKOPT_SO_PASSCRED)
|
||||
int enable_creds = 1;
|
||||
|
||||
if (nxt_slow_path(setsockopt(port_sockets[0], SOL_SOCKET, SO_PASSCRED,
|
||||
&enable_creds, sizeof(enable_creds)) == -1))
|
||||
{
|
||||
nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (nxt_slow_path(setsockopt(port_sockets[1], SOL_SOCKET, SO_PASSCRED,
|
||||
&enable_creds, sizeof(enable_creds)) == -1))
|
||||
{
|
||||
nxt_unit_warn(ctx, "failed to set SO_PASSCRED %s", strerror(errno));
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
nxt_unit_debug(ctx, "create_port: new socketpair: %d->%d",
|
||||
port_sockets[0], port_sockets[1]);
|
||||
|
||||
@@ -5355,6 +5323,7 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
|
||||
nxt_unit_port_t *port, int queue_fd)
|
||||
{
|
||||
ssize_t res;
|
||||
nxt_send_oob_t oob;
|
||||
nxt_unit_impl_t *lib;
|
||||
int fds[2] = { port->out_fd, queue_fd };
|
||||
|
||||
@@ -5363,11 +5332,6 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
|
||||
nxt_port_msg_new_port_t new_port;
|
||||
} m;
|
||||
|
||||
union {
|
||||
struct cmsghdr cm;
|
||||
char space[CMSG_SPACE(sizeof(int) * 2)];
|
||||
} cmsg;
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
m.msg.stream = 0;
|
||||
@@ -5386,24 +5350,9 @@ nxt_unit_send_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *dst,
|
||||
m.new_port.max_size = 16 * 1024;
|
||||
m.new_port.max_share = 64 * 1024;
|
||||
|
||||
memset(&cmsg, 0, sizeof(cmsg));
|
||||
nxt_socket_msg_oob_init(&oob, fds);
|
||||
|
||||
cmsg.cm.cmsg_len = CMSG_LEN(sizeof(int) * 2);
|
||||
cmsg.cm.cmsg_level = SOL_SOCKET;
|
||||
cmsg.cm.cmsg_type = SCM_RIGHTS;
|
||||
|
||||
/*
|
||||
* memcpy() is used instead of simple
|
||||
* *(int *) CMSG_DATA(&cmsg.cm) = fd;
|
||||
* because GCC 4.4 with -O2/3/s optimization may issue a warning:
|
||||
* dereferencing type-punned pointer will break strict-aliasing rules
|
||||
*
|
||||
* Fortunately, GCC with -O1 compiles this nxt_memcpy()
|
||||
* in the same simple assignment as in the code above.
|
||||
*/
|
||||
memcpy(CMSG_DATA(&cmsg.cm), fds, sizeof(int) * 2);
|
||||
|
||||
res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &cmsg, sizeof(cmsg));
|
||||
res = nxt_unit_port_send(ctx, dst, &m, sizeof(m), &oob);
|
||||
|
||||
return (res == sizeof(m)) ? NXT_UNIT_OK : NXT_UNIT_ERROR;
|
||||
}
|
||||
@@ -5885,7 +5834,7 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param)
|
||||
}
|
||||
|
||||
(void) nxt_unit_port_send(ctx, ctx_impl->read_port,
|
||||
&m, sizeof(m), NULL, 0);
|
||||
&m, sizeof(m), NULL);
|
||||
|
||||
} nxt_queue_loop;
|
||||
|
||||
@@ -5920,7 +5869,7 @@ nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
|
||||
nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
|
||||
(int) port_id->id);
|
||||
|
||||
res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
|
||||
res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL);
|
||||
if (nxt_slow_path(res != sizeof(m))) {
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
@@ -5931,7 +5880,7 @@ nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
|
||||
|
||||
static ssize_t
|
||||
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
|
||||
const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
|
||||
{
|
||||
int notify;
|
||||
ssize_t ret;
|
||||
@@ -5943,7 +5892,7 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
|
||||
if (port_impl->queue != NULL && oob_size == 0
|
||||
if (port_impl->queue != NULL && (oob == NULL || oob->size == 0)
|
||||
&& buf_size <= NXT_PORT_QUEUE_MSG_SIZE)
|
||||
{
|
||||
rc = nxt_port_queue_send(port_impl->queue, buf, buf_size, ¬ify);
|
||||
@@ -5965,7 +5914,7 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
|
||||
if (lib->callbacks.port_send == NULL) {
|
||||
ret = nxt_unit_sendmsg(ctx, port->out_fd, &msg,
|
||||
sizeof(nxt_port_msg_t), NULL, 0);
|
||||
sizeof(nxt_port_msg_t), NULL);
|
||||
|
||||
nxt_unit_debug(ctx, "port{%d,%d} send %d read_queue",
|
||||
(int) port->id.pid, (int) port->id.id,
|
||||
@@ -6002,15 +5951,15 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
|
||||
if (lib->callbacks.port_send != NULL) {
|
||||
ret = lib->callbacks.port_send(ctx, port, buf, buf_size,
|
||||
oob, oob_size);
|
||||
oob != NULL ? oob->buf : NULL,
|
||||
oob != NULL ? oob->size : 0);
|
||||
|
||||
nxt_unit_debug(ctx, "port{%d,%d} sendcb %d",
|
||||
(int) port->id.pid, (int) port->id.id,
|
||||
(int) ret);
|
||||
|
||||
} else {
|
||||
ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size,
|
||||
oob, oob_size);
|
||||
ret = nxt_unit_sendmsg(ctx, port->out_fd, buf, buf_size, oob);
|
||||
|
||||
nxt_unit_debug(ctx, "port{%d,%d} sendmsg %d",
|
||||
(int) port->id.pid, (int) port->id.id,
|
||||
@@ -6023,29 +5972,20 @@ nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
|
||||
static ssize_t
|
||||
nxt_unit_sendmsg(nxt_unit_ctx_t *ctx, int fd,
|
||||
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
|
||||
const void *buf, size_t buf_size, const nxt_send_oob_t *oob)
|
||||
{
|
||||
int err;
|
||||
ssize_t res;
|
||||
ssize_t n;
|
||||
struct iovec iov[1];
|
||||
struct msghdr msg;
|
||||
|
||||
iov[0].iov_base = (void *) buf;
|
||||
iov[0].iov_len = buf_size;
|
||||
|
||||
msg.msg_name = NULL;
|
||||
msg.msg_namelen = 0;
|
||||
msg.msg_iov = iov;
|
||||
msg.msg_iovlen = 1;
|
||||
msg.msg_flags = 0;
|
||||
msg.msg_control = (void *) oob;
|
||||
msg.msg_controllen = oob_size;
|
||||
|
||||
retry:
|
||||
|
||||
res = sendmsg(fd, &msg, 0);
|
||||
n = nxt_sendmsg(fd, iov, 1, oob);
|
||||
|
||||
if (nxt_slow_path(res == -1)) {
|
||||
if (nxt_slow_path(n == -1)) {
|
||||
err = errno;
|
||||
|
||||
if (err == EINTR) {
|
||||
@@ -6060,11 +6000,11 @@ retry:
|
||||
fd, (int) buf_size, strerror(err), err);
|
||||
|
||||
} else {
|
||||
nxt_unit_debug(ctx, "sendmsg(%d, %d): %d", fd, (int) buf_size,
|
||||
(int) res);
|
||||
nxt_unit_debug(ctx, "sendmsg(%d, %d, %d): %d", fd, (int) buf_size,
|
||||
(oob != NULL ? (int) oob->size : 0), (int) n);
|
||||
}
|
||||
|
||||
return res;
|
||||
return n;
|
||||
}
|
||||
|
||||
|
||||
@@ -6173,7 +6113,7 @@ retry:
|
||||
|
||||
nxt_unit_rbuf_cpy(port_impl->socket_rbuf, rbuf);
|
||||
|
||||
memset(rbuf->oob, 0, sizeof(struct cmsghdr));
|
||||
rbuf->oob.size = 0;
|
||||
|
||||
goto retry;
|
||||
}
|
||||
@@ -6184,7 +6124,8 @@ nxt_unit_rbuf_cpy(nxt_unit_read_buf_t *dst, nxt_unit_read_buf_t *src)
|
||||
{
|
||||
memcpy(dst->buf, src->buf, src->size);
|
||||
dst->size = src->size;
|
||||
memcpy(dst->oob, src->oob, sizeof(src->oob));
|
||||
dst->oob.size = src->oob.size;
|
||||
memcpy(dst->oob.buf, src->oob.buf, src->oob.size);
|
||||
}
|
||||
|
||||
|
||||
@@ -6230,16 +6171,18 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
nxt_unit_read_buf_t *rbuf)
|
||||
{
|
||||
int fd, err;
|
||||
size_t oob_size;
|
||||
struct iovec iov[1];
|
||||
struct msghdr msg;
|
||||
nxt_unit_impl_t *lib;
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
if (lib->callbacks.port_recv != NULL) {
|
||||
oob_size = sizeof(rbuf->oob.buf);
|
||||
|
||||
rbuf->size = lib->callbacks.port_recv(ctx, port,
|
||||
rbuf->buf, sizeof(rbuf->buf),
|
||||
rbuf->oob, sizeof(rbuf->oob));
|
||||
rbuf->oob.buf, &oob_size);
|
||||
|
||||
nxt_unit_debug(ctx, "port{%d,%d} recvcb %d",
|
||||
(int) port->id.pid, (int) port->id.id, (int) rbuf->size);
|
||||
@@ -6248,25 +6191,18 @@ nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
|
||||
rbuf->oob.size = oob_size;
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
iov[0].iov_base = rbuf->buf;
|
||||
iov[0].iov_len = sizeof(rbuf->buf);
|
||||
|
||||
msg.msg_name = NULL;
|
||||
msg.msg_namelen = 0;
|
||||
msg.msg_iov = iov;
|
||||
msg.msg_iovlen = 1;
|
||||
msg.msg_flags = 0;
|
||||
msg.msg_control = rbuf->oob;
|
||||
msg.msg_controllen = sizeof(rbuf->oob);
|
||||
|
||||
fd = port->in_fd;
|
||||
|
||||
retry:
|
||||
|
||||
rbuf->size = recvmsg(fd, &msg, 0);
|
||||
rbuf->size = nxt_recvmsg(fd, iov, 1, &rbuf->oob);
|
||||
|
||||
if (nxt_slow_path(rbuf->size == -1)) {
|
||||
err = errno;
|
||||
@@ -6350,7 +6286,7 @@ retry:
|
||||
m.quit_param = NXT_QUIT_GRACEFUL;
|
||||
|
||||
(void) nxt_unit_port_send(ctx, lib->main_ctx.read_port,
|
||||
&m, sizeof(m), NULL, 0);
|
||||
&m, sizeof(m), NULL);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user