Using port 'post' facility to proxy remove pid message to workers.
Remove pid proxying to worker engines implementation was originally overcomplicated. Memory pool and 2 engine posts (there and back again) are optimized out and replaced with band new nxt_port_post() call.
This commit is contained in:
@@ -189,7 +189,7 @@ nxt_controller_process_new_port_handler(nxt_task_t *task,
|
|||||||
|
|
||||||
nxt_port_new_port_handler(task, msg);
|
nxt_port_new_port_handler(task, msg);
|
||||||
|
|
||||||
if (msg->new_port->type != NXT_PROCESS_ROUTER) {
|
if (msg->u.new_port->type != NXT_PROCESS_ROUTER) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -283,7 +283,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
|
|
||||||
nxt_port_write_enable(task, port);
|
nxt_port_write_enable(task, port);
|
||||||
|
|
||||||
msg->new_port = port;
|
msg->u.new_port = port;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -441,6 +441,8 @@ nxt_port_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
|
|
||||||
nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
|
nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
|
||||||
|
|
||||||
|
msg->u.removed_pid = pid;
|
||||||
|
|
||||||
nxt_debug(task, "port remove pid %PI handler", pid);
|
nxt_debug(task, "port remove pid %PI handler", pid);
|
||||||
|
|
||||||
rt = task->thread->runtime;
|
rt = task->thread->runtime;
|
||||||
|
|||||||
@@ -127,7 +127,11 @@ struct nxt_port_recv_msg_s {
|
|||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_port_msg_t port_msg;
|
nxt_port_msg_t port_msg;
|
||||||
size_t size;
|
size_t size;
|
||||||
nxt_port_t *new_port;
|
union {
|
||||||
|
nxt_port_t *new_port;
|
||||||
|
nxt_pid_t removed_pid;
|
||||||
|
void *data;
|
||||||
|
} u;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct nxt_app_s nxt_app_t;
|
typedef struct nxt_app_s nxt_app_t;
|
||||||
|
|||||||
104
src/nxt_router.c
104
src/nxt_router.c
@@ -62,23 +62,11 @@ typedef struct {
|
|||||||
} nxt_socket_rpc_t;
|
} nxt_socket_rpc_t;
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
nxt_mp_t *mem_pool;
|
|
||||||
nxt_port_recv_msg_t msg;
|
|
||||||
nxt_work_t work;
|
|
||||||
} nxt_remove_pid_msg_t;
|
|
||||||
|
|
||||||
|
|
||||||
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
|
static nxt_int_t nxt_router_start_worker(nxt_task_t *task, nxt_app_t *app);
|
||||||
|
|
||||||
static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra,
|
static void nxt_router_ra_error(nxt_task_t *task, nxt_req_app_link_t *ra,
|
||||||
int code, const char* str);
|
int code, const char* str);
|
||||||
|
|
||||||
static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
|
|
||||||
void *data);
|
|
||||||
static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
|
|
||||||
void *data);
|
|
||||||
|
|
||||||
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
|
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
|
||||||
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
|
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_router_conf_ready(nxt_task_t *task,
|
static void nxt_router_conf_ready(nxt_task_t *task,
|
||||||
@@ -572,7 +560,9 @@ nxt_router_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msg->new_port == NULL || msg->new_port->type != NXT_PROCESS_WORKER) {
|
if (msg->u.new_port == NULL ||
|
||||||
|
msg->u.new_port->type != NXT_PROCESS_WORKER)
|
||||||
|
{
|
||||||
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
|
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -621,13 +611,24 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_router_worker_remove_pid(nxt_task_t *task, nxt_port_t *port, void *data)
|
||||||
|
{
|
||||||
|
union {
|
||||||
|
nxt_pid_t removed_pid;
|
||||||
|
void *data;
|
||||||
|
} u;
|
||||||
|
|
||||||
|
u.data = data;
|
||||||
|
|
||||||
|
nxt_port_rpc_remove_peer(task, port, u.removed_pid);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||||
{
|
{
|
||||||
nxt_mp_t *mp;
|
nxt_event_engine_t *engine;
|
||||||
nxt_buf_t *buf;
|
|
||||||
nxt_event_engine_t *engine;
|
|
||||||
nxt_remove_pid_msg_t *rp;
|
|
||||||
|
|
||||||
nxt_port_remove_pid_handler(task, msg);
|
nxt_port_remove_pid_handler(task, msg);
|
||||||
|
|
||||||
@@ -635,82 +636,19 @@ nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
mp = nxt_mp_create(1024, 128, 256, 32);
|
|
||||||
|
|
||||||
buf = nxt_buf_mem_alloc(mp, nxt_buf_used_size(msg->buf), 0);
|
|
||||||
buf->mem.free = nxt_cpymem(buf->mem.free, msg->buf->mem.pos,
|
|
||||||
nxt_buf_used_size(msg->buf));
|
|
||||||
|
|
||||||
nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
|
nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
|
||||||
{
|
{
|
||||||
rp = nxt_mp_retain(mp, sizeof(nxt_remove_pid_msg_t));
|
nxt_port_post(task, engine->port, nxt_router_worker_remove_pid,
|
||||||
|
msg->u.data);
|
||||||
rp->mem_pool = mp;
|
|
||||||
|
|
||||||
rp->msg.fd = msg->fd;
|
|
||||||
rp->msg.buf = buf;
|
|
||||||
rp->msg.port = engine->port;
|
|
||||||
rp->msg.port_msg = msg->port_msg;
|
|
||||||
rp->msg.size = msg->size;
|
|
||||||
rp->msg.new_port = NULL;
|
|
||||||
|
|
||||||
rp->work.handler = nxt_router_worker_remove_pid_handler;
|
|
||||||
rp->work.task = &engine->task;
|
|
||||||
rp->work.obj = rp;
|
|
||||||
rp->work.data = task->thread->engine;
|
|
||||||
rp->work.next = NULL;
|
|
||||||
|
|
||||||
nxt_event_engine_post(engine, &rp->work);
|
|
||||||
}
|
}
|
||||||
nxt_queue_loop;
|
nxt_queue_loop;
|
||||||
|
|
||||||
nxt_mp_release(mp, NULL);
|
|
||||||
|
|
||||||
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
|
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
|
||||||
|
|
||||||
nxt_port_rpc_handler(task, msg);
|
nxt_port_rpc_handler(task, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data)
|
|
||||||
{
|
|
||||||
nxt_pid_t pid;
|
|
||||||
nxt_buf_t *buf;
|
|
||||||
nxt_event_engine_t *engine;
|
|
||||||
nxt_remove_pid_msg_t *rp;
|
|
||||||
|
|
||||||
rp = obj;
|
|
||||||
|
|
||||||
buf = rp->msg.buf;
|
|
||||||
|
|
||||||
nxt_assert(nxt_buf_used_size(buf) == sizeof(pid));
|
|
||||||
|
|
||||||
nxt_memcpy(&pid, buf->mem.pos, sizeof(pid));
|
|
||||||
|
|
||||||
nxt_port_rpc_remove_peer(task, rp->msg.port, pid);
|
|
||||||
|
|
||||||
engine = rp->work.data;
|
|
||||||
|
|
||||||
rp->work.handler = nxt_router_worker_remove_pid_done;
|
|
||||||
rp->work.task = &engine->task;
|
|
||||||
rp->work.next = NULL;
|
|
||||||
|
|
||||||
nxt_event_engine_post(engine, &rp->work);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void
|
|
||||||
nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, void *data)
|
|
||||||
{
|
|
||||||
nxt_remove_pid_msg_t *rp;
|
|
||||||
|
|
||||||
rp = obj;
|
|
||||||
|
|
||||||
nxt_mp_release(rp->mem_pool, rp);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static nxt_router_temp_conf_t *
|
static nxt_router_temp_conf_t *
|
||||||
nxt_router_temp_conf(nxt_task_t *task)
|
nxt_router_temp_conf(nxt_task_t *task)
|
||||||
{
|
{
|
||||||
@@ -2528,7 +2466,7 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
|||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
|
|
||||||
app = data;
|
app = data;
|
||||||
port = msg->new_port;
|
port = msg->u.new_port;
|
||||||
|
|
||||||
nxt_assert(app != NULL);
|
nxt_assert(app != NULL);
|
||||||
nxt_assert(port != NULL);
|
nxt_assert(port != NULL);
|
||||||
|
|||||||
Reference in New Issue
Block a user