441 lines
10 KiB
C
441 lines
10 KiB
C
|
|
/*
|
|
* Copyright (C) Max Romanov
|
|
* Copyright (C) NGINX, Inc.
|
|
*/
|
|
|
|
#include <nxt_main.h>
|
|
#include <nxt_port_rpc.h>
|
|
|
|
|
|
/*
 * File-scope counter from which RPC stream ids are drawn.  It starts at 1
 * and is advanced atomically by nxt_port_rpc_register_handler_ex().
 */
static nxt_atomic_t nxt_stream_ident = 1;
|
|
|
|
typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
|
|
|
|
struct nxt_port_rpc_reg_s {
|
|
uint32_t stream;
|
|
|
|
nxt_pid_t peer;
|
|
nxt_queue_link_t link;
|
|
nxt_bool_t link_first;
|
|
|
|
nxt_port_rpc_handler_t ready_handler;
|
|
nxt_port_rpc_handler_t error_handler;
|
|
void *data;
|
|
};
|
|
|
|
|
|
static nxt_int_t
|
|
nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
|
|
{
|
|
return NXT_OK;
|
|
}
|
|
|
|
|
|
/*
 * Level-hash prototype shared by the stream and the peer tables; both
 * nxt_port_rpc_lhq_stream() and nxt_port_rpc_lhq_peer() point queries at it.
 */
