Future releases of GCC will render function definitions like
func()
invalid by default. See the previous commit 09f88c9 ("Fixed main()
prototypes in auto tests.") for details.
Such functions should be defined like
func(void)
This is a good thing to do regardless of the upcoming GCC changes.
Reviewed-by: Alejandro Colomar <alx@nginx.com>
Signed-off-by: Andrew Clayton <a.clayton@nginx.com>
632 lines
14 KiB
C
632 lines
14 KiB
C
|
|
/*
|
|
* Copyright (C) Igor Sysoev
|
|
* Copyright (C) NGINX, Inc.
|
|
*/
|
|
|
|
#include <nxt_main.h>
|
|
#include <nxt_runtime.h>
|
|
#include <nxt_port.h>
|
|
#include <nxt_router.h>
|
|
#include <nxt_app_queue.h>
|
|
#include <nxt_port_queue.h>
|
|
|
|
|
|
/* Forward declarations of file-local helpers that are defined further below. */
static void nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
|
nxt_pid_t pid);
|
|
static void nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
|
|
|
/*
 * Port id counter: nxt_port_get_next_id() returns its value and atomically
 * adds one, nxt_port_reset_next_id() stores 1 back into it.
 */
static nxt_atomic_uint_t nxt_port_last_id = 1;
|
|
|
|
|
|
static void
|
|
nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_mp_t *mp;
|
|
nxt_port_t *port;
|
|
|
|
port = obj;
|
|
mp = data;
|
|
|
|
nxt_assert(port->pair[0] == -1);
|
|
nxt_assert(port->pair[1] == -1);
|
|
|
|
nxt_assert(port->use_count == 0);
|
|
nxt_assert(port->app_link.next == NULL);
|
|
nxt_assert(port->idle_link.next == NULL);
|
|
|
|
nxt_assert(nxt_queue_is_empty(&port->messages));
|
|
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_streams));
|
|
nxt_assert(nxt_lvlhsh_is_empty(&port->rpc_peers));
|
|
|
|
nxt_thread_mutex_destroy(&port->write_mutex);
|
|
|
|
nxt_mp_free(mp, port);
|
|
}
|
|
|
|
|
|
nxt_port_t *
|
|
nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
|
|
nxt_process_type_t type)
|
|
{
|
|
nxt_mp_t *mp;
|
|
nxt_port_t *port;
|
|
|
|
mp = nxt_mp_create(1024, 128, 256, 32);
|
|
|
|
if (nxt_slow_path(mp == NULL)) {
|
|
return NULL;
|
|
}
|
|
|
|
port = nxt_mp_zalloc(mp, sizeof(nxt_port_t));
|
|
|
|
if (nxt_fast_path(port != NULL)) {
|
|
port->id = id;
|
|
port->pid = pid;
|
|
port->type = type;
|
|
port->mem_pool = mp;
|
|
port->use_count = 1;
|
|
|
|
nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
|
|
|
|
nxt_queue_init(&port->messages);
|
|
nxt_thread_mutex_create(&port->write_mutex);
|
|
|
|
port->queue_fd = -1;
|
|
|
|
} else {
|
|
nxt_mp_destroy(mp);
|
|
}
|
|
|
|
nxt_thread_log_debug("port %p %d:%d new, type %d", port, pid, id, type);
|
|
|
|
return port;
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_close(nxt_task_t *task, nxt_port_t *port)
|
|
{
|
|
size_t size;
|
|
|
|
nxt_debug(task, "port %p %d:%d close, type %d", port, port->pid,
|
|
port->id, port->type);
|
|
|
|
if (port->pair[0] != -1) {
|
|
nxt_port_rpc_close(task, port);
|
|
|
|
nxt_fd_close(port->pair[0]);
|
|
port->pair[0] = -1;
|
|
}
|
|
|
|
if (port->pair[1] != -1) {
|
|
nxt_fd_close(port->pair[1]);
|
|
port->pair[1] = -1;
|
|
|
|
if (port->app != NULL) {
|
|
nxt_router_app_port_close(task, port);
|
|
}
|
|
}
|
|
|
|
if (port->queue_fd != -1) {
|
|
nxt_fd_close(port->queue_fd);
|
|
port->queue_fd = -1;
|
|
}
|
|
|
|
if (port->queue != NULL) {
|
|
size = (port->id == (nxt_port_id_t) -1) ? sizeof(nxt_app_queue_t)
|
|
: sizeof(nxt_port_queue_t);
|
|
nxt_mem_munmap(port->queue, size);
|
|
|
|
port->queue = NULL;
|
|
}
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_port_release(nxt_task_t *task, nxt_port_t *port)
|
|
{
|
|
nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
|
|
port->id, port->type);
|
|
|
|
port->app = NULL;
|
|
|
|
if (port->link.next != NULL) {
|
|
nxt_assert(port->process != NULL);
|
|
|
|
nxt_process_port_remove(port);
|
|
|
|
nxt_process_use(task, port->process, -1);
|
|
}
|
|
|
|
nxt_mp_release(port->mem_pool);
|
|
}
|
|
|
|
|
|
nxt_port_id_t
|
|
nxt_port_get_next_id(void)
|
|
{
|
|
return nxt_atomic_fetch_add(&nxt_port_last_id, 1);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_reset_next_id(void)
|
|
{
|
|
nxt_port_last_id = 1;
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_enable(nxt_task_t *task, nxt_port_t *port,
|
|
const nxt_port_handlers_t *handlers)
|
|
{
|
|
port->pid = nxt_pid;
|
|
port->handler = nxt_port_handler;
|
|
port->data = (nxt_port_handler_t *) (handlers);
|
|
|
|
nxt_port_read_enable(task, port);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|
{
|
|
nxt_port_handler_t *handlers;
|
|
|
|
if (nxt_fast_path(msg->port_msg.type < NXT_PORT_MSG_MAX)) {
|
|
|
|
nxt_debug(task, "port %d: message type:%uD fds:%d,%d",
|
|
msg->port->socket.fd, msg->port_msg.type,
|
|
msg->fd[0], msg->fd[1]);
|
|
|
|
handlers = msg->port->data;
|
|
handlers[msg->port_msg.type](task, msg);
|
|
|
|
return;
|
|
}
|
|
|
|
nxt_alert(task, "port %d: unknown message type:%uD",
|
|
msg->port->socket.fd, msg->port_msg.type);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|
{
|
|
nxt_runtime_quit(task, 0);
|
|
}
|
|
|
|
|
|
/* TODO join with process_ready and move to nxt_main_process.c */
|
|
nxt_inline void
|
|
nxt_port_send_new_port(nxt_task_t *task, nxt_runtime_t *rt,
|
|
nxt_port_t *new_port, uint32_t stream)
|
|
{
|
|
nxt_port_t *port;
|
|
nxt_process_t *process;
|
|
|
|
nxt_debug(task, "new port %d for process %PI",
|
|
new_port->pair[1], new_port->pid);
|
|
|
|
nxt_runtime_process_each(rt, process) {
|
|
|
|
if (process->pid == new_port->pid || process->pid == nxt_pid) {
|
|
continue;
|
|
}
|
|
|
|
port = nxt_process_port_first(process);
|
|
|
|
if (nxt_proc_send_matrix[port->type][new_port->type]) {
|
|
(void) nxt_port_send_port(task, port, new_port, stream);
|
|
}
|
|
|
|
} nxt_runtime_process_loop;
|
|
}
|
|
|
|
|
|
nxt_int_t
|
|
nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
|
|
uint32_t stream)
|
|
{
|
|
nxt_buf_t *b;
|
|
nxt_port_msg_new_port_t *msg;
|
|
|
|
b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
|
|
sizeof(nxt_port_data_t));
|
|
if (nxt_slow_path(b == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
nxt_debug(task, "send port %FD to process %PI",
|
|
new_port->pair[1], port->pid);
|
|
|
|
b->mem.free += sizeof(nxt_port_msg_new_port_t);
|
|
msg = (nxt_port_msg_new_port_t *) b->mem.pos;
|
|
|
|
msg->id = new_port->id;
|
|
msg->pid = new_port->pid;
|
|
msg->max_size = port->max_size;
|
|
msg->max_share = port->max_share;
|
|
msg->type = new_port->type;
|
|
|
|
return nxt_port_socket_write2(task, port, NXT_PORT_MSG_NEW_PORT,
|
|
new_port->pair[1], new_port->queue_fd,
|
|
stream, 0, b);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|
{
|
|
nxt_port_t *port;
|
|
nxt_runtime_t *rt;
|
|
nxt_port_msg_new_port_t *new_port_msg;
|
|
|
|
rt = task->thread->runtime;
|
|
|
|
new_port_msg = (nxt_port_msg_new_port_t *) msg->buf->mem.pos;
|
|
|
|
/* TODO check b size and make plain */
|
|
|
|
nxt_debug(task, "new port %d received for process %PI:%d",
|
|
msg->fd[0], new_port_msg->pid, new_port_msg->id);
|
|
|
|
port = nxt_runtime_port_find(rt, new_port_msg->pid, new_port_msg->id);
|
|
if (port != NULL) {
|
|
nxt_debug(task, "port %PI:%d already exists", new_port_msg->pid,
|
|
new_port_msg->id);
|
|
|
|
msg->u.new_port = port;
|
|
|
|
nxt_fd_close(msg->fd[0]);
|
|
msg->fd[0] = -1;
|
|
return;
|
|
}
|
|
|
|
port = nxt_runtime_process_port_create(task, rt, new_port_msg->pid,
|
|
new_port_msg->id,
|
|
new_port_msg->type);
|
|
if (nxt_slow_path(port == NULL)) {
|
|
return;
|
|
}
|
|
|
|
nxt_fd_nonblocking(task, msg->fd[0]);
|
|
|
|
port->pair[0] = -1;
|
|
port->pair[1] = msg->fd[0];
|
|
port->max_size = new_port_msg->max_size;
|
|
port->max_share = new_port_msg->max_share;
|
|
|
|
port->socket.task = task;
|
|
|
|
nxt_port_write_enable(task, port);
|
|
|
|
msg->u.new_port = port;
|
|
}
|
|
|
|
/* TODO move to nxt_main_process.c */
|
|
void
|
|
nxt_port_process_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|
{
|
|
nxt_port_t *port;
|
|
nxt_process_t *process;
|
|
nxt_runtime_t *rt;
|
|
|
|
rt = task->thread->runtime;
|
|
|
|
process = nxt_runtime_process_find(rt, msg->port_msg.pid);
|
|
if (nxt_slow_path(process == NULL)) {
|
|
return;
|
|
}
|
|
|
|
nxt_assert(process->state != NXT_PROCESS_STATE_READY);
|
|
|
|
process->state = NXT_PROCESS_STATE_READY;
|
|
|
|
nxt_assert(!nxt_queue_is_empty(&process->ports));
|
|
|
|
port = nxt_process_port_first(process);
|
|
|
|
nxt_debug(task, "process %PI ready", msg->port_msg.pid);
|
|
|
|
if (msg->fd[0] != -1) {
|
|
port->queue_fd = msg->fd[0];
|
|
port->queue = nxt_mem_mmap(NULL, sizeof(nxt_port_queue_t),
|
|
PROT_READ | PROT_WRITE, MAP_SHARED,
|
|
msg->fd[0], 0);
|
|
}
|
|
|
|
nxt_port_send_new_port(task, rt, port, msg->port_msg.stream);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_mmap_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|
{
|
|
nxt_runtime_t *rt;
|
|
nxt_process_t *process;
|
|
|
|
rt = task->thread->runtime;
|
|
|
|
if (nxt_slow_path(msg->fd[0] == -1)) {
|
|
nxt_log(task, NXT_LOG_WARN, "invalid fd passed with mmap message");
|
|
|
|
return;
|
|
}
|
|
|
|
process = nxt_runtime_process_find(rt, msg->port_msg.pid);
|
|
if (nxt_slow_path(process == NULL)) {
|
|
nxt_log(task, NXT_LOG_WARN, "failed to get process #%PI",
|
|
msg->port_msg.pid);
|
|
|
|
goto fail_close;
|
|
}
|
|
|
|
nxt_port_incoming_port_mmap(task, process, msg->fd[0]);
|
|
|
|
fail_close:
|
|
|
|
nxt_fd_close(msg->fd[0]);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
|
|
nxt_fd_t fd)
|
|
{
|
|
nxt_buf_t *b;
|
|
nxt_port_t *port;
|
|
nxt_process_t *process;
|
|
|
|
nxt_debug(task, "change log file #%ui fd:%FD", slot, fd);
|
|
|
|
nxt_runtime_process_each(rt, process) {
|
|
|
|
if (nxt_pid == process->pid) {
|
|
continue;
|
|
}
|
|
|
|
port = nxt_process_port_first(process);
|
|
|
|
b = nxt_buf_mem_alloc(task->thread->engine->mem_pool,
|
|
sizeof(nxt_uint_t), 0);
|
|
if (nxt_slow_path(b == NULL)) {
|
|
continue;
|
|
}
|
|
|
|
b->mem.free = nxt_cpymem(b->mem.free, &slot, sizeof(nxt_uint_t));
|
|
|
|
(void) nxt_port_socket_write(task, port, NXT_PORT_MSG_CHANGE_FILE,
|
|
fd, 0, 0, b);
|
|
|
|
} nxt_runtime_process_loop;
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_change_log_file_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|
{
|
|
nxt_buf_t *b;
|
|
nxt_uint_t slot;
|
|
nxt_file_t *log_file;
|
|
nxt_runtime_t *rt;
|
|
|
|
rt = task->thread->runtime;
|
|
|
|
b = msg->buf;
|
|
slot = *(nxt_uint_t *) b->mem.pos;
|
|
|
|
log_file = nxt_list_elt(rt->log_files, slot);
|
|
|
|
nxt_debug(task, "change log file %FD:%FD", msg->fd[0], log_file->fd);
|
|
|
|
/*
|
|
* The old log file descriptor must be closed at the moment when no
|
|
* other threads use it. dup2() allows to use the old file descriptor
|
|
* for new log file. This change is performed atomically in the kernel.
|
|
*/
|
|
if (nxt_file_redirect(log_file, msg->fd[0]) == NXT_OK) {
|
|
if (slot == 0) {
|
|
(void) nxt_file_stderr(log_file);
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|
{
|
|
size_t dump_size;
|
|
nxt_buf_t *b;
|
|
|
|
b = msg->buf;
|
|
dump_size = b->mem.free - b->mem.pos;
|
|
|
|
if (dump_size > 300) {
|
|
dump_size = 300;
|
|
}
|
|
|
|
nxt_debug(task, "data: %*s", dump_size, b->mem.pos);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_remove_notify_others(nxt_task_t *task, nxt_process_t *process)
|
|
{
|
|
nxt_pid_t pid;
|
|
nxt_buf_t *buf;
|
|
nxt_port_t *port;
|
|
nxt_runtime_t *rt;
|
|
nxt_process_t *p;
|
|
nxt_process_type_t ptype;
|
|
|
|
pid = process->pid;
|
|
|
|
ptype = nxt_process_type(process);
|
|
|
|
rt = task->thread->runtime;
|
|
|
|
nxt_runtime_process_each(rt, p) {
|
|
|
|
if (p->pid == nxt_pid
|
|
|| p->pid == pid
|
|
|| nxt_queue_is_empty(&p->ports))
|
|
{
|
|
continue;
|
|
}
|
|
|
|
port = nxt_process_port_first(p);
|
|
|
|
if (nxt_proc_remove_notify_matrix[ptype][port->type] == 0) {
|
|
continue;
|
|
}
|
|
|
|
buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
|
|
sizeof(pid));
|
|
|
|
if (nxt_slow_path(buf == NULL)) {
|
|
continue;
|
|
}
|
|
|
|
buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
|
|
|
|
nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, -1,
|
|
process->stream, 0, buf);
|
|
|
|
} nxt_runtime_process_loop;
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|
{
|
|
nxt_pid_t pid;
|
|
nxt_buf_t *buf;
|
|
|
|
buf = msg->buf;
|
|
|
|
nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
|
|
|
|
nxt_memcpy(&pid, buf->mem.pos, sizeof(nxt_pid_t));
|
|
|
|
nxt_port_remove_pid(task, msg, pid);
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_port_remove_pid(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
|
nxt_pid_t pid)
|
|
{
|
|
nxt_runtime_t *rt;
|
|
nxt_process_t *process;
|
|
|
|
msg->u.removed_pid = pid;
|
|
|
|
nxt_debug(task, "port remove pid %PI handler", pid);
|
|
|
|
rt = task->thread->runtime;
|
|
|
|
nxt_port_rpc_remove_peer(task, msg->port, pid);
|
|
|
|
process = nxt_runtime_process_find(rt, pid);
|
|
|
|
if (process) {
|
|
nxt_process_close_ports(task, process);
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_empty_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|
{
|
|
nxt_debug(task, "port empty handler");
|
|
}
|
|
|
|
|
|
/*
 * Heap-allocated carrier used by nxt_port_post() to run a handler for a
 * port on the port's own event engine.  It is freed by
 * nxt_port_post_handler() before the handler is invoked.
 */
typedef struct {
|
|
/* Work item posted to port->engine; its obj field points back at this struct. */
nxt_work_t work;
|
|
/* Port the handler runs for; nxt_port_post() adds one to its use_count before posting. */
nxt_port_t *port;
|
|
/* Callback invoked by nxt_port_post_handler() with the task, the port and the work data. */
nxt_port_post_handler_t handler;
|
|
} nxt_port_work_t;
|
|
|
|
|
|
static void
|
|
nxt_port_post_handler(nxt_task_t *task, void *obj, void *data)
|
|
{
|
|
nxt_port_t *port;
|
|
nxt_port_work_t *pw;
|
|
nxt_port_post_handler_t handler;
|
|
|
|
pw = obj;
|
|
port = pw->port;
|
|
handler = pw->handler;
|
|
|
|
nxt_free(pw);
|
|
|
|
handler(task, port, data);
|
|
|
|
nxt_port_use(task, port, -1);
|
|
}
|
|
|
|
|
|
nxt_int_t
|
|
nxt_port_post(nxt_task_t *task, nxt_port_t *port,
|
|
nxt_port_post_handler_t handler, void *data)
|
|
{
|
|
nxt_port_work_t *pw;
|
|
|
|
if (task->thread->engine == port->engine) {
|
|
handler(task, port, data);
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
pw = nxt_zalloc(sizeof(nxt_port_work_t));
|
|
|
|
if (nxt_slow_path(pw == NULL)) {
|
|
return NXT_ERROR;
|
|
}
|
|
|
|
nxt_atomic_fetch_add(&port->use_count, 1);
|
|
|
|
pw->work.handler = nxt_port_post_handler;
|
|
pw->work.task = &port->engine->task;
|
|
pw->work.obj = pw;
|
|
pw->work.data = data;
|
|
|
|
pw->port = port;
|
|
pw->handler = handler;
|
|
|
|
nxt_event_engine_post(port->engine, &pw->work);
|
|
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_port_release_handler(nxt_task_t *task, nxt_port_t *port, void *data)
|
|
{
|
|
/* no op */
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_use(nxt_task_t *task, nxt_port_t *port, int i)
|
|
{
|
|
int c;
|
|
|
|
c = nxt_atomic_fetch_add(&port->use_count, i);
|
|
|
|
if (i < 0 && c == -i) {
|
|
|
|
if (port->engine == NULL || task->thread->engine == port->engine) {
|
|
nxt_port_release(task, port);
|
|
|
|
return;
|
|
}
|
|
|
|
nxt_port_post(task, port, nxt_port_release_handler, NULL);
|
|
}
|
|
}
|