Moved message size to nxt_port_recv_msg_t for convenience.
This commit is contained in:
@@ -47,6 +47,7 @@ struct nxt_port_recv_msg_s {
|
|||||||
nxt_buf_t *buf;
|
nxt_buf_t *buf;
|
||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_port_msg_t port_msg;
|
nxt_port_msg_t port_msg;
|
||||||
|
size_t size;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -594,7 +594,7 @@ nxt_port_mmap_write(nxt_task_t *task, nxt_port_t *port,
|
|||||||
|
|
||||||
void
|
void
|
||||||
nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
|
nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_recv_msg_t *msg, size_t size)
|
nxt_port_recv_msg_t *msg)
|
||||||
{
|
{
|
||||||
nxt_buf_t *b, **pb;
|
nxt_buf_t *b, **pb;
|
||||||
nxt_port_mmap_msg_t *end, *mmap_msg;
|
nxt_port_mmap_msg_t *end, *mmap_msg;
|
||||||
@@ -605,6 +605,7 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
|
|||||||
end = (nxt_port_mmap_msg_t *) b->mem.free;
|
end = (nxt_port_mmap_msg_t *) b->mem.free;
|
||||||
|
|
||||||
pb = &msg->buf;
|
pb = &msg->buf;
|
||||||
|
msg->size = 0;
|
||||||
|
|
||||||
while (mmap_msg < end) {
|
while (mmap_msg < end) {
|
||||||
nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
|
nxt_debug(task, "mmap_msg={%D, %D, %D} from %PI",
|
||||||
@@ -619,6 +620,7 @@ nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
msg->size += mmap_msg->size;
|
||||||
pb = &(*pb)->next;
|
pb = &(*pb)->next;
|
||||||
mmap_msg++;
|
mmap_msg++;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ nxt_port_mmap_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b) {
|
|||||||
|
|
||||||
void
|
void
|
||||||
nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
|
nxt_port_mmap_read(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_recv_msg_t *msg, size_t size);
|
nxt_port_recv_msg_t *msg);
|
||||||
|
|
||||||
enum nxt_port_method_e {
|
enum nxt_port_method_e {
|
||||||
NXT_PORT_METHOD_ANY = 0,
|
NXT_PORT_METHOD_ANY = 0,
|
||||||
|
|||||||
@@ -10,7 +10,7 @@
|
|||||||
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
|
static void nxt_port_write_handler(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
|
static void nxt_port_read_handler(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
|
static void nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_recv_msg_t *msg, size_t size);
|
nxt_port_recv_msg_t *msg);
|
||||||
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
|
static nxt_buf_t *nxt_port_buf_alloc(nxt_port_t *port);
|
||||||
static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
|
static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
|
||||||
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
|
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
|
||||||
@@ -365,8 +365,9 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
|
|||||||
if (n > 0) {
|
if (n > 0) {
|
||||||
|
|
||||||
msg.buf = b;
|
msg.buf = b;
|
||||||
|
msg.size = n;
|
||||||
|
|
||||||
nxt_port_read_msg_process(task, port, &msg, n);
|
nxt_port_read_msg_process(task, port, &msg);
|
||||||
|
|
||||||
if (b->mem.pos == b->mem.free) {
|
if (b->mem.pos == b->mem.free) {
|
||||||
nxt_port_buf_free(port, b);
|
nxt_port_buf_free(port, b);
|
||||||
@@ -397,53 +398,32 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
static void
|
static void
|
||||||
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
|
nxt_port_read_msg_process(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_recv_msg_t *msg, size_t size)
|
nxt_port_recv_msg_t *msg)
|
||||||
{
|
{
|
||||||
nxt_buf_t *b;
|
nxt_buf_t *b;
|
||||||
nxt_buf_t *orig_b;
|
nxt_buf_t *orig_b;
|
||||||
nxt_buf_t **last_next;
|
|
||||||
|
|
||||||
if (nxt_slow_path(size < sizeof(nxt_port_msg_t))) {
|
if (nxt_slow_path(msg->size < sizeof(nxt_port_msg_t))) {
|
||||||
nxt_log(port->socket.task, NXT_LOG_CRIT,
|
nxt_log(task, NXT_LOG_CRIT,
|
||||||
"port %d: too small message:%uz", port->socket.fd, size);
|
"port %d: too small message:%uz", port->socket.fd, msg->size);
|
||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* adjust size to actual buffer used size */
|
/* adjust size to actual buffer used size */
|
||||||
size -= sizeof(nxt_port_msg_t);
|
msg->size -= sizeof(nxt_port_msg_t);
|
||||||
|
|
||||||
b = orig_b = msg->buf;
|
b = orig_b = msg->buf;
|
||||||
b->mem.free += size;
|
b->mem.free += msg->size;
|
||||||
|
|
||||||
if (msg->port_msg.mmap) {
|
if (msg->port_msg.mmap) {
|
||||||
nxt_port_mmap_read(task, port, msg, size);
|
nxt_port_mmap_read(task, port, msg);
|
||||||
b = msg->buf;
|
b = msg->buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
last_next = &b->next;
|
|
||||||
|
|
||||||
if (msg->port_msg.last) {
|
|
||||||
/* find reference to last next, the NULL one */
|
|
||||||
while (*last_next) {
|
|
||||||
last_next = &(*last_next)->next;
|
|
||||||
}
|
|
||||||
|
|
||||||
*last_next = nxt_buf_sync_alloc(port->mem_pool, NXT_BUF_SYNC_LAST);
|
|
||||||
if (nxt_slow_path(*last_next == NULL)) {
|
|
||||||
goto fail;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
port->handler(task, msg);
|
port->handler(task, msg);
|
||||||
|
|
||||||
if (*last_next != NULL) {
|
if (msg->port_msg.mmap && orig_b != b) {
|
||||||
/* A sync buffer */
|
/* complete used mmap buffers */
|
||||||
nxt_buf_free(port->mem_pool, *last_next);
|
|
||||||
*last_next = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (orig_b != b) {
|
|
||||||
/* complete mmap buffers */
|
|
||||||
for (; b && nxt_buf_used_size(b) == 0;
|
for (; b && nxt_buf_used_size(b) == 0;
|
||||||
b = b->next) {
|
b = b->next) {
|
||||||
nxt_debug(task, "complete buffer %p", b);
|
nxt_debug(task, "complete buffer %p", b);
|
||||||
@@ -475,7 +455,6 @@ nxt_port_buf_alloc(nxt_port_t *port)
|
|||||||
b->mem.pos = b->mem.start;
|
b->mem.pos = b->mem.start;
|
||||||
b->mem.free = b->mem.start;
|
b->mem.free = b->mem.start;
|
||||||
b->next = NULL;
|
b->next = NULL;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
|
b = nxt_buf_mem_alloc(port->mem_pool, port->max_size, 0);
|
||||||
if (nxt_slow_path(b == NULL)) {
|
if (nxt_slow_path(b == NULL)) {
|
||||||
|
|||||||
Reference in New Issue
Block a user