Moving nxt_stream_ident to shared memory.
This aims to avoid stream id clashes after router restart.
This commit is contained in:
@@ -8,7 +8,7 @@
|
|||||||
#include <nxt_port_rpc.h>
|
#include <nxt_port_rpc.h>
|
||||||
|
|
||||||
|
|
||||||
static nxt_atomic_t nxt_stream_ident = 1;
|
static volatile uint32_t *nxt_stream_ident;
|
||||||
|
|
||||||
typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
|
typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
|
||||||
|
|
||||||
@@ -30,6 +30,29 @@ nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
|
|||||||
nxt_port_rpc_reg_t *reg);
|
nxt_port_rpc_reg_t *reg);
|
||||||
|
|
||||||
|
|
||||||
|
nxt_int_t
|
||||||
|
nxt_port_rpc_init(void)
|
||||||
|
{
|
||||||
|
void *p;
|
||||||
|
|
||||||
|
if (nxt_stream_ident != NULL) {
|
||||||
|
return NXT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
p = nxt_mem_mmap(NULL, sizeof(*nxt_stream_ident), PROT_READ | PROT_WRITE,
|
||||||
|
MAP_ANON | MAP_SHARED, -1, 0);
|
||||||
|
|
||||||
|
if (nxt_slow_path(p == MAP_FAILED)) {
|
||||||
|
return NXT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_stream_ident = p;
|
||||||
|
*nxt_stream_ident = 1;
|
||||||
|
|
||||||
|
return NXT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static nxt_int_t
|
static nxt_int_t
|
||||||
nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
|
nxt_rpc_reg_test(nxt_lvlhsh_query_t *lhq, void *data)
|
||||||
{
|
{
|
||||||
@@ -105,8 +128,7 @@ nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
|
|||||||
|
|
||||||
nxt_assert(port->pair[0] != -1);
|
nxt_assert(port->pair[0] != -1);
|
||||||
|
|
||||||
stream =
|
stream = nxt_atomic_fetch_add(nxt_stream_ident, 1);
|
||||||
(uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3FFFFFFF;
|
|
||||||
|
|
||||||
reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
|
reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
|
||||||
|
|
||||||
|
|||||||
@@ -11,6 +11,8 @@
|
|||||||
typedef void (*nxt_port_rpc_handler_t)(nxt_task_t *task,
|
typedef void (*nxt_port_rpc_handler_t)(nxt_task_t *task,
|
||||||
nxt_port_recv_msg_t *msg, void *data);
|
nxt_port_recv_msg_t *msg, void *data);
|
||||||
|
|
||||||
|
nxt_int_t nxt_port_rpc_init(void);
|
||||||
|
|
||||||
uint32_t nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
|
uint32_t nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
|
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
|
||||||
nxt_pid_t peer, void *data);
|
nxt_pid_t peer, void *data);
|
||||||
|
|||||||
@@ -118,6 +118,10 @@ nxt_runtime_create(nxt_task_t *task)
|
|||||||
goto fail;
|
goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (nxt_port_rpc_init() != NXT_OK) {
|
||||||
|
goto fail;
|
||||||
|
}
|
||||||
|
|
||||||
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
|
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
|
||||||
nxt_runtime_start, task, rt, NULL);
|
nxt_runtime_start, task, rt, NULL);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user