static const nxt_lvlhsh_proto_t  lvlhsh_rpc_reg_proto  nxt_aligned(64) = {
    NXT_LVLHSH_DEFAULT,
    nxt_rpc_reg_test,
    nxt_lvlhsh_alloc,
    nxt_lvlhsh_free,
};
|
|
|
|
|
|
nxt_inline void
|
|
nxt_port_rpc_lhq_stream(nxt_lvlhsh_query_t *lhq, uint32_t *stream)
|
|
{
|
|
lhq->key_hash = nxt_murmur_hash2(stream, sizeof(*stream));
|
|
lhq->key.length = sizeof(*stream);
|
|
lhq->key.start = (u_char *) stream;
|
|
lhq->proto = &lvlhsh_rpc_reg_proto;
|
|
}
|
|
|
|
|
|
nxt_inline void
|
|
nxt_port_rpc_lhq_peer(nxt_lvlhsh_query_t *lhq, nxt_pid_t *peer)
|
|
{
|
|
lhq->key_hash = nxt_murmur_hash2(peer, sizeof(*peer));
|
|
lhq->key.length = sizeof(*peer);
|
|
lhq->key.start = (u_char *) peer;
|
|
lhq->proto = &lvlhsh_rpc_reg_proto;
|
|
}
|
|
|
|
|
|
uint32_t
|
|
nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
|
|
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
|
|
nxt_pid_t peer, void *data)
|
|
{
|
|
void *ex;
|
|
nxt_port_rpc_reg_t *reg;
|
|
|
|
ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler,
|
|
error_handler, 0);
|
|
|
|
if (ex == NULL) {
|
|
return 0;
|
|
}
|
|
|
|
if (peer != -1) {
|
|
nxt_port_rpc_ex_set_peer(task, port, ex, peer);
|
|
}
|
|
|
|
reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
|
|
|
|
nxt_assert(reg->data == ex);
|
|
|
|
reg->data = data;
|
|
|
|
return reg->stream;
|
|
}
|
|
|
|
|
|
void *
|
|
nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
|
|
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
|
|
size_t ex_size)
|
|
{
|
|
uint32_t stream;
|
|
nxt_port_rpc_reg_t *reg;
|
|
nxt_lvlhsh_query_t lhq;
|
|
|
|
nxt_assert(port->pair[0] != -1);
|
|
|
|
stream =
|
|
(uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3fffffff;
|
|
|
|
reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
|
|
|
|
if (nxt_slow_path(reg == NULL)) {
|
|
nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
reg->stream = stream;
|
|
reg->peer = -1;
|
|
reg->ready_handler = ready_handler;
|
|
reg->error_handler = error_handler;
|
|
reg->data = reg + 1;
|
|
|
|
nxt_port_rpc_lhq_stream(&lhq, &stream);
|
|
lhq.replace = 0;
|
|
lhq.value = reg;
|
|
lhq.pool = port->mem_pool;
|
|
|
|
switch (nxt_lvlhsh_insert(&port->rpc_streams, &lhq)) {
|
|
|
|
case NXT_OK:
|
|
break;
|
|
|
|
default:
|
|
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
|
|
"reg ", stream);
|
|
|
|
nxt_mp_free(port->mem_pool, reg);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
nxt_debug(task, "rpc: stream #%uD registered", stream);
|
|
|
|
return reg->data;
|
|
}
|
|
|
|
|
|
uint32_t
|
|
nxt_port_rpc_ex_stream(void *ex)
|
|
{
|
|
nxt_port_rpc_reg_t *reg;
|
|
|
|
reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
|
|
|
|
nxt_assert(reg->data == ex);
|
|
|
|
return reg->stream;
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
|
|
void *ex, nxt_pid_t peer)
|
|
{
|
|
nxt_int_t ret;
|
|
nxt_queue_link_t *peer_link;
|
|
nxt_port_rpc_reg_t *reg;
|
|
nxt_lvlhsh_query_t lhq;
|
|
|
|
reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
|
|
|
|
nxt_assert(reg->data == ex);
|
|
|
|
if (peer == -1 || reg->peer != -1) {
|
|
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to "
|
|
"change peer %PI->%PI", reg->stream, reg->peer, peer);
|
|
|
|
return;
|
|
}
|
|
|
|
reg->peer = peer;
|
|
|
|
nxt_port_rpc_lhq_peer(&lhq, &peer);
|
|
lhq.replace = 0;
|
|
lhq.value = ®->link;
|
|
lhq.pool = port->mem_pool;
|
|
|
|
ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
|
|
|
|
switch (ret) {
|
|
|
|
case NXT_OK:
|
|
reg->link_first = 1;
|
|
nxt_queue_self(®->link);
|
|
|
|
nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)",
|
|
reg->stream, reg->peer, reg->link.next);
|
|
break;
|
|
|
|
case NXT_DECLINED:
|
|
reg->link_first = 0;
|
|
peer_link = lhq.value;
|
|
nxt_queue_insert_after(peer_link, ®->link);
|
|
|
|
nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)",
|
|
reg->stream, reg->peer, reg->link.next);
|
|
break;
|
|
|
|
default:
|
|
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
|
|
"peer for stream #%uD (%d)", reg->stream, ret);
|
|
|
|
reg->peer = -1;
|
|
break;
|
|
}
|
|
|
|
}
|
|
|
|
|
|
static void
|
|
nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
|
|
nxt_port_rpc_reg_t *reg)
|
|
{
|
|
uint32_t stream;
|
|
nxt_int_t ret;
|
|
nxt_lvlhsh_query_t lhq;
|
|
nxt_port_rpc_reg_t *r;
|
|
|
|
stream = reg->stream;
|
|
|
|
if (reg->link_first != 0) {
|
|
nxt_port_rpc_lhq_peer(&lhq, ®->peer);
|
|
lhq.pool = port->mem_pool;
|
|
|
|
if (reg->link.next == ®->link) {
|
|
nxt_assert(reg->link.prev == ®->link);
|
|
|
|
nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI "
|
|
"registration (%p)", stream, reg->peer, reg->link.next);
|
|
|
|
ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
|
|
} else {
|
|
nxt_debug(task, "rpc: stream #%uD remove first pid %PI "
|
|
"registration (%p)", stream, reg->peer, reg->link.next);
|
|
|
|
lhq.replace = 1;
|
|
lhq.value = reg->link.next;
|
|
|
|
r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link);
|
|
r->link_first = 1;
|
|
|
|
nxt_queue_remove(®->link);
|
|
|
|
ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
|
|
}
|
|
} else {
|
|
nxt_debug(task, "rpc: stream #%uD remove pid %PI "
|
|
"registration (%p)", stream, reg->peer, reg->link.next);
|
|
|
|
nxt_queue_remove(®->link);
|
|
ret = NXT_OK;
|
|
}
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed"
|
|
" to delete peer %PI (%d)", stream, reg->peer, ret);
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|
{
|
|
uint8_t last;
|
|
uint32_t stream;
|
|
nxt_int_t ret;
|
|
nxt_port_t *port;
|
|
nxt_port_rpc_reg_t *reg;
|
|
nxt_lvlhsh_query_t lhq;
|
|
nxt_port_msg_type_t type;
|
|
|
|
stream = msg->port_msg.stream;
|
|
port = msg->port;
|
|
last = msg->port_msg.last;
|
|
type = msg->port_msg.type;
|
|
|
|
nxt_port_rpc_lhq_stream(&lhq, &stream);
|
|
lhq.pool = port->mem_pool;
|
|
|
|
if (last != 0) {
|
|
ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
|
|
|
|
} else {
|
|
ret = nxt_lvlhsh_find(&port->rpc_streams, &lhq);
|
|
}
|
|
|
|
if (ret != NXT_OK) {
|
|
nxt_debug(task, "rpc: stream #%uD no handler found", stream);
|
|
|
|
return;
|
|
}
|
|
|
|
nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream,
|
|
(last ? "last " : ""), type);
|
|
|
|
reg = lhq.value;
|
|
|
|
if (reg->peer != -1) {
|
|
nxt_assert(reg->peer == msg->port_msg.pid);
|
|
}
|
|
|
|
if (type == _NXT_PORT_MSG_RPC_ERROR) {
|
|
reg->error_handler(task, msg, reg->data);
|
|
|
|
} else {
|
|
reg->ready_handler(task, msg, reg->data);
|
|
}
|
|
|
|
if (last == 0) {
|
|
return;
|
|
}
|
|
|
|
if (reg->peer != -1) {
|
|
nxt_port_rpc_remove_from_peers(task, port, reg);
|
|
}
|
|
|
|
nxt_debug(task, "rpc: stream #%uD free registration", stream);
|
|
|
|
nxt_mp_free(port->mem_pool, reg);
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
|
|
{
|
|
uint8_t last;
|
|
uint32_t stream;
|
|
nxt_int_t ret;
|
|
nxt_buf_t buf;
|
|
nxt_queue_link_t *peer_link, *next_link;
|
|
nxt_port_rpc_reg_t *reg;
|
|
nxt_lvlhsh_query_t lhq;
|
|
nxt_port_recv_msg_t msg;
|
|
|
|
nxt_port_rpc_lhq_peer(&lhq, &peer);
|
|
lhq.pool = port->mem_pool;
|
|
|
|
ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
nxt_debug(task, "rpc: no reg found for peer %PI", peer);
|
|
|
|
return;
|
|
}
|
|
|
|
nxt_memzero(&msg, sizeof(msg));
|
|
nxt_memzero(&buf, sizeof(buf));
|
|
|
|
msg.fd = -1;
|
|
msg.buf = &buf;
|
|
msg.port = port;
|
|
|
|
msg.port_msg.pid = peer;
|
|
msg.port_msg.type = _NXT_PORT_MSG_REMOVE_PID;
|
|
msg.port_msg.last = 1;
|
|
|
|
peer_link = lhq.value;
|
|
last = 0;
|
|
|
|
while (last == 0) {
|
|
|
|
reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
|
|
|
|
nxt_assert(reg->peer == peer);
|
|
|
|
stream = reg->stream;
|
|
|
|
nxt_debug(task, "rpc: stream #%uD trigger error", stream);
|
|
|
|
msg.port_msg.stream = stream;
|
|
|
|
reg->error_handler(task, &msg, reg->data);
|
|
|
|
nxt_port_rpc_lhq_stream(&lhq, &stream);
|
|
lhq.pool = port->mem_pool;
|
|
|
|
ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
|
|
|
|
if (nxt_slow_path(ret != NXT_OK)) {
|
|
nxt_log_error(NXT_LOG_ERR, task->log,
|
|
"rpc: stream #%uD failed to delete handler", stream);
|
|
|
|
return;
|
|
}
|
|
|
|
if (peer_link == peer_link->next) {
|
|
nxt_assert(peer_link->prev == peer_link);
|
|
|
|
last = 1;
|
|
|
|
} else {
|
|
nxt_assert(peer_link->next->prev == peer_link);
|
|
nxt_assert(peer_link->prev->next == peer_link);
|
|
|
|
next_link = peer_link->next;
|
|
nxt_queue_remove(peer_link);
|
|
|
|
peer_link = next_link;
|
|
}
|
|
|
|
nxt_mp_free(port->mem_pool, reg);
|
|
}
|
|
}
|
|
|
|
|
|
void
|
|
nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
|
|
{
|
|
nxt_int_t ret;
|
|
nxt_port_rpc_reg_t *reg;
|
|
nxt_lvlhsh_query_t lhq;
|
|
|
|
nxt_port_rpc_lhq_stream(&lhq, &stream);
|
|
lhq.pool = port->mem_pool;
|
|
|
|
ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
|
|
|
|
if (ret != NXT_OK) {
|
|
nxt_debug(task, "rpc: stream #%uD no handler found", stream);
|
|
|
|
return;
|
|
}
|
|
|
|
reg = lhq.value;
|
|
|
|
if (reg->peer != -1) {
|
|
nxt_port_rpc_remove_from_peers(task, port, reg);
|
|
}
|
|
|
|
nxt_debug(task, "rpc: stream #%uD cancel registration", stream);
|
|
|
|
nxt_mp_free(port->mem_pool, reg);
|
|
}
|