Moving request limit control to libunit.
Introducting application graceful stop. For now only used when application process reach request limit value. This closes #585 issue on GitHub.
This commit is contained in:
@@ -50,6 +50,13 @@ NGINX Unit updated to 1.26.0.
|
||||
</para>
|
||||
</change>
|
||||
|
||||
<change type="bugfix">
|
||||
<para>
|
||||
the router and app processes could crash when reaching requests limit
|
||||
in asynchronous or multithreaded apps.
|
||||
</para>
|
||||
</change>
|
||||
|
||||
</changes>
|
||||
|
||||
|
||||
|
||||
@@ -120,7 +120,8 @@ func nxt_go_add_port(ctx *C.nxt_unit_ctx_t, p *C.nxt_unit_port_t) C.int {
|
||||
}
|
||||
|
||||
//export nxt_go_remove_port
|
||||
func nxt_go_remove_port(unit *C.nxt_unit_t, p *C.nxt_unit_port_t) {
|
||||
func nxt_go_remove_port(unit *C.nxt_unit_t, ctx *C.nxt_unit_ctx_t,
|
||||
p *C.nxt_unit_port_t) {
|
||||
|
||||
key := port_key{
|
||||
pid: int(p.id.pid),
|
||||
|
||||
@@ -519,11 +519,11 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||
|
||||
|
||||
void
|
||||
Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
|
||||
Unit::remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||
{
|
||||
port_data_t *data;
|
||||
|
||||
if (port->data != NULL) {
|
||||
if (port->data != NULL && ctx != NULL) {
|
||||
data = (port_data_t *) port->data;
|
||||
|
||||
data->stop();
|
||||
|
||||
@@ -41,7 +41,8 @@ private:
|
||||
void shm_ack_handler(nxt_unit_ctx_t *ctx);
|
||||
|
||||
static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
|
||||
static void remove_port(nxt_unit_t *unit, nxt_unit_port_t *port);
|
||||
static void remove_port(nxt_unit_t *unit, nxt_unit_ctx_t *ctx,
|
||||
nxt_unit_port_t *port);
|
||||
|
||||
static void quit_cb(nxt_unit_ctx_t *ctx);
|
||||
void quit(nxt_unit_ctx_t *ctx);
|
||||
|
||||
@@ -687,7 +687,8 @@ nxt_app_parse_type(u_char *p, size_t length)
|
||||
|
||||
|
||||
nxt_int_t
|
||||
nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
|
||||
nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init,
|
||||
nxt_common_app_conf_t *conf)
|
||||
{
|
||||
nxt_port_t *my_port, *main_port, *router_port;
|
||||
nxt_runtime_t *rt;
|
||||
@@ -730,5 +731,8 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
|
||||
|
||||
init->log_fd = 2;
|
||||
|
||||
init->shm_limit = conf->shm_limit;
|
||||
init->request_limit = conf->request_limit;
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
@@ -101,6 +101,7 @@ struct nxt_common_app_conf_s {
|
||||
nxt_conf_value_t *limits;
|
||||
|
||||
size_t shm_limit;
|
||||
uint32_t request_limit;
|
||||
|
||||
union {
|
||||
nxt_external_app_conf_t external;
|
||||
@@ -137,7 +138,7 @@ NXT_EXPORT extern nxt_str_t nxt_server;
|
||||
extern nxt_app_module_t nxt_external_module;
|
||||
|
||||
NXT_EXPORT nxt_int_t nxt_unit_default_init(nxt_task_t *task,
|
||||
nxt_unit_init_t *init);
|
||||
nxt_unit_init_t *init, nxt_common_app_conf_t *conf);
|
||||
|
||||
|
||||
#endif /* _NXT_APPLICATION_H_INCLIDED_ */
|
||||
|
||||
@@ -113,13 +113,13 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
|
||||
"%PI,%ud,%d;"
|
||||
"%PI,%ud,%d;"
|
||||
"%PI,%ud,%d,%d;"
|
||||
"%d,%z,%Z",
|
||||
"%d,%z,%uD,%Z",
|
||||
NXT_VERSION, my_port->process->stream,
|
||||
main_port->pid, main_port->id, main_port->pair[1],
|
||||
router_port->pid, router_port->id, router_port->pair[1],
|
||||
my_port->pid, my_port->id, my_port->pair[0],
|
||||
my_port->pair[1],
|
||||
2, conf->shm_limit);
|
||||
2, conf->shm_limit, conf->request_limit);
|
||||
|
||||
if (nxt_slow_path(p == end)) {
|
||||
nxt_alert(task, "internal error: buffer too small for NXT_UNIT_INIT");
|
||||
|
||||
@@ -428,7 +428,7 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data)
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
nxt_unit_default_init(task, &java_init);
|
||||
nxt_unit_default_init(task, &java_init, app_conf);
|
||||
|
||||
java_init.callbacks.request_handler = nxt_java_request_handler;
|
||||
java_init.callbacks.websocket_handler = nxt_java_websocket_handler;
|
||||
@@ -437,7 +437,6 @@ nxt_java_start(nxt_task_t *task, nxt_process_data_t *data)
|
||||
java_init.request_data_size = sizeof(nxt_java_request_data_t);
|
||||
java_init.data = &java_data;
|
||||
java_init.ctx_data = env;
|
||||
java_init.shm_limit = app_conf->shm_limit;
|
||||
|
||||
ctx = nxt_unit_init(&java_init);
|
||||
if (nxt_slow_path(ctx == NULL)) {
|
||||
@@ -616,11 +615,6 @@ nxt_java_ready_handler(nxt_unit_ctx_t *ctx)
|
||||
nxt_java_data_t *java_data;
|
||||
nxt_java_app_conf_t *c;
|
||||
|
||||
/* Worker thread context. */
|
||||
if (!nxt_unit_is_main_ctx(ctx)) {
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
java_data = ctx->unit->data;
|
||||
c = java_data->conf;
|
||||
|
||||
|
||||
@@ -154,6 +154,12 @@ static nxt_conf_map_t nxt_common_app_limits_conf[] = {
|
||||
offsetof(nxt_common_app_conf_t, shm_limit),
|
||||
},
|
||||
|
||||
{
|
||||
nxt_string("requests"),
|
||||
NXT_CONF_MAP_INT32,
|
||||
offsetof(nxt_common_app_conf_t, request_limit),
|
||||
},
|
||||
|
||||
};
|
||||
|
||||
|
||||
@@ -392,6 +398,7 @@ nxt_port_main_start_process_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||
*p = '\0';
|
||||
|
||||
app_conf->shm_limit = 100 * 1024 * 1024;
|
||||
app_conf->request_limit = 0;
|
||||
|
||||
start += app_conf->name.length + 1;
|
||||
|
||||
|
||||
@@ -485,14 +485,13 @@ nxt_php_start(nxt_task_t *task, nxt_process_data_t *data)
|
||||
}
|
||||
}
|
||||
|
||||
ret = nxt_unit_default_init(task, &php_init);
|
||||
ret = nxt_unit_default_init(task, &php_init, conf);
|
||||
if (nxt_slow_path(ret != NXT_OK)) {
|
||||
nxt_alert(task, "nxt_unit_default_init() failed");
|
||||
return ret;
|
||||
}
|
||||
|
||||
php_init.callbacks.request_handler = nxt_php_request_handler;
|
||||
php_init.shm_limit = conf->shm_limit;
|
||||
|
||||
unit_ctx = nxt_unit_init(&php_init);
|
||||
if (nxt_slow_path(unit_ctx == NULL)) {
|
||||
|
||||
@@ -224,8 +224,6 @@ struct nxt_port_s {
|
||||
/* Maximum interleave of message parts. */
|
||||
uint32_t max_share;
|
||||
|
||||
uint32_t app_responses;
|
||||
|
||||
uint32_t active_websockets;
|
||||
uint32_t active_requests;
|
||||
|
||||
|
||||
@@ -27,7 +27,6 @@ typedef struct {
|
||||
uint32_t spare_processes;
|
||||
nxt_msec_t timeout;
|
||||
nxt_msec_t idle_timeout;
|
||||
uint32_t requests;
|
||||
nxt_conf_value_t *limits_value;
|
||||
nxt_conf_value_t *processes_value;
|
||||
nxt_conf_value_t *targets_value;
|
||||
@@ -1293,12 +1292,6 @@ static nxt_conf_map_t nxt_router_app_limits_conf[] = {
|
||||
NXT_CONF_MAP_MSEC,
|
||||
offsetof(nxt_router_app_conf_t, timeout),
|
||||
},
|
||||
|
||||
{
|
||||
nxt_string("requests"),
|
||||
NXT_CONF_MAP_INT32,
|
||||
offsetof(nxt_router_app_conf_t, requests),
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -1567,7 +1560,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
||||
apcf.spare_processes = 0;
|
||||
apcf.timeout = 0;
|
||||
apcf.idle_timeout = 15000;
|
||||
apcf.requests = 0;
|
||||
apcf.limits_value = NULL;
|
||||
apcf.processes_value = NULL;
|
||||
apcf.targets_value = NULL;
|
||||
@@ -1647,7 +1639,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
||||
nxt_debug(task, "application type: %V", &apcf.type);
|
||||
nxt_debug(task, "application processes: %D", apcf.processes);
|
||||
nxt_debug(task, "application request timeout: %M", apcf.timeout);
|
||||
nxt_debug(task, "application requests: %D", apcf.requests);
|
||||
|
||||
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
|
||||
|
||||
@@ -1678,7 +1669,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
||||
? apcf.spare_processes : 1;
|
||||
app->timeout = apcf.timeout;
|
||||
app->idle_timeout = apcf.idle_timeout;
|
||||
app->max_requests = apcf.requests;
|
||||
|
||||
app->targets = targets;
|
||||
|
||||
@@ -4676,7 +4666,7 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
|
||||
{
|
||||
int inc_use;
|
||||
uint32_t got_response, dec_requests;
|
||||
nxt_bool_t port_unchained, send_quit, adjust_idle_timer;
|
||||
nxt_bool_t adjust_idle_timer;
|
||||
nxt_port_t *main_app_port;
|
||||
|
||||
nxt_assert(port != NULL);
|
||||
@@ -4722,40 +4712,18 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
|
||||
|
||||
nxt_thread_mutex_lock(&app->mutex);
|
||||
|
||||
main_app_port->app_responses += got_response;
|
||||
main_app_port->active_requests -= got_response + dec_requests;
|
||||
app->active_requests -= got_response + dec_requests;
|
||||
|
||||
if (main_app_port->pair[1] != -1
|
||||
&& (app->max_requests == 0
|
||||
|| main_app_port->app_responses < app->max_requests))
|
||||
{
|
||||
if (main_app_port->app_link.next == NULL) {
|
||||
if (main_app_port->pair[1] != -1 && main_app_port->app_link.next == NULL) {
|
||||
nxt_queue_insert_tail(&app->ports, &main_app_port->app_link);
|
||||
|
||||
nxt_port_inc_use(main_app_port);
|
||||
}
|
||||
}
|
||||
|
||||
send_quit = (app->max_requests > 0
|
||||
&& main_app_port->app_responses >= app->max_requests);
|
||||
|
||||
if (send_quit) {
|
||||
port_unchained = nxt_queue_chk_remove(&main_app_port->app_link);
|
||||
|
||||
nxt_port_hash_remove(&app->port_hash, main_app_port);
|
||||
app->port_hash_count--;
|
||||
|
||||
main_app_port->app = NULL;
|
||||
app->processes--;
|
||||
|
||||
} else {
|
||||
port_unchained = 0;
|
||||
}
|
||||
|
||||
adjust_idle_timer = 0;
|
||||
|
||||
if (main_app_port->pair[1] != -1 && !send_quit
|
||||
if (main_app_port->pair[1] != -1
|
||||
&& main_app_port->active_requests == 0
|
||||
&& main_app_port->active_websockets == 0
|
||||
&& main_app_port->idle_link.next == NULL)
|
||||
@@ -4800,19 +4768,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_app_t *app, nxt_port_t *port,
|
||||
goto adjust_use;
|
||||
}
|
||||
|
||||
if (send_quit) {
|
||||
nxt_debug(task, "app '%V' %p send QUIT to port", &app->name, app);
|
||||
|
||||
nxt_port_socket_write(task, main_app_port, NXT_PORT_MSG_QUIT, -1, 0, 0,
|
||||
NULL);
|
||||
|
||||
if (port_unchained) {
|
||||
nxt_port_use(task, main_app_port, -1);
|
||||
}
|
||||
|
||||
goto adjust_use;
|
||||
}
|
||||
|
||||
nxt_debug(task, "app '%V' %p requests queue is empty, keep the port",
|
||||
&app->name, app);
|
||||
|
||||
|
||||
@@ -124,7 +124,6 @@ struct nxt_app_s {
|
||||
uint32_t max_processes;
|
||||
uint32_t spare_processes;
|
||||
uint32_t max_pending_processes;
|
||||
uint32_t max_requests;
|
||||
|
||||
uint32_t generation;
|
||||
|
||||
|
||||
260
src/nxt_unit.c
260
src/nxt_unit.c
@@ -25,6 +25,11 @@
|
||||
#define NXT_UNIT_LOCAL_BUF_SIZE \
|
||||
(NXT_UNIT_MAX_PLAIN_SIZE + sizeof(nxt_port_msg_t))
|
||||
|
||||
enum {
|
||||
NXT_QUIT_NORMAL = 0,
|
||||
NXT_QUIT_GRACEFUL = 1,
|
||||
};
|
||||
|
||||
typedef struct nxt_unit_impl_s nxt_unit_impl_t;
|
||||
typedef struct nxt_unit_mmap_s nxt_unit_mmap_t;
|
||||
typedef struct nxt_unit_mmaps_s nxt_unit_mmaps_t;
|
||||
@@ -51,7 +56,8 @@ nxt_inline void nxt_unit_mmap_buf_insert_tail(nxt_unit_mmap_buf_t **prev,
|
||||
nxt_inline void nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf);
|
||||
static int nxt_unit_read_env(nxt_unit_port_t *ready_port,
|
||||
nxt_unit_port_t *router_port, nxt_unit_port_t *read_port,
|
||||
int *log_fd, uint32_t *stream, uint32_t *shm_limit);
|
||||
int *log_fd, uint32_t *stream, uint32_t *shm_limit,
|
||||
uint32_t *request_limit);
|
||||
static int nxt_unit_ready(nxt_unit_ctx_t *ctx, int ready_fd, uint32_t stream,
|
||||
int queue_fd);
|
||||
static int nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
|
||||
@@ -130,6 +136,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
|
||||
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
|
||||
static int nxt_unit_run_once_impl(nxt_unit_ctx_t *ctx);
|
||||
static int nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf);
|
||||
static int nxt_unit_chk_ready(nxt_unit_ctx_t *ctx);
|
||||
static int nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx);
|
||||
static void nxt_unit_process_ready_req(nxt_unit_ctx_t *ctx);
|
||||
nxt_inline int nxt_unit_is_read_queue(nxt_unit_read_buf_t *rbuf);
|
||||
@@ -150,14 +157,14 @@ static nxt_unit_port_t *nxt_unit_add_port(nxt_unit_ctx_t *ctx,
|
||||
nxt_unit_port_t *port, void *queue);
|
||||
static void nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx,
|
||||
nxt_queue_t *awaiting_req);
|
||||
static void nxt_unit_remove_port(nxt_unit_impl_t *lib,
|
||||
static void nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
|
||||
nxt_unit_port_id_t *port_id);
|
||||
static nxt_unit_port_t *nxt_unit_remove_port_unsafe(nxt_unit_impl_t *lib,
|
||||
nxt_unit_port_id_t *port_id);
|
||||
static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
|
||||
static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
|
||||
nxt_unit_process_t *process);
|
||||
static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
|
||||
static void nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param);
|
||||
static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
|
||||
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
|
||||
nxt_unit_port_t *port, const void *buf, size_t buf_size,
|
||||
@@ -174,7 +181,7 @@ static int nxt_unit_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
nxt_unit_read_buf_t *rbuf);
|
||||
static int nxt_unit_port_queue_recv(nxt_unit_port_t *port,
|
||||
nxt_unit_read_buf_t *rbuf);
|
||||
static int nxt_unit_app_queue_recv(nxt_unit_port_t *port,
|
||||
static int nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
nxt_unit_read_buf_t *rbuf);
|
||||
nxt_inline int nxt_unit_close(int fd);
|
||||
static int nxt_unit_fd_blocking(int fd);
|
||||
@@ -311,8 +318,9 @@ struct nxt_unit_ctx_impl_s {
|
||||
/* of nxt_unit_read_buf_t */
|
||||
nxt_queue_t free_rbuf;
|
||||
|
||||
int online;
|
||||
int ready;
|
||||
uint8_t online; /* 1 bit */
|
||||
uint8_t ready; /* 1 bit */
|
||||
uint8_t quit_param;
|
||||
|
||||
nxt_unit_mmap_buf_t ctx_buf[2];
|
||||
nxt_unit_read_buf_t ctx_read_buf;
|
||||
@@ -344,9 +352,11 @@ struct nxt_unit_impl_s {
|
||||
nxt_unit_callbacks_t callbacks;
|
||||
|
||||
nxt_atomic_t use_count;
|
||||
nxt_atomic_t request_count;
|
||||
|
||||
uint32_t request_data_size;
|
||||
uint32_t shm_mmap_limit;
|
||||
uint32_t request_limit;
|
||||
|
||||
pthread_mutex_t mutex;
|
||||
|
||||
@@ -414,7 +424,7 @@ nxt_unit_init(nxt_unit_init_t *init)
|
||||
{
|
||||
int rc, queue_fd;
|
||||
void *mem;
|
||||
uint32_t ready_stream, shm_limit;
|
||||
uint32_t ready_stream, shm_limit, request_limit;
|
||||
nxt_unit_ctx_t *ctx;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_port_t ready_port, router_port, read_port;
|
||||
@@ -446,13 +456,15 @@ nxt_unit_init(nxt_unit_init_t *init)
|
||||
|
||||
} else {
|
||||
rc = nxt_unit_read_env(&ready_port, &router_port, &read_port,
|
||||
&lib->log_fd, &ready_stream, &shm_limit);
|
||||
&lib->log_fd, &ready_stream, &shm_limit,
|
||||
&request_limit);
|
||||
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||
goto fail;
|
||||
}
|
||||
|
||||
lib->shm_mmap_limit = (shm_limit + PORT_MMAP_DATA_SIZE - 1)
|
||||
/ PORT_MMAP_DATA_SIZE;
|
||||
lib->request_limit = request_limit;
|
||||
}
|
||||
|
||||
if (nxt_slow_path(lib->shm_mmap_limit < 1)) {
|
||||
@@ -564,6 +576,7 @@ nxt_unit_create(nxt_unit_init_t *init)
|
||||
lib->request_data_size = init->request_data_size;
|
||||
lib->shm_mmap_limit = (init->shm_limit + PORT_MMAP_DATA_SIZE - 1)
|
||||
/ PORT_MMAP_DATA_SIZE;
|
||||
lib->request_limit = init->request_limit;
|
||||
|
||||
lib->processes.slot = NULL;
|
||||
lib->ports.slot = NULL;
|
||||
@@ -573,6 +586,7 @@ nxt_unit_create(nxt_unit_init_t *init)
|
||||
nxt_queue_init(&lib->contexts);
|
||||
|
||||
lib->use_count = 0;
|
||||
lib->request_count = 0;
|
||||
lib->router_port = NULL;
|
||||
lib->shared_port = NULL;
|
||||
|
||||
@@ -632,6 +646,7 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
|
||||
ctx_impl->wait_items = 0;
|
||||
ctx_impl->online = 1;
|
||||
ctx_impl->ready = 0;
|
||||
ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
|
||||
|
||||
nxt_queue_init(&ctx_impl->free_req);
|
||||
nxt_queue_init(&ctx_impl->free_ws);
|
||||
@@ -780,7 +795,7 @@ nxt_unit_mmap_buf_unlink(nxt_unit_mmap_buf_t *mmap_buf)
|
||||
static int
|
||||
nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
|
||||
nxt_unit_port_t *read_port, int *log_fd, uint32_t *stream,
|
||||
uint32_t *shm_limit)
|
||||
uint32_t *shm_limit, uint32_t *request_limit)
|
||||
{
|
||||
int rc;
|
||||
int ready_fd, router_fd, read_in_fd, read_out_fd;
|
||||
@@ -825,12 +840,12 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
|
||||
"%"PRId64",%"PRIu32",%d;"
|
||||
"%"PRId64",%"PRIu32",%d;"
|
||||
"%"PRId64",%"PRIu32",%d,%d;"
|
||||
"%d,%"PRIu32,
|
||||
"%d,%"PRIu32",%"PRIu32,
|
||||
&ready_stream,
|
||||
&ready_pid, &ready_id, &ready_fd,
|
||||
&router_pid, &router_id, &router_fd,
|
||||
&read_pid, &read_id, &read_in_fd, &read_out_fd,
|
||||
log_fd, shm_limit);
|
||||
log_fd, shm_limit, request_limit);
|
||||
|
||||
if (nxt_slow_path(rc == EOF)) {
|
||||
nxt_unit_alert(NULL, "sscanf(%s) failed: %s (%d) for %s env",
|
||||
@@ -839,9 +854,9 @@ nxt_unit_read_env(nxt_unit_port_t *ready_port, nxt_unit_port_t *router_port,
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
|
||||
if (nxt_slow_path(rc != 13)) {
|
||||
if (nxt_slow_path(rc != 14)) {
|
||||
nxt_unit_alert(NULL, "invalid number of variables in %s env: "
|
||||
"found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 13, vars);
|
||||
"found %d of %d in %s", NXT_UNIT_INIT_ENV, rc, 14, vars);
|
||||
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
@@ -929,6 +944,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
|
||||
{
|
||||
int rc;
|
||||
pid_t pid;
|
||||
uint8_t quit_param;
|
||||
struct cmsghdr *cm;
|
||||
nxt_port_msg_t *port_msg;
|
||||
nxt_unit_impl_t *lib;
|
||||
@@ -959,7 +975,7 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
|
||||
if (nxt_slow_path(rbuf->size == 0)) {
|
||||
nxt_unit_debug(ctx, "read port closed");
|
||||
|
||||
nxt_unit_quit(ctx);
|
||||
nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
|
||||
rc = NXT_UNIT_OK;
|
||||
goto done;
|
||||
}
|
||||
@@ -1018,9 +1034,18 @@ nxt_unit_process_msg(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf,
|
||||
break;
|
||||
|
||||
case _NXT_PORT_MSG_QUIT:
|
||||
nxt_unit_debug(ctx, "#%"PRIu32": quit", port_msg->stream);
|
||||
if (recv_msg.size == sizeof(quit_param)) {
|
||||
memcpy(&quit_param, recv_msg.start, sizeof(quit_param));
|
||||
|
||||
} else {
|
||||
quit_param = NXT_QUIT_NORMAL;
|
||||
}
|
||||
|
||||
nxt_unit_debug(ctx, "#%"PRIu32": %squit", port_msg->stream,
|
||||
(quit_param == NXT_QUIT_GRACEFUL ? "graceful " : ""));
|
||||
|
||||
nxt_unit_quit(ctx, quit_param);
|
||||
|
||||
nxt_unit_quit(ctx);
|
||||
rc = NXT_UNIT_OK;
|
||||
break;
|
||||
|
||||
@@ -1220,15 +1245,36 @@ nxt_unit_ctx_ready(nxt_unit_ctx_t *ctx)
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
|
||||
if (nxt_slow_path(ctx_impl->ready)) {
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
ctx_impl->ready = 1;
|
||||
|
||||
if (lib->callbacks.ready_handler) {
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
/* Call ready_handler() only for main context. */
|
||||
if (&lib->main_ctx == ctx_impl && lib->callbacks.ready_handler != NULL) {
|
||||
return lib->callbacks.ready_handler(ctx);
|
||||
}
|
||||
|
||||
if (&lib->main_ctx != ctx_impl) {
|
||||
/* Check if the main context is already stopped or quit. */
|
||||
if (nxt_slow_path(!lib->main_ctx.ready)) {
|
||||
ctx_impl->ready = 0;
|
||||
|
||||
nxt_unit_quit(ctx, lib->main_ctx.quit_param);
|
||||
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
if (lib->callbacks.add_port != NULL) {
|
||||
lib->callbacks.add_port(ctx, lib->shared_port);
|
||||
}
|
||||
}
|
||||
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
@@ -1741,10 +1787,12 @@ nxt_unit_request_info_get(nxt_unit_ctx_t *ctx)
|
||||
static void
|
||||
nxt_unit_request_info_release(nxt_unit_request_info_t *req)
|
||||
{
|
||||
nxt_unit_ctx_t *ctx;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
nxt_unit_request_info_impl_t *req_impl;
|
||||
|
||||
ctx_impl = nxt_container_of(req->ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
ctx = req->ctx;
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
|
||||
|
||||
req->response = NULL;
|
||||
@@ -1783,6 +1831,10 @@ nxt_unit_request_info_release(nxt_unit_request_info_t *req)
|
||||
nxt_queue_insert_tail(&ctx_impl->free_req, &req_impl->link);
|
||||
|
||||
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||
|
||||
if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
|
||||
nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4522,7 +4574,7 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
|
||||
rc = nxt_unit_run_once_impl(ctx);
|
||||
|
||||
if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
|
||||
nxt_unit_quit(ctx);
|
||||
nxt_unit_quit(ctx, NXT_QUIT_NORMAL);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -4586,6 +4638,7 @@ static int
|
||||
nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
|
||||
{
|
||||
int nevents, res, err;
|
||||
nxt_uint_t nfds;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
nxt_unit_port_impl_t *port_impl;
|
||||
@@ -4593,7 +4646,7 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
|
||||
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
|
||||
if (ctx_impl->wait_items > 0 || ctx_impl->ready == 0) {
|
||||
if (ctx_impl->wait_items > 0 || !nxt_unit_chk_ready(ctx)) {
|
||||
return nxt_unit_ctx_port_recv(ctx, ctx_impl->read_port, rbuf);
|
||||
}
|
||||
|
||||
@@ -4626,20 +4679,28 @@ retry:
|
||||
}
|
||||
}
|
||||
|
||||
res = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
|
||||
if (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
|
||||
res = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
|
||||
if (res == NXT_UNIT_OK) {
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
fds[1].fd = lib->shared_port->in_fd;
|
||||
fds[1].events = POLLIN;
|
||||
|
||||
nfds = 2;
|
||||
|
||||
} else {
|
||||
nfds = 1;
|
||||
}
|
||||
|
||||
fds[0].fd = ctx_impl->read_port->in_fd;
|
||||
fds[0].events = POLLIN;
|
||||
fds[0].revents = 0;
|
||||
|
||||
fds[1].fd = lib->shared_port->in_fd;
|
||||
fds[1].events = POLLIN;
|
||||
fds[1].revents = 0;
|
||||
|
||||
nevents = poll(fds, 2, -1);
|
||||
nevents = poll(fds, nfds, -1);
|
||||
if (nxt_slow_path(nevents == -1)) {
|
||||
err = errno;
|
||||
|
||||
@@ -4685,6 +4746,21 @@ retry:
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
nxt_unit_chk_ready(nxt_unit_ctx_t *ctx)
|
||||
{
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
return (ctx_impl->ready
|
||||
&& (lib->request_limit == 0
|
||||
|| lib->request_count < lib->request_limit));
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
|
||||
{
|
||||
@@ -4723,6 +4799,10 @@ nxt_unit_process_pending_rbuf(nxt_unit_ctx_t *ctx)
|
||||
|
||||
} nxt_queue_loop;
|
||||
|
||||
if (!ctx_impl->ready) {
|
||||
nxt_unit_quit(ctx, NXT_QUIT_GRACEFUL);
|
||||
}
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
@@ -4903,16 +4983,14 @@ nxt_unit_run_shared(nxt_unit_ctx_t *ctx)
|
||||
int rc;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_read_buf_t *rbuf;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
|
||||
nxt_unit_ctx_use(ctx);
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
|
||||
rc = NXT_UNIT_OK;
|
||||
|
||||
while (nxt_fast_path(ctx_impl->online)) {
|
||||
while (nxt_fast_path(nxt_unit_chk_ready(ctx))) {
|
||||
rbuf = nxt_unit_read_buf_get(ctx);
|
||||
if (nxt_slow_path(rbuf == NULL)) {
|
||||
rc = NXT_UNIT_ERROR;
|
||||
@@ -4949,17 +5027,15 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
|
||||
int rc;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_read_buf_t *rbuf;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
nxt_unit_request_info_t *req;
|
||||
|
||||
nxt_unit_ctx_use(ctx);
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
|
||||
req = NULL;
|
||||
|
||||
if (nxt_slow_path(!ctx_impl->online)) {
|
||||
if (nxt_slow_path(!nxt_unit_chk_ready(ctx))) {
|
||||
goto done;
|
||||
}
|
||||
|
||||
@@ -4968,7 +5044,7 @@ nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx)
|
||||
goto done;
|
||||
}
|
||||
|
||||
rc = nxt_unit_app_queue_recv(lib->shared_port, rbuf);
|
||||
rc = nxt_unit_app_queue_recv(ctx, lib->shared_port, rbuf);
|
||||
if (rc != NXT_UNIT_OK) {
|
||||
nxt_unit_read_buf_release(ctx, rbuf);
|
||||
goto done;
|
||||
@@ -4984,17 +5060,6 @@ done:
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx)
|
||||
{
|
||||
nxt_unit_impl_t *lib;
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
return (ctx == &lib->main_ctx.ctx);
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
nxt_unit_process_port_msg(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||
{
|
||||
@@ -5017,13 +5082,17 @@ nxt_unit_process_port_msg_impl(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_read_buf_t *rbuf;
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
if (port == lib->shared_port && !nxt_unit_chk_ready(ctx)) {
|
||||
return NXT_UNIT_AGAIN;
|
||||
}
|
||||
|
||||
rbuf = nxt_unit_read_buf_get(ctx);
|
||||
if (nxt_slow_path(rbuf == NULL)) {
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
if (port == lib->shared_port) {
|
||||
rc = nxt_unit_shared_port_recv(ctx, port, rbuf);
|
||||
|
||||
@@ -5194,7 +5263,7 @@ nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl)
|
||||
pthread_mutex_unlock(&lib->mutex);
|
||||
|
||||
if (nxt_fast_path(ctx_impl->read_port != NULL)) {
|
||||
nxt_unit_remove_port(lib, &ctx_impl->read_port->id);
|
||||
nxt_unit_remove_port(lib, NULL, &ctx_impl->read_port->id);
|
||||
nxt_unit_port_release(ctx_impl->read_port);
|
||||
}
|
||||
|
||||
@@ -5605,7 +5674,8 @@ nxt_unit_process_awaiting_req(nxt_unit_ctx_t *ctx, nxt_queue_t *awaiting_req)
|
||||
|
||||
|
||||
static void
|
||||
nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
|
||||
nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_ctx_t *ctx,
|
||||
nxt_unit_port_id_t *port_id)
|
||||
{
|
||||
nxt_unit_port_t *port;
|
||||
nxt_unit_port_impl_t *port_impl;
|
||||
@@ -5623,7 +5693,7 @@ nxt_unit_remove_port(nxt_unit_impl_t *lib, nxt_unit_port_id_t *port_id)
|
||||
pthread_mutex_unlock(&lib->mutex);
|
||||
|
||||
if (lib->callbacks.remove_port != NULL && port != NULL) {
|
||||
lib->callbacks.remove_port(&lib->unit, port);
|
||||
lib->callbacks.remove_port(&lib->unit, ctx, port);
|
||||
}
|
||||
|
||||
if (nxt_fast_path(port != NULL)) {
|
||||
@@ -5700,7 +5770,7 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
|
||||
nxt_queue_remove(&port->link);
|
||||
|
||||
if (lib->callbacks.remove_port != NULL) {
|
||||
lib->callbacks.remove_port(&lib->unit, &port->port);
|
||||
lib->callbacks.remove_port(&lib->unit, NULL, &port->port);
|
||||
}
|
||||
|
||||
nxt_unit_port_release(&port->port);
|
||||
@@ -5712,26 +5782,60 @@ nxt_unit_remove_process(nxt_unit_impl_t *lib, nxt_unit_process_t *process)
|
||||
|
||||
|
||||
static void
|
||||
nxt_unit_quit(nxt_unit_ctx_t *ctx)
|
||||
nxt_unit_quit(nxt_unit_ctx_t *ctx, uint8_t quit_param)
|
||||
{
|
||||
nxt_port_msg_t msg;
|
||||
nxt_bool_t skip_graceful_broadcast, quit;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_ctx_impl_t *ctx_impl;
|
||||
nxt_unit_callbacks_t *cb;
|
||||
nxt_unit_request_info_t *req;
|
||||
nxt_unit_request_info_impl_t *req_impl;
|
||||
|
||||
struct {
|
||||
nxt_port_msg_t msg;
|
||||
uint8_t quit_param;
|
||||
} nxt_packed m;
|
||||
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||
|
||||
if (!ctx_impl->online) {
|
||||
nxt_unit_debug(ctx, "quit: %d/%d/%d", (int) quit_param, ctx_impl->ready,
|
||||
ctx_impl->online);
|
||||
|
||||
if (nxt_slow_path(!ctx_impl->online)) {
|
||||
return;
|
||||
}
|
||||
|
||||
ctx_impl->online = 0;
|
||||
skip_graceful_broadcast = quit_param == NXT_QUIT_GRACEFUL
|
||||
&& !ctx_impl->ready;
|
||||
|
||||
cb = &lib->callbacks;
|
||||
|
||||
if (nxt_fast_path(ctx_impl->ready)) {
|
||||
ctx_impl->ready = 0;
|
||||
|
||||
if (cb->remove_port != NULL) {
|
||||
cb->remove_port(&lib->unit, ctx, lib->shared_port);
|
||||
}
|
||||
}
|
||||
|
||||
if (quit_param == NXT_QUIT_GRACEFUL) {
|
||||
pthread_mutex_lock(&ctx_impl->mutex);
|
||||
|
||||
quit = nxt_queue_is_empty(&ctx_impl->active_req)
|
||||
&& nxt_queue_is_empty(&ctx_impl->pending_rbuf)
|
||||
&& ctx_impl->wait_items == 0;
|
||||
|
||||
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||
|
||||
} else {
|
||||
quit = 1;
|
||||
ctx_impl->quit_param = NXT_QUIT_GRACEFUL;
|
||||
}
|
||||
|
||||
if (quit) {
|
||||
ctx_impl->online = 0;
|
||||
|
||||
if (cb->quit != NULL) {
|
||||
cb->quit(ctx);
|
||||
}
|
||||
@@ -5754,14 +5858,20 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
|
||||
|
||||
} nxt_queue_loop;
|
||||
|
||||
if (ctx != &lib->main_ctx.ctx) {
|
||||
if (nxt_fast_path(ctx_impl->read_port != NULL)) {
|
||||
nxt_unit_remove_port(lib, ctx, &ctx_impl->read_port->id);
|
||||
}
|
||||
}
|
||||
|
||||
if (ctx != &lib->main_ctx.ctx || skip_graceful_broadcast) {
|
||||
return;
|
||||
}
|
||||
|
||||
memset(&msg, 0, sizeof(nxt_port_msg_t));
|
||||
memset(&m.msg, 0, sizeof(nxt_port_msg_t));
|
||||
|
||||
msg.pid = lib->pid;
|
||||
msg.type = _NXT_PORT_MSG_QUIT;
|
||||
m.msg.pid = lib->pid;
|
||||
m.msg.type = _NXT_PORT_MSG_QUIT;
|
||||
m.quit_param = quit_param;
|
||||
|
||||
pthread_mutex_lock(&lib->mutex);
|
||||
|
||||
@@ -5775,7 +5885,7 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
|
||||
}
|
||||
|
||||
(void) nxt_unit_port_send(ctx, ctx_impl->read_port,
|
||||
&msg, sizeof(msg), NULL, 0);
|
||||
&m, sizeof(m), NULL, 0);
|
||||
|
||||
} nxt_queue_loop;
|
||||
|
||||
@@ -6089,7 +6199,11 @@ nxt_unit_shared_port_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
|
||||
retry:
|
||||
|
||||
res = nxt_unit_app_queue_recv(port, rbuf);
|
||||
res = nxt_unit_app_queue_recv(ctx, port, rbuf);
|
||||
|
||||
if (res == NXT_UNIT_OK) {
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
if (res == NXT_UNIT_AGAIN) {
|
||||
res = nxt_unit_port_recv(ctx, port, rbuf);
|
||||
@@ -6194,13 +6308,20 @@ nxt_unit_port_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
|
||||
|
||||
|
||||
static int
|
||||
nxt_unit_app_queue_recv(nxt_unit_port_t *port, nxt_unit_read_buf_t *rbuf)
|
||||
nxt_unit_app_queue_recv(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||
nxt_unit_read_buf_t *rbuf)
|
||||
{
|
||||
uint32_t cookie;
|
||||
nxt_port_msg_t *port_msg;
|
||||
nxt_app_queue_t *queue;
|
||||
nxt_unit_impl_t *lib;
|
||||
nxt_unit_port_impl_t *port_impl;
|
||||
|
||||
struct {
|
||||
nxt_port_msg_t msg;
|
||||
uint8_t quit_param;
|
||||
} nxt_packed m;
|
||||
|
||||
port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
|
||||
queue = port_impl->queue;
|
||||
|
||||
@@ -6214,6 +6335,25 @@ retry:
|
||||
port_msg = (nxt_port_msg_t *) rbuf->buf;
|
||||
|
||||
if (nxt_app_queue_cancel(queue, cookie, port_msg->stream)) {
|
||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||
|
||||
if (lib->request_limit != 0) {
|
||||
nxt_atomic_fetch_add(&lib->request_count, 1);
|
||||
|
||||
if (nxt_slow_path(lib->request_count >= lib->request_limit)) {
|
||||
nxt_unit_debug(ctx, "request limit reached");
|
||||
|
||||
memset(&m.msg, 0, sizeof(nxt_port_msg_t));
|
||||
|
||||
m.msg.pid = lib->pid;
|
||||
m.msg.type = _NXT_PORT_MSG_QUIT;
|
||||
m.quit_param = NXT_QUIT_GRACEFUL;
|
||||
|
||||
(void) nxt_unit_port_send(ctx, lib->main_ctx.read_port,
|
||||
&m, sizeof(m), NULL, 0);
|
||||
}
|
||||
}
|
||||
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
|
||||
@@ -136,7 +136,8 @@ struct nxt_unit_callbacks_s {
|
||||
int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port);
|
||||
|
||||
/* Remove previously added port. Optional. */
|
||||
void (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port);
|
||||
void (*remove_port)(nxt_unit_t *, nxt_unit_ctx_t *,
|
||||
nxt_unit_port_t *port);
|
||||
|
||||
/* Remove all data associated with process pid including ports. Optional. */
|
||||
void (*remove_pid)(nxt_unit_t *, pid_t pid);
|
||||
@@ -167,6 +168,7 @@ struct nxt_unit_init_s {
|
||||
|
||||
uint32_t request_data_size;
|
||||
uint32_t shm_limit;
|
||||
uint32_t request_limit;
|
||||
|
||||
nxt_unit_callbacks_t callbacks;
|
||||
|
||||
@@ -215,8 +217,6 @@ int nxt_unit_run_shared(nxt_unit_ctx_t *ctx);
|
||||
|
||||
nxt_unit_request_info_t *nxt_unit_dequeue_request(nxt_unit_ctx_t *ctx);
|
||||
|
||||
int nxt_unit_is_main_ctx(nxt_unit_ctx_t *ctx);
|
||||
|
||||
/*
|
||||
* Receive and process one message, invoke configured callbacks.
|
||||
*
|
||||
|
||||
@@ -1184,13 +1184,12 @@ nxt_perl_psgi_start(nxt_task_t *task, nxt_process_data_t *data)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
nxt_unit_default_init(task, &perl_init);
|
||||
nxt_unit_default_init(task, &perl_init, common_conf);
|
||||
|
||||
perl_init.callbacks.request_handler = nxt_perl_psgi_request_handler;
|
||||
perl_init.callbacks.ready_handler = nxt_perl_psgi_ready_handler;
|
||||
perl_init.data = c;
|
||||
perl_init.ctx_data = &pctx;
|
||||
perl_init.shm_limit = common_conf->shm_limit;
|
||||
|
||||
unit_ctx = nxt_unit_init(&perl_init);
|
||||
if (nxt_slow_path(unit_ctx == NULL)) {
|
||||
@@ -1292,11 +1291,6 @@ nxt_perl_psgi_ready_handler(nxt_unit_ctx_t *ctx)
|
||||
nxt_perl_app_conf_t *c;
|
||||
nxt_perl_psgi_ctx_t *pctx;
|
||||
|
||||
/* Worker thread context. */
|
||||
if (!nxt_unit_is_main_ctx(ctx)) {
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
c = ctx->unit->data;
|
||||
|
||||
if (c->threads <= 1) {
|
||||
|
||||
@@ -230,10 +230,9 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
|
||||
}
|
||||
}
|
||||
|
||||
nxt_unit_default_init(task, &python_init);
|
||||
nxt_unit_default_init(task, &python_init, data->app);
|
||||
|
||||
python_init.data = c;
|
||||
python_init.shm_limit = data->app->shm_limit;
|
||||
python_init.callbacks.ready_handler = nxt_python_ready_handler;
|
||||
|
||||
proto = c->protocol;
|
||||
@@ -522,18 +521,6 @@ nxt_python_ready_handler(nxt_unit_ctx_t *ctx)
|
||||
nxt_py_thread_info_t *ti;
|
||||
nxt_python_app_conf_t *c;
|
||||
|
||||
if (nxt_py_proto.ready != NULL) {
|
||||
res = nxt_py_proto.ready(ctx);
|
||||
if (nxt_slow_path(res != NXT_UNIT_OK)) {
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
/* Worker thread context. */
|
||||
if (!nxt_unit_is_main_ctx(ctx)) {
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
c = ctx->unit->data;
|
||||
|
||||
if (c->threads <= 1) {
|
||||
|
||||
@@ -64,7 +64,6 @@ typedef struct {
|
||||
void (*ctx_data_free)(void *data);
|
||||
int (*startup)(void *data);
|
||||
int (*run)(nxt_unit_ctx_t *ctx);
|
||||
int (*ready)(nxt_unit_ctx_t *ctx);
|
||||
void (*done)(void);
|
||||
} nxt_python_proto_t;
|
||||
|
||||
|
||||
@@ -33,11 +33,10 @@ static PyObject *nxt_py_asgi_create_address(nxt_unit_sptr_t *sptr, uint8_t len,
|
||||
static PyObject *nxt_py_asgi_create_header(nxt_unit_field_t *f);
|
||||
static PyObject *nxt_py_asgi_create_subprotocols(nxt_unit_field_t *f);
|
||||
|
||||
static int nxt_python_asgi_ready(nxt_unit_ctx_t *ctx);
|
||||
|
||||
static int nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
|
||||
static int nxt_py_asgi_add_reader(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
|
||||
static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port);
|
||||
static void nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx,
|
||||
nxt_unit_port_t *port);
|
||||
static void nxt_py_asgi_quit(nxt_unit_ctx_t *ctx);
|
||||
static void nxt_py_asgi_shm_ack_handler(nxt_unit_ctx_t *ctx);
|
||||
|
||||
@@ -45,7 +44,6 @@ static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
|
||||
static void nxt_python_asgi_done(void);
|
||||
|
||||
static PyObject *nxt_py_port_read;
|
||||
static nxt_unit_port_t *nxt_py_shared_port;
|
||||
|
||||
static PyMethodDef nxt_py_port_read_method =
|
||||
{"unit_port_read", nxt_py_asgi_port_read, METH_VARARGS, ""};
|
||||
@@ -55,7 +53,6 @@ static nxt_python_proto_t nxt_py_asgi_proto = {
|
||||
.ctx_data_free = nxt_python_asgi_ctx_data_free,
|
||||
.startup = nxt_python_asgi_startup,
|
||||
.run = nxt_python_asgi_run,
|
||||
.ready = nxt_python_asgi_ready,
|
||||
.done = nxt_python_asgi_done,
|
||||
};
|
||||
|
||||
@@ -362,13 +359,6 @@ nxt_python_asgi_run(nxt_unit_ctx_t *ctx)
|
||||
|
||||
Py_DECREF(res);
|
||||
|
||||
nxt_py_asgi_remove_reader(ctx, nxt_py_shared_port);
|
||||
nxt_py_asgi_remove_reader(ctx, ctx_data->port);
|
||||
|
||||
if (ctx_data->port != NULL) {
|
||||
ctx_data->port = NULL;
|
||||
}
|
||||
|
||||
nxt_py_asgi_lifespan_shutdown(ctx);
|
||||
|
||||
return NXT_UNIT_OK;
|
||||
@@ -891,28 +881,10 @@ fail:
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
nxt_python_asgi_ready(nxt_unit_ctx_t *ctx)
|
||||
{
|
||||
nxt_unit_port_t *port;
|
||||
|
||||
if (nxt_slow_path(nxt_py_shared_port == NULL)) {
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
|
||||
port = nxt_py_shared_port;
|
||||
|
||||
nxt_unit_debug(ctx, "asgi_ready %d %p %p", port->in_fd, ctx, port);
|
||||
|
||||
return nxt_py_asgi_add_reader(ctx, port);
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||
{
|
||||
int nb;
|
||||
nxt_py_asgi_ctx_data_t *ctx_data;
|
||||
|
||||
if (port->in_fd == -1) {
|
||||
return NXT_UNIT_OK;
|
||||
@@ -929,16 +901,6 @@ nxt_py_asgi_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||
|
||||
nxt_unit_debug(ctx, "asgi_add_port %d %p %p", port->in_fd, ctx, port);
|
||||
|
||||
if (port->id.id == NXT_UNIT_SHARED_PORT_ID) {
|
||||
nxt_py_shared_port = port;
|
||||
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
ctx_data = ctx->data;
|
||||
|
||||
ctx_data->port = port;
|
||||
|
||||
return nxt_py_asgi_add_reader(ctx, port);
|
||||
}
|
||||
|
||||
@@ -1008,17 +970,16 @@ clean_fd:
|
||||
|
||||
|
||||
static void
|
||||
nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_port_t *port)
|
||||
nxt_py_asgi_remove_port(nxt_unit_t *lib, nxt_unit_ctx_t *ctx,
|
||||
nxt_unit_port_t *port)
|
||||
{
|
||||
if (port->in_fd == -1) {
|
||||
if (port->in_fd == -1 || ctx == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_unit_debug(NULL, "asgi_remove_port %d %p", port->in_fd, port);
|
||||
|
||||
if (nxt_py_shared_port == port) {
|
||||
nxt_py_shared_port = NULL;
|
||||
}
|
||||
nxt_py_asgi_remove_reader(ctx, port);
|
||||
}
|
||||
|
||||
|
||||
@@ -1032,27 +993,6 @@ nxt_py_asgi_quit(nxt_unit_ctx_t *ctx)
|
||||
|
||||
ctx_data = ctx->data;
|
||||
|
||||
if (nxt_py_shared_port != NULL) {
|
||||
p = PyLong_FromLong(nxt_py_shared_port->in_fd);
|
||||
if (nxt_slow_path(p == NULL)) {
|
||||
nxt_unit_alert(NULL, "Python failed to create Long");
|
||||
nxt_python_print_exception();
|
||||
|
||||
} else {
|
||||
res = PyObject_CallFunctionObjArgs(ctx_data->loop_remove_reader,
|
||||
p, NULL);
|
||||
if (nxt_slow_path(res == NULL)) {
|
||||
nxt_unit_alert(NULL, "Python failed to remove_reader");
|
||||
nxt_python_print_exception();
|
||||
|
||||
} else {
|
||||
Py_DECREF(res);
|
||||
}
|
||||
|
||||
Py_DECREF(p);
|
||||
}
|
||||
}
|
||||
|
||||
p = PyLong_FromLong(0);
|
||||
if (nxt_slow_path(p == NULL)) {
|
||||
nxt_unit_alert(NULL, "Python failed to create Long");
|
||||
|
||||
@@ -34,7 +34,6 @@ typedef struct {
|
||||
PyObject *quit_future;
|
||||
PyObject *quit_future_set_result;
|
||||
PyObject **target_lifespans;
|
||||
nxt_unit_port_t *port;
|
||||
} nxt_py_asgi_ctx_data_t;
|
||||
|
||||
PyObject *nxt_py_asgi_enum_headers(PyObject *headers,
|
||||
|
||||
@@ -636,10 +636,12 @@ nxt_py_asgi_http_close_handler(nxt_unit_request_info_t *req)
|
||||
|
||||
nxt_unit_req_debug(req, "asgi_http_close_handler");
|
||||
|
||||
if (nxt_fast_path(http != NULL)) {
|
||||
http->closed = 1;
|
||||
|
||||
nxt_py_asgi_http_emit_disconnect(http);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static PyObject *
|
||||
|
||||
@@ -980,6 +980,10 @@ nxt_py_asgi_websocket_close_handler(nxt_unit_request_info_t *req)
|
||||
|
||||
nxt_unit_req_debug(req, "asgi_websocket_close_handler");
|
||||
|
||||
if (nxt_slow_path(ws == NULL)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (ws->receive_future == NULL) {
|
||||
ws->state = NXT_WS_DISCONNECTED;
|
||||
|
||||
|
||||
@@ -357,11 +357,10 @@ nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data)
|
||||
goto fail;
|
||||
}
|
||||
|
||||
nxt_unit_default_init(task, &ruby_unit_init);
|
||||
nxt_unit_default_init(task, &ruby_unit_init, conf);
|
||||
|
||||
ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler;
|
||||
ruby_unit_init.callbacks.ready_handler = nxt_ruby_ready_handler;
|
||||
ruby_unit_init.shm_limit = conf->shm_limit;
|
||||
ruby_unit_init.data = c;
|
||||
ruby_unit_init.ctx_data = &ruby_ctx;
|
||||
|
||||
@@ -1258,11 +1257,6 @@ nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx)
|
||||
nxt_ruby_ctx_t *rctx;
|
||||
nxt_ruby_app_conf_t *c;
|
||||
|
||||
/* Worker thread context. */
|
||||
if (!nxt_unit_is_main_ctx(ctx)) {
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
c = ctx->unit->data;
|
||||
|
||||
if (c->threads <= 1) {
|
||||
|
||||
@@ -97,7 +97,7 @@ ready_handler(nxt_unit_ctx_t *ctx)
|
||||
|
||||
nxt_unit_debug(ctx, "ready");
|
||||
|
||||
if (!nxt_unit_is_main_ctx(ctx) || thread_count <= 1) {
|
||||
if (thread_count <= 1) {
|
||||
return NXT_UNIT_OK;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user