Re-engineered timers.
To optimize rbtree operations, all changes are stored in array and later processed in batches. The previous implementation of this mechanics had a number of design flaws. Each change was saved in a new array entry; until the changes were applied, the timer remained in an intermediate state (NXT_TIMER_CHANGING). This intermediate state didn't allow to identify if time was going to be disabled or enabled. However, the nxt_conn_io_read() function relied on this information; as a result, in some cases the read timeout wasn't set. Also, the nxt_timer_delete() function did not reliably track whether a timer was added to the work queue. It checked the NXT_TIMER_ENQUEUED state of a timer, but this state could be reset to NXT_TIMER_DISABLED by a nxt_timer_disable() call or another nxt_timer_delete() call. Now, instead of keeping the whole history of the timer's changes, the new implementation updates the timer state immediately, and only one operation is added to the array to add or delete timer in the rbtree according to its final state.
This commit is contained in:
@@ -105,7 +105,7 @@ nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
|
|||||||
* occured during read operation, it toggled write event
|
* occured during read operation, it toggled write event
|
||||||
* internally so only read timer should be set.
|
* internally so only read timer should be set.
|
||||||
*/
|
*/
|
||||||
if (c->read_timer.state == NXT_TIMER_DISABLED) {
|
if (!c->read_timer.enabled) {
|
||||||
nxt_conn_timer(engine, c, state, &c->read_timer);
|
nxt_conn_timer(engine, c, state, &c->read_timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -117,7 +117,7 @@ nxt_conn_io_read(nxt_task_t *task, void *obj, void *data)
|
|||||||
nxt_fd_event_enable_read(engine, &c->socket);
|
nxt_fd_event_enable_read(engine, &c->socket);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (state->timer_autoreset || c->read_timer.state == NXT_TIMER_DISABLED) {
|
if (state->timer_autoreset || !c->read_timer.enabled) {
|
||||||
nxt_conn_timer(engine, c, state, &c->read_timer);
|
nxt_conn_timer(engine, c, state, &c->read_timer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
109
src/nxt_timer.c
109
src/nxt_timer.c
@@ -32,6 +32,10 @@ nxt_timers_init(nxt_timers_t *timers, nxt_uint_t mchanges)
|
|||||||
{
|
{
|
||||||
nxt_rbtree_init(&timers->tree, nxt_timer_rbtree_compare);
|
nxt_rbtree_init(&timers->tree, nxt_timer_rbtree_compare);
|
||||||
|
|
||||||
|
if (mchanges > NXT_TIMER_MAX_CHANGES) {
|
||||||
|
mchanges = NXT_TIMER_MAX_CHANGES;
|
||||||
|
}
|
||||||
|
|
||||||
timers->mchanges = mchanges;
|
timers->mchanges = mchanges;
|
||||||
|
|
||||||
timers->changes = nxt_malloc(sizeof(nxt_timer_change_t) * mchanges);
|
timers->changes = nxt_malloc(sizeof(nxt_timer_change_t) * mchanges);
|
||||||
@@ -71,7 +75,9 @@ nxt_timer_add(nxt_event_engine_t *engine, nxt_timer_t *timer,
|
|||||||
|
|
||||||
time = engine->timers.now + timeout;
|
time = engine->timers.now + timeout;
|
||||||
|
|
||||||
if (timer->state != NXT_TIMER_CHANGING) {
|
nxt_debug(timer->task, "timer add: %M %M:%M", timer->time, timeout, time);
|
||||||
|
|
||||||
|
timer->enabled = 1;
|
||||||
|
|
||||||
if (nxt_timer_is_in_tree(timer)) {
|
if (nxt_timer_is_in_tree(timer)) {
|
||||||
|
|
||||||
@@ -82,55 +88,34 @@ nxt_timer_add(nxt_event_engine_t *engine, nxt_timer_t *timer,
|
|||||||
* decreases number of rbtree operations for fast connections.
|
* decreases number of rbtree operations for fast connections.
|
||||||
*/
|
*/
|
||||||
if (nxt_abs(diff) < timer->precision) {
|
if (nxt_abs(diff) < timer->precision) {
|
||||||
nxt_debug(timer->task, "timer previous: %M:%d",
|
nxt_debug(timer->task, "timer previous: %M", time);
|
||||||
time, timer->state);
|
|
||||||
|
|
||||||
timer->state = NXT_TIMER_WAITING;
|
nxt_timer_change(engine, timer, NXT_TIMER_NOPE, 0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
nxt_debug(timer->task, "timer add: %M:%d %M:%M",
|
|
||||||
timer->time, timer->state, timeout, time);
|
|
||||||
|
|
||||||
nxt_timer_change(engine, timer, NXT_TIMER_ADD, time);
|
nxt_timer_change(engine, timer, NXT_TIMER_ADD, time);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
|
||||||
nxt_timer_disable(nxt_event_engine_t *engine, nxt_timer_t *timer)
|
|
||||||
{
|
|
||||||
nxt_debug(timer->task, "timer disable: %M:%d", timer->time, timer->state);
|
|
||||||
|
|
||||||
if (timer->state != NXT_TIMER_CHANGING) {
|
|
||||||
timer->state = NXT_TIMER_DISABLED;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
nxt_timer_change(engine, timer, NXT_TIMER_DISABLE, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
nxt_bool_t
|
nxt_bool_t
|
||||||
nxt_timer_delete(nxt_event_engine_t *engine, nxt_timer_t *timer)
|
nxt_timer_delete(nxt_event_engine_t *engine, nxt_timer_t *timer)
|
||||||
{
|
{
|
||||||
nxt_bool_t pending;
|
nxt_debug(timer->task, "timer delete: %M", timer->time);
|
||||||
|
|
||||||
if (nxt_timer_is_in_tree(timer) || timer->state == NXT_TIMER_CHANGING) {
|
timer->enabled = 0;
|
||||||
nxt_debug(timer->task, "timer delete: %M:%d",
|
|
||||||
timer->time, timer->state);
|
if (nxt_timer_is_in_tree(timer)) {
|
||||||
|
|
||||||
nxt_timer_change(engine, timer, NXT_TIMER_DELETE, 0);
|
nxt_timer_change(engine, timer, NXT_TIMER_DELETE, 0);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pending = (timer->state == NXT_TIMER_ENQUEUED);
|
nxt_timer_change(engine, timer, NXT_TIMER_NOPE, 0);
|
||||||
|
|
||||||
timer->state = NXT_TIMER_DISABLED;
|
return (timer->queued || timer->change != NXT_TIMER_NO_CHANGE);
|
||||||
|
|
||||||
return pending;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -143,31 +128,35 @@ nxt_timer_change(nxt_event_engine_t *engine, nxt_timer_t *timer,
|
|||||||
|
|
||||||
timers = &engine->timers;
|
timers = &engine->timers;
|
||||||
|
|
||||||
|
if (timer->change == NXT_TIMER_NO_CHANGE) {
|
||||||
|
|
||||||
|
if (change == NXT_TIMER_NOPE) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (timers->nchanges >= timers->mchanges) {
|
if (timers->nchanges >= timers->mchanges) {
|
||||||
nxt_timer_changes_commit(engine);
|
nxt_timer_changes_commit(engine);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
timers->nchanges++;
|
||||||
|
timer->change = timers->nchanges;
|
||||||
|
}
|
||||||
|
|
||||||
nxt_debug(timer->task, "timer change: %M:%d", time, change);
|
nxt_debug(timer->task, "timer change: %M:%d", time, change);
|
||||||
|
|
||||||
timer->state = NXT_TIMER_CHANGING;
|
ch = &timers->changes[timer->change - 1];
|
||||||
|
|
||||||
ch = &timers->changes[timers->nchanges];
|
|
||||||
|
|
||||||
ch->change = change;
|
ch->change = change;
|
||||||
ch->time = time;
|
ch->time = time;
|
||||||
ch->timer = timer;
|
ch->timer = timer;
|
||||||
|
|
||||||
timers->nchanges++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
nxt_timer_changes_commit(nxt_event_engine_t *engine)
|
nxt_timer_changes_commit(nxt_event_engine_t *engine)
|
||||||
{
|
{
|
||||||
int32_t diff;
|
|
||||||
nxt_timer_t *timer;
|
nxt_timer_t *timer;
|
||||||
nxt_timers_t *timers;
|
nxt_timers_t *timers;
|
||||||
nxt_timer_state_t state;
|
|
||||||
nxt_timer_change_t *ch, *end;
|
nxt_timer_change_t *ch, *end;
|
||||||
|
|
||||||
timers = &engine->timers;
|
timers = &engine->timers;
|
||||||
@@ -178,28 +167,16 @@ nxt_timer_changes_commit(nxt_event_engine_t *engine)
|
|||||||
end = ch + timers->nchanges;
|
end = ch + timers->nchanges;
|
||||||
|
|
||||||
while (ch < end) {
|
while (ch < end) {
|
||||||
state = NXT_TIMER_DISABLED;
|
|
||||||
|
|
||||||
timer = ch->timer;
|
timer = ch->timer;
|
||||||
|
|
||||||
switch (ch->change) {
|
switch (ch->change) {
|
||||||
|
|
||||||
|
case NXT_TIMER_NOPE:
|
||||||
|
break;
|
||||||
|
|
||||||
case NXT_TIMER_ADD:
|
case NXT_TIMER_ADD:
|
||||||
if (nxt_timer_is_in_tree(timer)) {
|
if (nxt_timer_is_in_tree(timer)) {
|
||||||
|
nxt_debug(timer->task, "timer rbtree delete: %M", timer->time);
|
||||||
diff = nxt_msec_diff(ch->time, timer->time);
|
|
||||||
/* See the comment in nxt_timer_add(). */
|
|
||||||
|
|
||||||
if (nxt_abs(diff) < timer->precision) {
|
|
||||||
nxt_debug(timer->task, "timer rbtree previous: %M:%d",
|
|
||||||
ch->time, timer->state);
|
|
||||||
|
|
||||||
state = NXT_TIMER_WAITING;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
nxt_debug(timer->task, "timer rbtree delete: %M:%d",
|
|
||||||
timer->time, timer->state);
|
|
||||||
|
|
||||||
nxt_rbtree_delete(&timers->tree, &timer->node);
|
nxt_rbtree_delete(&timers->tree, &timer->node);
|
||||||
}
|
}
|
||||||
@@ -210,26 +187,19 @@ nxt_timer_changes_commit(nxt_event_engine_t *engine)
|
|||||||
|
|
||||||
nxt_rbtree_insert(&timers->tree, &timer->node);
|
nxt_rbtree_insert(&timers->tree, &timer->node);
|
||||||
nxt_timer_in_tree_set(timer);
|
nxt_timer_in_tree_set(timer);
|
||||||
state = NXT_TIMER_WAITING;
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case NXT_TIMER_DELETE:
|
case NXT_TIMER_DELETE:
|
||||||
if (nxt_timer_is_in_tree(timer)) {
|
nxt_debug(timer->task, "timer rbtree delete: %M", timer->time);
|
||||||
nxt_debug(timer->task, "timer rbtree delete: %M:%d",
|
|
||||||
timer->time, timer->state);
|
|
||||||
|
|
||||||
nxt_rbtree_delete(&timers->tree, &timer->node);
|
nxt_rbtree_delete(&timers->tree, &timer->node);
|
||||||
nxt_timer_in_tree_clear(timer);
|
nxt_timer_in_tree_clear(timer);
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case NXT_TIMER_DISABLE:
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
timer->state = state;
|
timer->change = NXT_TIMER_NO_CHANGE;
|
||||||
|
|
||||||
ch++;
|
ch++;
|
||||||
}
|
}
|
||||||
@@ -270,7 +240,7 @@ nxt_timer_find(nxt_event_engine_t *engine)
|
|||||||
* return much earlier and the disabled timer can be reactivated.
|
* return much earlier and the disabled timer can be reactivated.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
if (timer->state != NXT_TIMER_DISABLED) {
|
if (timer->enabled) {
|
||||||
time = timer->time;
|
time = timer->time;
|
||||||
timers->minimum = time;
|
timers->minimum = time;
|
||||||
|
|
||||||
@@ -324,14 +294,13 @@ nxt_timer_expire(nxt_event_engine_t *engine, nxt_msec_t now)
|
|||||||
|
|
||||||
next = nxt_rbtree_node_successor(tree, node);
|
next = nxt_rbtree_node_successor(tree, node);
|
||||||
|
|
||||||
nxt_debug(timer->task, "timer expire delete: %M:%d",
|
nxt_debug(timer->task, "timer expire delete: %M", timer->time);
|
||||||
timer->time, timer->state);
|
|
||||||
|
|
||||||
nxt_rbtree_delete(tree, &timer->node);
|
nxt_rbtree_delete(tree, &timer->node);
|
||||||
nxt_timer_in_tree_clear(timer);
|
nxt_timer_in_tree_clear(timer);
|
||||||
|
|
||||||
if (timer->state != NXT_TIMER_DISABLED) {
|
if (timer->enabled) {
|
||||||
timer->state = NXT_TIMER_ENQUEUED;
|
timer->queued = 1;
|
||||||
|
|
||||||
nxt_work_queue_add(timer->work_queue, nxt_timer_handler,
|
nxt_work_queue_add(timer->work_queue, nxt_timer_handler,
|
||||||
timer->task, timer, NULL);
|
timer->task, timer, NULL);
|
||||||
@@ -347,8 +316,10 @@ nxt_timer_handler(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
timer = obj;
|
timer = obj;
|
||||||
|
|
||||||
if (timer->state == NXT_TIMER_ENQUEUED) {
|
timer->queued = 0;
|
||||||
timer->state = NXT_TIMER_DISABLED;
|
|
||||||
|
if (timer->enabled && timer->change == NXT_TIMER_NO_CHANGE) {
|
||||||
|
timer->enabled = 0;
|
||||||
|
|
||||||
timer->handler(task, timer, NULL);
|
timer->handler(task, timer, NULL);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,20 +13,24 @@
|
|||||||
//#define NXT_TIMER_DEFAULT_PRECISION 1
|
//#define NXT_TIMER_DEFAULT_PRECISION 1
|
||||||
|
|
||||||
|
|
||||||
typedef enum {
|
/*
|
||||||
NXT_TIMER_DISABLED = 0,
|
* The nxt_timer_t structure can hold up to 14 bits of change index,
|
||||||
NXT_TIMER_CHANGING,
|
* but 0 reserved for NXT_TIMER_NO_CHANGE.
|
||||||
NXT_TIMER_WAITING,
|
*/
|
||||||
NXT_TIMER_ENQUEUED,
|
#define NXT_TIMER_MAX_CHANGES 16383
|
||||||
} nxt_timer_state_t;
|
#define NXT_TIMER_NO_CHANGE 0
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
/* The rbtree node must be the first field. */
|
/* The rbtree node must be the first field. */
|
||||||
NXT_RBTREE_NODE (node);
|
NXT_RBTREE_NODE (node);
|
||||||
|
|
||||||
nxt_timer_state_t state:8;
|
|
||||||
uint8_t precision;
|
uint8_t precision;
|
||||||
|
|
||||||
|
uint16_t change:14;
|
||||||
|
uint16_t enabled:1;
|
||||||
|
uint16_t queued:1;
|
||||||
|
|
||||||
nxt_msec_t time;
|
nxt_msec_t time;
|
||||||
|
|
||||||
nxt_work_queue_t *work_queue;
|
nxt_work_queue_t *work_queue;
|
||||||
@@ -37,13 +41,13 @@ typedef struct {
|
|||||||
} nxt_timer_t;
|
} nxt_timer_t;
|
||||||
|
|
||||||
|
|
||||||
#define NXT_TIMER { NXT_RBTREE_NODE_INIT, NXT_TIMER_DISABLED, \
|
#define NXT_TIMER { NXT_RBTREE_NODE_INIT, 0, NXT_TIMER_NO_CHANGE, \
|
||||||
0, 0, NULL, NULL, NULL, NULL }
|
0, 0, 0, NULL, NULL, NULL, NULL }
|
||||||
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
NXT_TIMER_ADD = 0,
|
NXT_TIMER_NOPE = 0,
|
||||||
NXT_TIMER_DISABLE,
|
NXT_TIMER_ADD,
|
||||||
NXT_TIMER_DELETE,
|
NXT_TIMER_DELETE,
|
||||||
} nxt_timer_operation_t;
|
} nxt_timer_operation_t;
|
||||||
|
|
||||||
@@ -94,10 +98,16 @@ void nxt_timer_expire(nxt_event_engine_t *engine, nxt_msec_t now);
|
|||||||
|
|
||||||
NXT_EXPORT void nxt_timer_add(nxt_event_engine_t *engine, nxt_timer_t *timer,
|
NXT_EXPORT void nxt_timer_add(nxt_event_engine_t *engine, nxt_timer_t *timer,
|
||||||
nxt_msec_t timeout);
|
nxt_msec_t timeout);
|
||||||
NXT_EXPORT void nxt_timer_disable(nxt_event_engine_t *engine,
|
|
||||||
nxt_timer_t *timer);
|
|
||||||
NXT_EXPORT nxt_bool_t nxt_timer_delete(nxt_event_engine_t *engine,
|
NXT_EXPORT nxt_bool_t nxt_timer_delete(nxt_event_engine_t *engine,
|
||||||
nxt_timer_t *timer);
|
nxt_timer_t *timer);
|
||||||
|
|
||||||
|
nxt_inline void
|
||||||
|
nxt_timer_disable(nxt_event_engine_t *engine, nxt_timer_t *timer)
|
||||||
|
{
|
||||||
|
nxt_debug(timer->task, "timer disable: %M", timer->time);
|
||||||
|
|
||||||
|
timer->enabled = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
#endif /* _NXT_TIMER_H_INCLUDED_ */
|
#endif /* _NXT_TIMER_H_INCLUDED_ */
|
||||||
|
|||||||
Reference in New Issue
Block a user