Introducing app joint to accurate app release.

For accurate app descriptor release, it is required to count the number of
use counts.  Use count increased when:
- app linked to configuration app queue;
- socket conf stores pointer to app;
- request for start app process posted to router service thread;

Application port has pointer to app, but it does not increase use count
to avoid use count loop.

Timer needs a pointer to nxt_timer_t which is stored in engine timers tree.
nxt_timer_t now resides in nxt_app_joint_t and does not lock the application.

Start process port RPC handlers is also linked to nxt_app_joint_t.

App joint (nxt_app_joint_t) is a 'weak pointer':
- single threaded;
- use countable;
- store pointer to nxt_app_t (which can be NULL);

nxt_app_t has pointer to nxt_app_joint_t and update its pointer to app.
This commit is contained in:
Max Romanov
2018-08-10 19:27:13 +03:00
parent 941616f893
commit 86740ab34b
4 changed files with 169 additions and 94 deletions

View File

@@ -109,11 +109,7 @@ nxt_port_release(nxt_task_t *task, nxt_port_t *port)
nxt_debug(task, "port %p %d:%d release, type %d", port, port->pid,
port->id, port->type);
if (port->app != NULL) {
nxt_router_app_use(task, port->app, -1);
port->app = NULL;
}
if (port->link.next != NULL) {
nxt_assert(port->process != NULL);

View File

@@ -236,7 +236,7 @@ static void nxt_router_app_port_ready(nxt_task_t *task,
static void nxt_router_app_port_error(nxt_task_t *task,
nxt_port_recv_msg_t *msg, void *data);
static void nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app);
static void nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app);
static void nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
uint32_t request_failed, uint32_t got_response);
static nxt_int_t nxt_router_app_port(nxt_task_t *task, nxt_app_t *app,
@@ -252,12 +252,16 @@ static void nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_app_idle_timeout(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_app_release_handler(nxt_task_t *task, void *obj,
static void nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj,
void *data);
static void nxt_router_free_app(nxt_task_t *task, void *obj, void *data);
static const nxt_http_request_state_t nxt_http_request_send_state;
static void nxt_http_request_send_body(nxt_task_t *task, void *obj, void *data);
static void nxt_router_app_joint_use(nxt_task_t *task,
nxt_app_joint_t *app_joint, int i);
static nxt_router_t *nxt_router;
static const nxt_str_t http_prefix = nxt_string("HTTP_");
@@ -360,12 +364,16 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
*b->mem.free++ = '\0';
nxt_buf_cpystr(b, &app->conf);
nxt_router_app_joint_use(task, app->joint, 1);
stream = nxt_port_rpc_register_handler(task, port,
nxt_router_app_port_ready,
nxt_router_app_port_error,
-1, app);
-1, app->joint);
if (nxt_slow_path(stream == 0)) {
nxt_router_app_joint_use(task, app->joint, -1);
goto failed;
}
@@ -374,9 +382,14 @@ nxt_router_start_app_process_handler(nxt_task_t *task, nxt_port_t *port,
if (nxt_slow_path(ret != NXT_OK)) {
nxt_port_rpc_cancel(task, port, stream);
nxt_router_app_joint_use(task, app->joint, -1);
goto failed;
}
nxt_router_app_use(task, app, -1);
return;
failed:
@@ -397,6 +410,19 @@ failed:
}
static void
nxt_router_app_joint_use(nxt_task_t *task, nxt_app_joint_t *app_joint, int i)
{
app_joint->use_count += i;
if (app_joint->use_count == 0) {
nxt_assert(app_joint->app == NULL);
nxt_free(app_joint);
}
}
static nxt_int_t
nxt_router_start_app_process(nxt_task_t *task, nxt_app_t *app)
{
@@ -1098,7 +1124,7 @@ nxt_router_conf_error(nxt_task_t *task, nxt_router_temp_conf_t *tmcf)
nxt_queue_each(app, &tmcf->apps, nxt_app_t, link) {
nxt_router_app_quit(task, app);
nxt_router_app_unlink(task, app);
} nxt_queue_loop;
@@ -1284,6 +1310,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_str_t name, path;
nxt_app_t *app, *prev;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
nxt_conf_value_t *conf, *http, *value;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
@@ -1375,6 +1402,13 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
apcf.limits_value = NULL;
apcf.processes_value = NULL;
app_joint = nxt_malloc(sizeof(nxt_app_joint_t));
if (nxt_slow_path(app_joint == NULL)) {
goto app_fail;
}
nxt_memzero(app_joint, sizeof(nxt_app_joint_t));
ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
nxt_nitems(nxt_router_app_conf), &apcf);
if (ret != NXT_OK) {
@@ -1453,7 +1487,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->timeout = apcf.timeout;
app->res_timeout = apcf.res_timeout * 1000000;
app->idle_timeout = apcf.idle_timeout;
app->live = 1;
app->max_pending_responses = 2;
app->max_requests = apcf.requests;
@@ -1461,12 +1494,6 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
app->engine = engine;
app->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
app->idle_timer.work_queue = &engine->fast_work_queue;
app->idle_timer.handler = nxt_router_app_idle_timeout;
app->idle_timer.task = &engine->task;
app->idle_timer.log = app->idle_timer.task->log;
app->adjust_idle_work.handler = nxt_router_adjust_idle_timer;
app->adjust_idle_work.task = &engine->task;
app->adjust_idle_work.obj = app;
@@ -1474,6 +1501,21 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_queue_insert_tail(&tmcf->apps, &app->link);
nxt_router_app_use(task, app, 1);
app->joint = app_joint;
app_joint->use_count = 1;
app_joint->app = app;
app_joint->idle_timer.precision = NXT_TIMER_DEFAULT_PRECISION;
app_joint->idle_timer.work_queue = &engine->fast_work_queue;
app_joint->idle_timer.handler = nxt_router_app_idle_timeout;
app_joint->idle_timer.task = &engine->task;
app_joint->idle_timer.log = app_joint->idle_timer.task->log;
app_joint->free_app_work.handler = nxt_router_free_app;
app_joint->free_app_work.task = &engine->task;
app_joint->free_app_work.obj = app_joint;
}
http = nxt_conf_get_path(conf, &http_path);
@@ -1975,8 +2017,6 @@ nxt_router_app_prefork_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
port = msg->u.new_port;
port->app = app;
nxt_router_app_use(task, app, 1);
app->pending_processes--;
app->processes++;
app->idle_processes++;
@@ -2329,7 +2369,7 @@ nxt_router_apps_sort(nxt_task_t *task, nxt_router_t *router,
nxt_queue_each(app, &router->apps, nxt_app_t, link) {
nxt_router_app_quit(task, app);
nxt_router_app_unlink(task, app);
} nxt_queue_loop;
@@ -3367,13 +3407,26 @@ nxt_router_app_port_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg,
{
nxt_app_t *app;
nxt_port_t *port;
nxt_app_joint_t *app_joint;
app = data;
app_joint = data;
port = msg->u.new_port;
nxt_assert(app != NULL);
nxt_assert(app_joint != NULL);
nxt_assert(port != NULL);
app = app_joint->app;
nxt_router_app_joint_use(task, app_joint, -1);
if (nxt_slow_path(app == NULL)) {
nxt_debug(task, "new port ready for released app, send QUIT");
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
return;
}
port->app = app;
nxt_thread_mutex_lock(&app->mutex);
@@ -3397,12 +3450,23 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
void *data)
{
nxt_app_t *app;
nxt_app_joint_t *app_joint;
nxt_queue_link_t *lnk;
nxt_req_app_link_t *ra;
app = data;
app_joint = data;
nxt_assert(app != NULL);
nxt_assert(app_joint != NULL);
app = app_joint->app;
nxt_router_app_joint_use(task, app_joint, -1);
if (nxt_slow_path(app == NULL)) {
nxt_debug(task, "start error for released app");
return;
}
nxt_debug(task, "app '%V' %p start error", &app->name, app);
@@ -3432,10 +3496,10 @@ nxt_router_app_port_error(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_router_ra_error(ra, 500, "Failed to start application process");
nxt_router_ra_use(task, ra, -1);
}
nxt_router_app_use(task, app, -1);
}
nxt_inline nxt_port_t *
nxt_router_app_get_port_for_quit(nxt_app_t *app);
void
nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
@@ -3446,17 +3510,12 @@ nxt_router_app_use(nxt_task_t *task, nxt_app_t *app, int i)
if (i < 0 && c == -i) {
nxt_assert(app->live == 0);
nxt_assert(app->processes == 0);
nxt_assert(app->idle_processes == 0);
nxt_assert(app->pending_processes == 0);
nxt_assert(nxt_queue_is_empty(&app->requests));
nxt_assert(nxt_queue_is_empty(&app->ports));
nxt_assert(nxt_queue_is_empty(&app->spare_ports));
nxt_assert(nxt_queue_is_empty(&app->idle_ports));
if (task->thread->engine != app->engine) {
nxt_event_engine_post(app->engine, &app->joint->free_app_work);
nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app);
} else {
nxt_router_free_app(task, app->joint, NULL);
}
}
}
@@ -3542,7 +3601,6 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
app->idle_processes--;
}
/* Caller is responsible to decrease app use count. */
port->app = NULL;
app->processes--;
@@ -3557,39 +3615,15 @@ nxt_router_app_get_port_for_quit(nxt_app_t *app)
static void
nxt_router_app_quit(nxt_task_t *task, nxt_app_t *app)
nxt_router_app_unlink(nxt_task_t *task, nxt_app_t *app)
{
nxt_port_t *port;
nxt_debug(task, "app '%V' %p unlink", &app->name, app);
nxt_queue_remove(&app->link);
app->live = 0;
for ( ;; ) {
port = nxt_router_app_get_port_for_quit(app);
if (port == NULL) {
break;
}
nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
nxt_port_use(task, port, -1);
nxt_router_app_use(task, app, -1);
}
if (nxt_timer_is_in_tree(&app->idle_timer)) {
nxt_assert(app->engine == task->thread->engine);
app->idle_timer.handler = nxt_router_app_release_handler;
nxt_timer_add(app->engine, &app->idle_timer, 0);
} else {
nxt_router_app_use(task, app, -1);
}
}
static void
nxt_router_app_process_request(nxt_task_t *task, void *obj, void *data)
@@ -3640,10 +3674,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
port->app_pending_responses -= request_failed + got_response;
port->app_responses += got_response;
if (nxt_slow_path(app->live == 0)) {
goto app_dead;
}
if (port->pair[1] != -1
&& (app->max_pending_responses == 0
|| port->app_pending_responses < app->max_pending_responses)
@@ -3687,8 +3717,6 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
}
}
app_dead:
/* Pop first pending request for this port. */
if ((request_failed > 0 || got_response > 0)
&& !nxt_queue_is_empty(&port->pending_requests))
@@ -3740,8 +3768,8 @@ app_dead:
re_ra_cancelled:
send_quit = (app->live == 0 && port->app_pending_responses == 0)
|| (app->max_requests > 0 && port->app_pending_responses == 0
send_quit = (app->max_requests > 0
&& port->app_pending_responses == 0
&& port->app_responses >= app->max_requests);
if (send_quit) {
@@ -3827,8 +3855,6 @@ re_ra_cancelled:
nxt_port_use(task, port, -1);
}
nxt_router_app_use(task, app, -1);
goto adjust_use;
}
@@ -3879,8 +3905,7 @@ nxt_router_app_port_close(nxt_task_t *task, nxt_port_t *port)
app->processes--;
start_process = app->live != 0
&& !task->thread->engine->shutdown
start_process = !task->thread->engine->shutdown
&& nxt_router_app_can_start(app)
&& (!nxt_queue_is_empty(&app->requests)
|| nxt_router_app_need_start(app));
@@ -3923,7 +3948,7 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
nxt_assert(app->engine == engine);
threshold = engine->timers.now + app->idle_timer.precision;
threshold = engine->timers.now + app->joint->idle_timer.precision;
timeout = 0;
nxt_thread_mutex_lock(&app->mutex);
@@ -3962,7 +3987,6 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
nxt_port_use(task, port, -1);
nxt_router_app_use(task, app, -1);
nxt_thread_mutex_lock(&app->mutex);
}
@@ -3970,10 +3994,10 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
nxt_thread_mutex_unlock(&app->mutex);
if (timeout > threshold) {
nxt_timer_add(engine, &app->idle_timer, timeout - threshold);
nxt_timer_add(engine, &app->joint->idle_timer, timeout - threshold);
} else {
nxt_timer_disable(engine, &app->idle_timer);
nxt_timer_disable(engine, &app->joint->idle_timer);
}
if (queued) {
@@ -3985,26 +4009,73 @@ nxt_router_adjust_idle_timer(nxt_task_t *task, void *obj, void *data)
static void
nxt_router_app_idle_timeout(nxt_task_t *task, void *obj, void *data)
{
nxt_app_t *app;
nxt_timer_t *timer;
nxt_app_joint_t *app_joint;
timer = obj;
app = nxt_container_of(timer, nxt_app_t, idle_timer);
app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
nxt_router_adjust_idle_timer(task, app, NULL);
if (nxt_fast_path(app_joint->app != NULL)) {
nxt_router_adjust_idle_timer(task, app_joint->app, NULL);
}
}
static void
nxt_router_app_release_handler(nxt_task_t *task, void *obj, void *data)
nxt_router_app_joint_release_handler(nxt_task_t *task, void *obj, void *data)
{
nxt_app_t *app;
nxt_timer_t *timer;
nxt_app_joint_t *app_joint;
timer = obj;
app = nxt_container_of(timer, nxt_app_t, idle_timer);
app_joint = nxt_container_of(timer, nxt_app_joint_t, idle_timer);
nxt_router_app_use(task, app, -1);
nxt_router_app_joint_use(task, app_joint, -1);
}
static void
nxt_router_free_app(nxt_task_t *task, void *obj, void *data)
{
nxt_app_t *app;
nxt_port_t *port;
nxt_app_joint_t *app_joint;
app_joint = obj;
app = app_joint->app;
for ( ;; ) {
port = nxt_router_app_get_port_for_quit(app);
if (port == NULL) {
break;
}
nxt_debug(task, "send QUIT to app '%V' pid %PI", &app->name, port->pid);
nxt_port_socket_write(task, port, NXT_PORT_MSG_QUIT, -1, 0, 0, NULL);
nxt_port_use(task, port, -1);
}
nxt_assert(app->processes == 0);
nxt_assert(app->idle_processes == 0);
nxt_assert(nxt_queue_is_empty(&app->requests));
nxt_assert(nxt_queue_is_empty(&app->ports));
nxt_assert(nxt_queue_is_empty(&app->spare_ports));
nxt_assert(nxt_queue_is_empty(&app->idle_ports));
nxt_thread_mutex_destroy(&app->mutex);
nxt_free(app);
app_joint->app = NULL;
if (nxt_timer_delete(task->thread->engine, &app_joint->idle_timer)) {
app_joint->idle_timer.handler = nxt_router_app_joint_release_handler;
nxt_timer_add(task->thread->engine, &app_joint->idle_timer, 0);
} else {
nxt_router_app_joint_use(task, app_joint, -1);
}
}

View File

@@ -81,13 +81,20 @@ typedef struct {
} nxt_joint_job_t;
typedef struct {
uint32_t use_count;
nxt_app_t *app;
nxt_timer_t idle_timer;
nxt_work_t free_app_work;
} nxt_app_joint_t;
struct nxt_app_s {
nxt_thread_mutex_t mutex; /* Protects ports queue. */
nxt_queue_t ports; /* of nxt_port_t.app_link */
nxt_queue_t spare_ports; /* of nxt_port_t.idle_link */
nxt_queue_t idle_ports; /* of nxt_port_t.idle_link */
nxt_timer_t idle_timer;
nxt_work_t adjust_idle_work;
nxt_event_engine_t *engine;
@@ -110,13 +117,14 @@ struct nxt_app_s {
nxt_msec_t idle_timeout;
nxt_app_type_t type:8;
uint8_t live; /* 1 bit */
nxt_queue_link_t link;
nxt_str_t conf;
nxt_atomic_t use_count;
nxt_app_joint_t *joint;
};

View File

@@ -195,8 +195,8 @@ class TestUnitPythonApplication(unit.TestUnitApplicationPython):
self.assertEqual(resp, {}, 'reconfigure 2 keep-alive 3')
@unittest.expectedFailure
def test_python_keepalive_reconfigure_3(self):
self.skip_alerts.append(r'sendmsg.+failed')
self.load('empty')
(resp, sock) = self.http(b"""GET / HTTP/1.1