Libunit: waking another context with the RPC_READY message.
This commit is contained in:
@@ -107,6 +107,8 @@ static int nxt_unit_get_outgoing_buf(nxt_unit_ctx_t *ctx,
|
|||||||
uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
|
uint32_t min_size, nxt_unit_mmap_buf_t *mmap_buf, char *local_buf);
|
||||||
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
|
static int nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd);
|
||||||
|
|
||||||
|
static void nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx,
|
||||||
|
nxt_unit_ctx_impl_t *ctx_impl);
|
||||||
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
|
static void nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps);
|
||||||
nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
|
nxt_inline void nxt_unit_process_use(nxt_unit_process_t *process);
|
||||||
nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
|
nxt_inline void nxt_unit_process_release(nxt_unit_process_t *process);
|
||||||
@@ -988,6 +990,10 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
|
|||||||
|
|
||||||
switch (port_msg->type) {
|
switch (port_msg->type) {
|
||||||
|
|
||||||
|
case _NXT_PORT_MSG_RPC_READY:
|
||||||
|
rc = NXT_UNIT_OK;
|
||||||
|
break;
|
||||||
|
|
||||||
case _NXT_PORT_MSG_QUIT:
|
case _NXT_PORT_MSG_QUIT:
|
||||||
nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
|
nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
|
||||||
|
|
||||||
@@ -1068,7 +1074,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
nxt_unit_debug(ctx, "#%"PRIu32": ignore message type: %d",
|
nxt_unit_alert(ctx, "#%"PRIu32": ignore message type: %d",
|
||||||
port_msg->stream, (int) port_msg->type);
|
port_msg->stream, (int) port_msg->type);
|
||||||
|
|
||||||
rc = NXT_UNIT_ERROR;
|
rc = NXT_UNIT_ERROR;
|
||||||
@@ -4012,12 +4018,40 @@ nxt_unit_incoming_mmap(nxt_unit_ctx_t *ctx, pid_t pid, int fd)
|
|||||||
|
|
||||||
nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
|
nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
|
||||||
|
|
||||||
|
nxt_unit_awake_ctx(ctx, ctx_impl);
|
||||||
|
|
||||||
} nxt_queue_loop;
|
} nxt_queue_loop;
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_unit_awake_ctx(nxt_unit_ctx_t *ctx, nxt_unit_ctx_impl_t *ctx_impl)
|
||||||
|
{
|
||||||
|
nxt_port_msg_t msg;
|
||||||
|
|
||||||
|
if (nxt_fast_path(ctx == &ctx_impl->ctx)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nxt_slow_path(ctx_impl->read_port == NULL
|
||||||
|
|| ctx_impl->read_port->out_fd == -1))
|
||||||
|
{
|
||||||
|
nxt_unit_alert(ctx, "target context read_port is NULL or not writable");
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
memset(&msg, 0, sizeof(nxt_port_msg_t));
|
||||||
|
|
||||||
|
msg.type = _NXT_PORT_MSG_RPC_READY;
|
||||||
|
|
||||||
|
(void) nxt_unit_port_send(ctx, ctx_impl->read_port,
|
||||||
|
&msg, sizeof(msg), NULL, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
|
nxt_unit_mmaps_init(nxt_unit_mmaps_t *mmaps)
|
||||||
{
|
{
|
||||||
@@ -5390,6 +5424,8 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
|
|||||||
|
|
||||||
nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
|
nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
|
||||||
|
|
||||||
|
nxt_unit_awake_ctx(ctx, ctx_impl);
|
||||||
|
|
||||||
} nxt_queue_loop;
|
} nxt_queue_loop;
|
||||||
|
|
||||||
return old_port;
|
return old_port;
|
||||||
|
|||||||
Reference in New Issue
Block a user