Port changes.

This commit is contained in:
Igor Sysoev
2017-02-22 15:10:28 +03:00
parent 029942f4eb
commit b379dae85e
11 changed files with 274 additions and 302 deletions

View File

@@ -17,7 +17,7 @@ NXT_LIB_DEPS=" \
src/nxt_socket.h \
src/nxt_process.h \
src/nxt_signal.h \
src/nxt_port_socket.h \
src/nxt_port.h \
src/nxt_dyld.h \
src/nxt_thread.h \
src/nxt_thread_id.h \
@@ -319,7 +319,6 @@ fi
NXT_DEPS=" \
src/nxt_cycle.h \
src/nxt_port.h \
src/nxt_application.h \
src/nxt_master_process.h \
"

View File

@@ -345,33 +345,32 @@ static nxt_int_t
nxt_cycle_processes(nxt_cycle_t *cycle)
{
nxt_uint_t n;
nxt_process_port_t *proc, *prev;
nxt_port_t *port, *prev;
/*
* Preallocate double number of previous cycle
* process slots or 2 process slots for initial cycle.
*/
n = (cycle->previous != NULL) ? cycle->previous->processes->nelts : 1;
n = (cycle->previous != NULL) ? cycle->previous->ports->nelts : 1;
cycle->processes = nxt_array_create(cycle->mem_pool, 2 * n,
sizeof(nxt_process_port_t));
cycle->ports = nxt_array_create(cycle->mem_pool, 2 * n, sizeof(nxt_port_t));
if (nxt_slow_path(cycle->processes == NULL)) {
if (nxt_slow_path(cycle->ports == NULL)) {
return NXT_ERROR;
}
if (cycle->previous != NULL) {
cycle->process_generation = cycle->previous->process_generation;
prev = cycle->previous->processes->elts;
prev = cycle->previous->ports->elts;
while (n != 0) {
proc = nxt_array_add(cycle->processes);
if (nxt_slow_path(proc == NULL)) {
port = nxt_array_add(cycle->ports);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
*proc = *prev++;
*port = *prev++;
n--;
}
}

View File

@@ -16,7 +16,6 @@ typedef enum {
} nxt_process_type_e;
typedef struct nxt_cycle_s nxt_cycle_t;
typedef void (*nxt_cycle_cont_t)(nxt_task_t *task, nxt_cycle_t *cycle);
@@ -47,7 +46,7 @@ struct nxt_cycle_s {
nxt_cycle_cont_t continuation;
#endif
nxt_array_t *processes; /* of nxt_process_port_t */
nxt_array_t *ports; /* of nxt_port_t */
nxt_list_t *log_files; /* of nxt_file_t */

View File

@@ -121,7 +121,8 @@ nxt_thread_extern_data(nxt_thread_t, nxt_thread_context);
#include <nxt_fd_event.h>
#include <nxt_port_socket.h>
typedef struct nxt_cycle_s nxt_cycle_t;
#include <nxt_port.h>
#if (NXT_THREADS)
#include <nxt_thread_pool.h>
#endif
@@ -153,6 +154,7 @@ typedef struct nxt_upstream_source_s nxt_upstream_source_t;
#include <nxt_http_parse.h>
#include <nxt_http_source.h>
#include <nxt_fastcgi_source.h>
#include <nxt_cycle.h>
#if (NXT_LIB_UNIT_TEST)

View File

@@ -71,26 +71,27 @@ nxt_master_process_start(nxt_thread_t *thr, nxt_task_t *task,
static nxt_int_t
nxt_master_process_port_create(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_process_port_t *proc;
nxt_int_t ret;
nxt_port_t *port;
proc = nxt_array_add(cycle->processes);
if (nxt_slow_path(proc == NULL)) {
port = nxt_array_zero_add(cycle->ports);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
proc->pid = nxt_pid;
proc->engine = 0;
proc->port = nxt_port_create(task, 0);
if (nxt_slow_path(proc->port == NULL)) {
return NXT_ERROR;
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
port->pid = nxt_pid;
port->engine = 0;
/*
* A master process port. A write port is not closed
* since it should be inherited by worker processes.
*/
nxt_port_read_enable(task, proc->port);
nxt_port_read_enable(task, port);
return NXT_OK;
}
@@ -143,24 +144,25 @@ nxt_master_start_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle)
static nxt_int_t
nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_int_t ret;
nxt_pid_t pid;
nxt_process_port_t *proc;
nxt_port_t *port;
proc = nxt_array_add(cycle->processes);
if (nxt_slow_path(proc == NULL)) {
port = nxt_array_zero_add(cycle->ports);
if (nxt_slow_path(port == NULL)) {
return NXT_ERROR;
}
cycle->current_process = cycle->processes->nelts - 1;
cycle->current_process = cycle->ports->nelts - 1;
proc->engine = 0;
proc->generation = cycle->process_generation;
proc->port = nxt_port_create(task, 0);
if (nxt_slow_path(proc->port == NULL)) {
return NXT_ERROR;
ret = nxt_port_socket_init(task, port, 0);
if (nxt_slow_path(ret != NXT_OK)) {
return ret;
}
port->engine = 0;
port->generation = cycle->process_generation;
pid = nxt_process_create(nxt_worker_process_start, cycle,
"start worker process");
@@ -175,12 +177,12 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_cycle_t *cycle)
default:
/* The master process created a new process. */
proc->pid = pid;
port->pid = pid;
nxt_port_read_close(proc->port);
nxt_port_write_enable(task, proc->port);
nxt_port_read_close(port);
nxt_port_write_enable(task, port);
nxt_process_new_port(task, cycle, proc);
nxt_port_send_new_port(task, cycle, port);
return NXT_OK;
}
}
@@ -252,22 +254,22 @@ nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj,
{
uint32_t generation;
nxt_uint_t i, n;
nxt_port_t *port;
nxt_cycle_t *cycle;
nxt_process_port_t *proc;
cycle = nxt_thread_cycle();
proc = cycle->processes->elts;
n = cycle->processes->nelts;
port = cycle->ports->elts;
n = cycle->ports->nelts;
generation = cycle->process_generation - 1;
/* The proc[0] is the master process. */
/* The port[0] is the master process port. */
for (i = 1; i < n; i++) {
if (proc[i].generation == generation) {
(void) nxt_port_write(task, proc[i].port, NXT_PORT_MSG_QUIT,
-1, 0, NULL);
if (port[i].generation == generation) {
(void) nxt_port_socket_write(task, &port[i],
NXT_PORT_MSG_QUIT, -1, 0, NULL);
}
}
@@ -278,7 +280,7 @@ nxt_master_stop_previous_worker_processes(nxt_task_t *task, void *obj,
void
nxt_master_stop_worker_processes(nxt_task_t *task, nxt_cycle_t *cycle)
{
nxt_process_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL);
nxt_port_write(task, cycle, NXT_PORT_MSG_QUIT, -1, 0, NULL);
}
@@ -366,7 +368,7 @@ nxt_master_process_sigusr1_handler(nxt_task_t *task, void *obj, void *data)
nxt_list_each(file, cycle->log_files) {
nxt_process_port_change_log_file(task, cycle, n, new_file[n].fd);
nxt_port_change_log_file(task, cycle, n, new_file[n].fd);
/*
* The old log file descriptor must be closed at the moment
* when no other threads use it. dup2() allows to use the
@@ -618,8 +620,8 @@ static void
nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
{
nxt_uint_t i, n, generation;
nxt_port_t *port;
nxt_cycle_t *cycle;
nxt_process_port_t *proc;
cycle = nxt_thread_cycle();
@@ -630,15 +632,15 @@ nxt_master_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
return;
}
proc = cycle->processes->elts;
n = cycle->processes->nelts;
port = cycle->ports->elts;
n = cycle->ports->nelts;
for (i = 0; i < n; i++) {
if (pid == proc[i].pid) {
generation = proc[i].generation;
if (pid == port[i].pid) {
generation = port[i].generation;
nxt_array_remove(cycle->processes, &proc[i]);
nxt_array_remove(cycle->ports, &port[i]);
if (nxt_exiting) {
nxt_debug(task, "processes %d", n);

View File

@@ -9,48 +9,47 @@
#include <nxt_port.h>
static void nxt_process_port_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
static void nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj,
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
static void nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj,
void *data);
void
nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc,
nxt_process_port_handler_t *handlers)
nxt_port_create(nxt_thread_t *thread, nxt_port_t *port,
nxt_port_handler_t *handlers)
{
proc->pid = nxt_pid;
proc->engine = thr->engine->id;
proc->port->handler = nxt_process_port_handler;
proc->port->data = handlers;
port->pid = nxt_pid;
port->engine = thread->engine->id;
port->handler = nxt_port_handler;
port->data = handlers;
nxt_port_write_close(proc->port);
nxt_port_read_enable(&thr->engine->task, proc->port);
nxt_port_write_close(port);
nxt_port_read_enable(&thread->engine->task, port);
}
void
nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
{
nxt_uint_t i, n;
nxt_process_port_t *proc;
nxt_port_t *port;
proc = cycle->processes->elts;
n = cycle->processes->nelts;
port = cycle->ports->elts;
n = cycle->ports->nelts;
for (i = 0; i < n; i++) {
if (nxt_pid != proc[i].pid) {
(void) nxt_port_write(task, proc[i].port, type, fd, stream, b);
if (nxt_pid != port[i].pid) {
(void) nxt_port_socket_write(task, &port[i], type, fd, stream, b);
}
}
}
static void
nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_process_port_handler_t *handlers;
nxt_port_handler_t *handlers;
if (nxt_fast_path(msg->type <= NXT_PORT_MSG_MAX)) {
@@ -69,62 +68,64 @@ nxt_process_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_cycle_quit(task, NULL);
}
void
nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_process_port_t *proc)
nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_port_t *new_port)
{
nxt_buf_t *b;
nxt_uint_t i, n;
nxt_process_port_t *p;
nxt_proc_msg_new_port_t *new_port;
nxt_port_t *port;
nxt_port_msg_new_port_t *msg;
n = cycle->processes->nelts;
n = cycle->ports->nelts;
if (n == 0) {
return;
}
nxt_thread_log_debug("new port %d for process %PI engine %uD",
proc->port->socket.fd, proc->pid, proc->engine);
nxt_debug(task, "new port %d for process %PI engine %uD",
new_port->socket.fd, new_port->pid, new_port->engine);
p = cycle->processes->elts;
port = cycle->ports->elts;
for (i = 0; i < n; i++) {
if (proc->pid == p[i].pid || nxt_pid == p[i].pid || p[i].engine != 0) {
if (port[i].pid == new_port->pid
|| port[i].pid == nxt_pid
|| port[i].engine != 0)
{
continue;
}
b = nxt_buf_mem_alloc(p[i].port->mem_pool,
sizeof(nxt_process_port_data_t), 0);
b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0);
if (nxt_slow_path(b == NULL)) {
continue;
}
b->data = p[i].port;
b->completion_handler = nxt_process_new_port_buf_completion;
b->mem.free += sizeof(nxt_proc_msg_new_port_t);
new_port = (nxt_proc_msg_new_port_t *) b->mem.pos;
b->data = &port[i];
b->completion_handler = nxt_port_new_port_buf_completion;
b->mem.free += sizeof(nxt_port_msg_new_port_t);
msg = (nxt_port_msg_new_port_t *) b->mem.pos;
new_port->pid = proc->pid;
new_port->engine = proc->engine;
new_port->max_size = p[i].port->max_size;
new_port->max_share = p[i].port->max_share;
msg->pid = new_port->pid;
msg->engine = new_port->engine;
msg->max_size = port[i].max_size;
msg->max_share = port[i].max_share;
(void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_NEW_PORT,
proc->port->socket.fd, 0, b);
(void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_NEW_PORT,
new_port->socket.fd, 0, b);
}
}
static void
nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
nxt_port_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
{
nxt_buf_t *b;
nxt_port_t *port;
@@ -139,66 +140,69 @@ nxt_process_new_port_buf_completion(nxt_task_t *task, void *obj, void *data)
void
nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_port_t *port;
nxt_cycle_t *cycle;
nxt_process_port_t *proc;
nxt_proc_msg_new_port_t *new_port;
nxt_mem_pool_t *mp;
nxt_port_msg_new_port_t *new_port_msg;
cycle = nxt_thread_cycle();
proc = nxt_array_add(cycle->processes);
if (nxt_slow_path(proc == NULL)) {
return;
}
port = nxt_port_alloc(task);
port = nxt_array_add(cycle->ports);
if (nxt_slow_path(port == NULL)) {
return;
}
proc->port = port;
mp = nxt_mem_pool_create(1024);
if (nxt_slow_path(mp == NULL)) {
return;
}
new_port = (nxt_proc_msg_new_port_t *) msg->buf->mem.pos;
port->mem_pool = mp;
new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
msg->buf->mem.pos = msg->buf->mem.free;
nxt_debug(task, "new port %d received for process %PI engine %uD",
msg->fd, new_port->pid, new_port->engine);
msg->fd, new_port_msg->pid, new_port_msg->engine);
proc->pid = new_port->pid;
proc->engine = new_port->engine;
port->pid = new_port_msg->pid;
port->engine = new_port_msg->engine;
port->pair[0] = -1;
port->pair[1] = msg->fd;
port->max_size = new_port->max_size;
port->max_share = new_port->max_share;
port->max_size = new_port_msg->max_size;
port->max_share = new_port_msg->max_share;
nxt_queue_init(&port->messages);
port->socket.task = task;
/* A read port is not passed at all. */
nxt_port_write_enable(task, port);
}
void
nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_uint_t slot, nxt_fd_t fd)
{
nxt_buf_t *b;
nxt_uint_t i, n;
nxt_process_port_t *p;
nxt_port_t *port;
n = cycle->processes->nelts;
n = cycle->ports->nelts;
if (n == 0) {
return;
}
nxt_thread_log_debug("change log file #%ui fd:%FD", slot, fd);
nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
p = cycle->processes->elts;
port = cycle->ports->elts;
/* p[0] is master process. */
/* port[0] is master process port. */
for (i = 1; i < n; i++) {
b = nxt_buf_mem_alloc(p[i].port->mem_pool,
sizeof(nxt_process_port_data_t), 0);
b = nxt_buf_mem_alloc(port[i].mem_pool, sizeof(nxt_port_data_t), 0);
if (nxt_slow_path(b == NULL)) {
continue;
@@ -207,15 +211,14 @@ nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
*(nxt_uint_t *) b->mem.pos = slot;
b->mem.free += sizeof(nxt_uint_t);
(void) nxt_port_write(task, p[i].port, NXT_PORT_MSG_PORTGE_FILE,
(void) nxt_port_socket_write(task, &port[i], NXT_PORT_MSG_CHANGE_FILE,
fd, 0, b);
}
}
void
nxt_process_port_change_log_file_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg)
nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_buf_t *b;
nxt_uint_t slot;
@@ -246,7 +249,7 @@ nxt_process_port_change_log_file_handler(nxt_task_t *task,
void
nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_buf_t *b;
@@ -259,7 +262,7 @@ nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
void
nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
{
nxt_debug(task, "port empty handler");
}

View File

@@ -8,61 +8,116 @@
#define _NXT_PORT_H_INCLUDED_
typedef struct nxt_port_s nxt_port_t;
typedef struct {
uint32_t stream;
uint16_t type;
uint8_t last; /* 1 bit */
} nxt_port_msg_t;
typedef struct {
nxt_queue_link_t link;
nxt_buf_t *buf;
size_t share;
nxt_fd_t fd;
nxt_port_msg_t port_msg;
} nxt_port_send_msg_t;
typedef struct nxt_port_recv_msg_s {
uint32_t stream;
uint16_t type;
nxt_fd_t fd;
nxt_buf_t *buf;
nxt_port_t *port;
} nxt_port_recv_msg_t;
typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
struct nxt_port_s {
/* Must be the first field. */
nxt_fd_event_t socket;
nxt_queue_t messages; /* of nxt_port_send_msg_t */
/* Maximum size of message part. */
uint32_t max_size;
/* Maximum interleave of message parts. */
uint32_t max_share;
nxt_port_handler_t handler;
void *data;
nxt_mem_pool_t *mem_pool;
nxt_buf_t *free_bufs;
nxt_socket_t pair[2];
nxt_pid_t pid;
uint32_t engine;
uint32_t generation;
};
#define NXT_PORT_MSG_MAX NXT_PORT_MSG_DATA
typedef enum {
NXT_PORT_MSG_QUIT = 0,
NXT_PORT_MSG_NEW_PORT,
NXT_PORT_MSG_PORTGE_FILE,
NXT_PORT_MSG_CHANGE_FILE,
NXT_PORT_MSG_DATA,
} nxt_port_msg_type_e;
typedef struct {
nxt_pid_t pid;
uint32_t engine;
uint32_t generation;
nxt_port_t *port;
} nxt_process_port_t;
typedef struct {
nxt_pid_t pid;
uint32_t engine;
size_t max_size;
size_t max_share;
} nxt_proc_msg_new_port_t;
} nxt_port_msg_new_port_t;
/*
* nxt_process_port_data_t is allocaiton size
* enabling effective reuse of memory pool cache.
* nxt_port_data_t size is allocation size
* which enables effective reuse of memory pool cache.
*/
typedef union {
nxt_buf_t buf;
nxt_proc_msg_new_port_t new_port;
} nxt_process_port_data_t;
nxt_port_msg_new_port_t new_port;
} nxt_port_data_t;
typedef void (*nxt_process_port_handler_t)(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
void nxt_process_port_create(nxt_thread_t *thr, nxt_process_port_t *proc,
nxt_process_port_handler_t *handlers);
void nxt_process_port_write(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_int_t nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port,
size_t max_size);
void nxt_port_destroy(nxt_port_t *port);
void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_write_close(nxt_port_t *port);
void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
void nxt_port_read_close(nxt_port_t *port);
nxt_int_t nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port,
nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
void nxt_process_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_process_port_t *proc);
void nxt_process_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
void nxt_port_create(nxt_thread_t *thread, nxt_port_t *port,
nxt_port_handler_t *handlers);
void nxt_port_write(nxt_task_t *task, nxt_cycle_t *cycle, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
void nxt_port_send_new_port(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_port_t *port);
void nxt_port_change_log_file(nxt_task_t *task, nxt_cycle_t *cycle,
nxt_uint_t slot, nxt_fd_t fd);
void nxt_process_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_process_port_new_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_process_port_change_log_file_handler(nxt_task_t *task,
void nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_change_log_file_handler(nxt_task_t *task,
nxt_port_recv_msg_t *msg);
void nxt_process_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_process_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
void nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
#endif /* _NXT_PORT_H_INCLUDED_ */

View File

@@ -16,19 +16,13 @@ static void nxt_port_buf_free(nxt_port_t *port, nxt_buf_t *b);
static void nxt_port_error_handler(nxt_task_t *task, void *obj, void *data);
nxt_port_t *
nxt_port_alloc(nxt_task_t *task)
nxt_int_t
nxt_port_socket_init(nxt_task_t *task, nxt_port_t *port, size_t max_size)
{
nxt_port_t *port;
nxt_int_t sndbuf, rcvbuf, size;
nxt_socket_t snd, rcv;
nxt_mem_pool_t *mp;
mp = nxt_mem_pool_create(1024);
if (nxt_fast_path(mp != NULL)) {
/* This allocation cannot fail. */
port = nxt_mem_zalloc(mp, sizeof(nxt_port_t));
port->mem_pool = mp;
port->socket.task = task;
port->pair[0] = -1;
@@ -36,24 +30,12 @@ nxt_port_alloc(nxt_task_t *task)
nxt_queue_init(&port->messages);
return port;
mp = nxt_mem_pool_create(1024);
if (nxt_slow_path(mp == NULL)) {
return NXT_ERROR;
}
return NULL;
}
nxt_port_t *
nxt_port_create(nxt_task_t *task, size_t max_size)
{
nxt_int_t sndbuf, rcvbuf, size;
nxt_port_t *port;
nxt_socket_t snd, rcv;
port = nxt_port_alloc(task);
if (nxt_slow_path(port == NULL)) {
return NULL;
}
port->mem_pool = mp;
if (nxt_slow_path(nxt_socketpair_create(task, port->pair) != NXT_OK)) {
goto socketpair_fail;
@@ -109,7 +91,7 @@ nxt_port_create(nxt_task_t *task, size_t max_size)
port->max_size = nxt_min(max_size, (size_t) sndbuf);
port->max_share = (64 * 1024);
return port;
return NXT_OK;
getsockopt_fail:
@@ -120,7 +102,7 @@ socketpair_fail:
nxt_mem_pool_destroy(port->mem_pool);
return NULL;
return NXT_ERROR;
}
@@ -154,7 +136,7 @@ nxt_port_write_close(nxt_port_t *port)
nxt_int_t
nxt_port_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
nxt_fd_t fd, uint32_t stream, nxt_buf_t *b)
{
nxt_queue_link_t *link;
@@ -204,9 +186,9 @@ static void
nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
{
ssize_t n;
nxt_uint_t niob;
nxt_uint_t niov;
nxt_port_t *port;
struct iovec iob[NXT_IOBUF_MAX];
struct iovec iov[NXT_IOBUF_MAX];
nxt_queue_link_t *link;
nxt_port_send_msg_t *msg;
nxt_sendbuf_coalesce_t sb;
@@ -223,21 +205,22 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
msg = (nxt_port_send_msg_t *) link;
nxt_iobuf_set(&iob[0], &msg->port_msg, sizeof(nxt_port_msg_t));
iov[0].iov_base = &msg->port_msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
sb.buf = msg->buf;
sb.iobuf = &iob[1];
sb.iobuf = &iov[1];
sb.nmax = NXT_IOBUF_MAX - 1;
sb.sync = 0;
sb.last = 0;
sb.size = sizeof(nxt_port_msg_t);
sb.limit = port->max_size;
niob = nxt_sendbuf_mem_coalesce(task, &sb);
niov = nxt_sendbuf_mem_coalesce(task, &sb);
msg->port_msg.last = sb.last;
n = nxt_socketpair_send(&port->socket, msg->fd, iob, niob + 1);
n = nxt_socketpair_send(&port->socket, msg->fd, iov, niov + 1);
if (n > 0) {
if (nxt_slow_path((size_t) n != sb.size)) {
@@ -322,7 +305,7 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
nxt_fd_t fd;
nxt_buf_t *b;
nxt_port_t *port;
nxt_iobuf_t iob[2];
struct iovec iov[2];
nxt_port_msg_t msg;
port = obj;
@@ -335,10 +318,13 @@ nxt_port_read_handler(nxt_task_t *task, void *obj, void *data)
/* TODO: disable event for some time */
}
nxt_iobuf_set(&iob[0], &msg, sizeof(nxt_port_msg_t));
nxt_iobuf_set(&iob[1], b->mem.pos, port->max_size);
iov[0].iov_base = &msg;
iov[0].iov_len = sizeof(nxt_port_msg_t);
n = nxt_socketpair_recv(&port->socket, &fd, iob, 2);
iov[1].iov_base = b->mem.pos;
iov[1].iov_len = port->max_size;
n = nxt_socketpair_recv(&port->socket, &fd, iov, 2);
if (n > 0) {
nxt_port_read_msg_process(task, port, &msg, fd, b, n);

View File

@@ -1,73 +0,0 @@
/*
* Copyright (C) Igor Sysoev
* Copyright (C) NGINX, Inc.
*/
#ifndef _NXT_PORT_SOCKET_H_INCLUDED_
#define _NXT_PORT_SOCKET_H_INCLUDED_
typedef struct {
uint32_t stream;
uint16_t type;
uint8_t last; /* 1 bit */
} nxt_port_msg_t;
typedef struct {
nxt_queue_link_t link;
nxt_buf_t *buf;
size_t share;
nxt_fd_t fd;
nxt_port_msg_t port_msg;
} nxt_port_send_msg_t;
typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t;
typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
typedef struct {
/* Must be the first field. */
nxt_fd_event_t socket;
nxt_queue_t messages; /* of nxt_port_send_msg_t */
/* Maximum size of message part. */
uint32_t max_size;
/* Maximum interleave of message parts. */
uint32_t max_share;
nxt_port_handler_t handler;
void *data;
nxt_mem_pool_t *mem_pool;
nxt_buf_t *free_bufs;
nxt_socket_t pair[2];
} nxt_port_t;
struct nxt_port_recv_msg_s {
uint32_t stream;
uint16_t type;
nxt_fd_t fd;
nxt_buf_t *buf;
nxt_port_t *port;
};
NXT_EXPORT nxt_port_t *nxt_port_alloc(nxt_task_t *task);
NXT_EXPORT nxt_port_t *nxt_port_create(nxt_task_t *task, size_t bufsize);
NXT_EXPORT void nxt_port_destroy(nxt_port_t *port);
NXT_EXPORT void nxt_port_write_enable(nxt_task_t *task, nxt_port_t *port);
NXT_EXPORT void nxt_port_write_close(nxt_port_t *port);
NXT_EXPORT void nxt_port_read_enable(nxt_task_t *task, nxt_port_t *port);
NXT_EXPORT void nxt_port_read_close(nxt_port_t *port);
NXT_EXPORT nxt_int_t nxt_port_write(nxt_task_t *task, nxt_port_t *port,
nxt_uint_t type, nxt_fd_t fd, uint32_t stream, nxt_buf_t *b);
#endif /* _NXT_PORT_SOCKET_H_INCLUDED_ */

View File

@@ -122,15 +122,15 @@ nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
goto done;
}
nxt_iobuf_set(&sb->iobuf[n], b->mem.pos, size);
sb->iobuf[n].iov_base = b->mem.pos;
sb->iobuf[n].iov_len = size;
} else {
nxt_iobuf_add(&sb->iobuf[n], size);
sb->iobuf[n].iov_len += size;
}
nxt_debug(task, "sendbuf: %ui, %p, %uz", n,
nxt_iobuf_data(&sb->iobuf[n]),
nxt_iobuf_size(&sb->iobuf[n]));
nxt_debug(task, "sendbuf: %ui, %p, %uz",
n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len);
total += size;
last = b->mem.pos + size;

View File

@@ -21,11 +21,11 @@ static void nxt_worker_process_sigquit_handler(nxt_task_t *task, void *obj,
void *data);
static nxt_process_port_handler_t nxt_worker_process_port_handlers[] = {
static nxt_port_handler_t nxt_worker_process_port_handlers[] = {
nxt_worker_process_quit_handler,
nxt_process_port_new_handler,
nxt_process_port_change_log_file_handler,
nxt_process_port_data_handler,
nxt_port_new_port_handler,
nxt_port_change_log_file_handler,
nxt_port_data_handler,
};
@@ -45,9 +45,9 @@ void
nxt_worker_process_start(void *data)
{
nxt_int_t n;
nxt_port_t *port;
nxt_cycle_t *cycle;
nxt_thread_t *thr;
nxt_process_port_t *proc;
const nxt_event_interface_t *interface;
cycle = data;
@@ -90,14 +90,14 @@ nxt_worker_process_start(void *data)
goto fail;
}
proc = cycle->processes->elts;
port = cycle->ports->elts;
/* A master process port. */
nxt_port_read_close(proc[0].port);
nxt_port_write_enable(&nxt_main_task, proc[0].port);
nxt_port_read_close(&port[0]);
nxt_port_write_enable(&nxt_main_task, &port[0]);
/* A worker process port. */
nxt_process_port_create(thr, &proc[cycle->current_process],
nxt_port_create(thr, &port[cycle->current_process],
nxt_worker_process_port_handlers);
#if (NXT_THREADS)