Libunit: gracefully quitting a multicontext application.
This commit is contained in:
@@ -305,6 +305,8 @@ struct nxt_unit_ctx_impl_s {
|
||||
/* of nxt_unit_read_buf_t */
|
||||
nxt_queue_t free_rbuf;
|
||||
|
||||
int online;
|
||||
|
||||
nxt_unit_mmap_buf_t ctx_buf[2];
|
||||
nxt_unit_read_buf_t ctx_read_buf;
|
||||
|
||||
@@ -354,7 +356,6 @@ struct nxt_unit_impl_s {
|
||||
|
||||
pid_t pid;
|
||||
int log_fd;
|
||||
int online;
|
||||
|
||||
nxt_unit_ctx_impl_t main_ctx;
|
||||
};
|
||||
@@ -561,7 +562,6 @@ nxt_unit_create(nxt_unit_init_t *init)
|
||||
lib->ports.slot = NULL;
|
||||
|
||||
lib->log_fd = STDERR_FILENO;
|
||||
lib->online = 1;
|
||||
|
||||
nxt_queue_init(&lib->contexts);
|
||||
|
||||
@@ -615,10 +615,15 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
|
||||
|
||||
nxt_unit_lib_use(lib);
|
||||
|
||||
pthread_mutex_lock(&lib->mutex);
|
||||
|
||||
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
|
||||
|
||||
pthread_mutex_unlock(&lib->mutex);
|
||||
|
||||
ctx_impl->use_count = 1;
|
||||
ctx_impl->wait_items = 0;
|
||||
ctx_impl->online = 1;
|
||||
|
||||
nxt_queue_init(&ctx_impl->free_req);
|
||||
nxt_queue_init(&ctx_impl->free_ws);
|
||||
@@ -1119,7 +1124,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
if (new_port_msg->id == (nxt_port_id_t) -1) {
|
||||
if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
|
||||
nxt_unit_port_id_init(&new_port.id, lib->pid, new_port_msg->id);
|
||||
|
||||
new_port.in_fd = recv_msg->fd[0];
|
||||
@@ -1161,7 +1166,7 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
|
||||
if (new_port_msg->id == (nxt_port_id_t) -1) {
|
||||
if (new_port_msg->id == NXT_UNIT_SHARED_PORT_ID) {
|
||||
lib->shared_port = port;
|
||||
|
||||
} else {
|
||||
@@ -4408,18 +4413,20 @@ nxt_unit_process_pop_first(nxt_unit_impl_t *lib)
|
||||
int
|
||||
nxt_unit_run(nxt_unit_ctx_t *ctx)
|
||||
{
|
||||
int rc;
|
||||
nxt_unit_impl_t *lib;
|
||||
int rc;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
|
||||
nxt_unit_ctx_use(ctx);
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
|
||||
rc = NXT_UNIT_OK;
|
||||
|
||||
while (nxt_fast_path(lib->online)) {
|
||||
while (nxt_fast_path(ctx_impl->online)) {
|
||||
rc = nxt_unit_run_once_impl(ctx);
|
||||
|
||||
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
|
||||
nxt_unit_quit(ctx);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -4696,18 +4703,16 @@ int
|
||||
nxt_unit_run_ctx(nxt_unit_ctx_t *ctx)
|
||||
{
|
||||
int rc;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_read_buf_t *rbuf;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
|
||||
nxt_unit_ctx_use(ctx);
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
|
||||
rc = NXT_UNIT_OK;
|
||||
|
||||
while (nxt_fast_path(lib->online)) {
|
||||
while (nxt_fast_path(ctx_impl->online)) {
|
||||
rbuf = nxt_unit_read_buf_get(ctx);
|
||||
if (nxt_slow_path(rbuf == NULL)) {
|
||||
rc = NXT_UNIT_ERROR;
|
||||
@@ -4802,13 +4807,16 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
|
||||
int rc;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_read_buf_t *rbuf;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
|
||||
nxt_unit_ctx_use(ctx);
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
|
||||
rc = NXT_UNIT_OK;
|
||||
|
||||
while (nxt_fast_path(lib->online)) {
|
||||
while (nxt_fast_path(ctx_impl->online)) {
|
||||
rbuf = nxt_unit_read_buf_get(ctx);
|
||||
if (nxt_slow_path(rbuf == NULL)) {
|
||||
rc = NXT_UNIT_ERROR;
|
||||
@@ -4867,6 +4875,7 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||
int rc;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_read_buf_t *rbuf;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
|
||||
rbuf = nxt_unit_read_buf_get(ctx);
|
||||
if (nxt_slow_path(rbuf == NULL)) {
|
||||
@@ -4874,6 +4883,7 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||
}
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
|
||||
retry:
|
||||
|
||||
@@ -4906,7 +4916,7 @@ retry:
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
|
||||
if (lib->online) {
|
||||
if (ctx_impl->online) {
|
||||
goto retry;
|
||||
}
|
||||
|
||||
@@ -4950,14 +4960,14 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
|
||||
|
||||
queue_fd = -1;
|
||||
|
||||
port = nxt_unit_create_port(ctx);
|
||||
port = nxt_unit_create_port(&new_ctx->ctx);
|
||||
if (nxt_slow_path(port == NULL)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
new_ctx->read_port = port;
|
||||
|
||||
queue_fd = nxt_unit_shm_open(ctx, sizeof(nxt_port_queue_t));
|
||||
queue_fd = nxt_unit_shm_open(&new_ctx->ctx, sizeof(nxt_port_queue_t));
|
||||
if (nxt_slow_path(queue_fd == -1)) {
|
||||
goto fail;
|
||||
}
|
||||
@@ -4976,7 +4986,7 @@ nxt_unit_ctx_alloc(nxt_unit_ctx_t *ctx, void *data)
|
||||
port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
|
||||
port_impl->queue = mem;
|
||||
|
||||
rc = nxt_unit_send_port(ctx, lib->router_port, port, queue_fd);
|
||||
rc = nxt_unit_send_port(&new_ctx->ctx, lib->router_port, port, queue_fd);
|
||||
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||
goto fail;
|
||||
}
|
||||
@@ -5041,8 +5051,12 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
|
||||
|
||||
pthread_mutex_destroy(&ctx_impl->mutex);
|
||||
|
||||
pthread_mutex_lock(&lib->mutex);
|
||||
|
||||
nxt_queue_remove(&ctx_impl->link);
|
||||
|
||||
pthread_mutex_unlock(&lib->mutex);
|
||||
|
||||
if (nxt_fast_path(ctx_impl->read_port != NULL)) {
|
||||
nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
|
||||
nxt_unit_port_release(ctx_impl->read_port);
|
||||
@@ -5229,7 +5243,7 @@ nxt_inline void nxt_unit_port_release(nxt_unit_port_t *port)
|
||||
}
|
||||
|
||||
if (port_impl->queue != NULL) {
|
||||
munmap(port_impl->queue, (port->id.id == (nxt_port_id_t) -1)
|
||||
munmap(port_impl->queue, (port->id.id == NXT_UNIT_SHARED_PORT_ID)
|
||||
? sizeof(nxt_app_queue_t)
|
||||
: sizeof(nxt_port_queue_t));
|
||||
}
|
||||
@@ -5346,7 +5360,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port, void *queue)
|
||||
goto unlock;
|
||||
}
|
||||
|
||||
if (port->id.id >= process->next_port_id) {
|
||||
if (port->id.id != NXT_UNIT_SHARED_PORT_ID
|
||||
&& port->id.id >= process->next_port_id)
|
||||
{
|
||||
process->next_port_id = port->id.id + 1;
|
||||
}
|
||||
|
||||
@@ -5514,17 +5530,49 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
|
||||
static void
|
||||
nxt_unit_quit(nxt_unit_ctx_t *ctx)
|
||||
{
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_port_msg_t msg;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
|
||||
if (lib->online) {
|
||||
lib->online = 0;
|
||||
|
||||
if (lib->callbacks.quit != NULL) {
|
||||
lib->callbacks.quit(ctx);
|
||||
}
|
||||
if (!ctx_impl->online) {
|
||||
return;
|
||||
}
|
||||
|
||||
ctx_impl->online = 0;
|
||||
|
||||
if (lib->callbacks.quit != NULL) {
|
||||
lib->callbacks.quit(ctx);
|
||||
}
|
||||
|
||||
if (ctx != &lib->main_ctx.ctx) {
|
||||
return;
|
||||
}
|
||||
|
||||
memset(&msg, 0, sizeof(nxt_port_msg_t));
|
||||
|
||||
msg.pid = lib->pid;
|
||||
msg.type = _NXT_PORT_MSG_QUIT;
|
||||
|
||||
pthread_mutex_lock(&lib->mutex);
|
||||
|
||||
nxt_queue_each(ctx_impl, &lib->contexts, nxt_unit_ctx_impl_t, link) {
|
||||
|
||||
if (ctx == &ctx_impl->ctx
|
||||
|| ctx_impl->read_port == NULL
|
||||
|| ctx_impl->read_port->out_fd == -1)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
(void) nxt_unit_port_send(ctx, ctx_impl->read_port,
|
||||
&msg, sizeof(msg), NULL, 0);
|
||||
|
||||
} nxt_queue_loop;
|
||||
|
||||
pthread_mutex_unlock(&lib->mutex);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -34,6 +34,8 @@ enum {
|
||||
|
||||
#define NXT_UNIT_INIT_ENV "NXT_UNIT_INIT"
|
||||
|
||||
#define NXT_UNIT_SHARED_PORT_ID ((uint16_t) 0xFFFFu)
|
||||
|
||||
/*
|
||||
* Mostly opaque structure with library state.
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user