RPC: peer pid special value -1 may be used if pid is unknown.

This commit is contained in:
Max Romanov
2017-08-11 18:04:04 +03:00
parent 1b354421c3
commit 162afe4719

View File

@@ -107,27 +107,28 @@ nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
return 0; return 0;
} }
if (peer != -1) {
nxt_port_rpc_lhq_peer(&lhq, &peer);
lhq.replace = 0;
lhq.value = &reg->link;
lhq.pool = port->mem_pool;
nxt_port_rpc_lhq_peer(&lhq, &peer); switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) {
lhq.replace = 0;
lhq.value = &reg->link;
lhq.pool = port->mem_pool;
switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) { case NXT_OK:
nxt_queue_self(&reg->link);
break;
case NXT_OK: case NXT_DECLINED:
nxt_queue_self(&reg->link); peer_link = lhq.value;
break; nxt_queue_insert_before(peer_link, &reg->link);
break;
case NXT_DECLINED: default:
peer_link = lhq.value; nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer "
nxt_queue_insert_before(peer_link, &reg->link); "for stream #%uD", stream);
break; break;
}
default:
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer "
"for stream #%uD", stream);
break;
} }
return stream; return stream;
@@ -169,7 +170,9 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
reg = lhq.value; reg = lhq.value;
nxt_assert(reg->peer == msg->port_msg.pid); if (reg->peer != -1) {
nxt_assert(reg->peer == msg->port_msg.pid);
}
if (type == _NXT_PORT_MSG_RPC_ERROR) { if (type == _NXT_PORT_MSG_RPC_ERROR) {
reg->error_handler(task, msg, reg->data); reg->error_handler(task, msg, reg->data);
@@ -183,18 +186,20 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
return; return;
} }
if (reg->link.next == &reg->link) { if (reg->peer != -1) {
nxt_port_rpc_lhq_peer(&lhq, &reg->peer); if (reg->link.next == &reg->link) {
lhq.pool = port->mem_pool; nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
lhq.pool = port->mem_pool;
ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
"peer %PI", reg->peer); "peer %PI", reg->peer);
}
} else {
nxt_queue_remove(&reg->link);
} }
} else {
nxt_queue_remove(&reg->link);
} }
nxt_mp_free(port->mem_pool, reg); nxt_mp_free(port->mem_pool, reg);
@@ -296,18 +301,20 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
reg = lhq.value; reg = lhq.value;
if (reg->link.next == &reg->link) { if (reg->peer != -1) {
nxt_port_rpc_lhq_peer(&lhq, &reg->peer); if (reg->link.next == &reg->link) {
lhq.pool = port->mem_pool; nxt_port_rpc_lhq_peer(&lhq, &reg->peer);
lhq.pool = port->mem_pool;
ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq); ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete " nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
"peer %PI", reg->peer); "peer %PI", reg->peer);
}
} else {
nxt_queue_remove(&reg->link);
} }
} else {
nxt_queue_remove(&reg->link);
} }
nxt_mp_free(port->mem_pool, reg); nxt_mp_free(port->mem_pool, reg);