Router: using joint job queues instead of arrays to pass

listening socket handlers to worker engines.
This commit is contained in:
Igor Sysoev
2017-07-14 17:17:15 +03:00
parent 668aabac3c
commit 3ed35d725a
2 changed files with 36 additions and 91 deletions

View File

@@ -67,9 +67,9 @@ static nxt_int_t nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf, static nxt_int_t nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf); nxt_router_engine_conf_t *recf);
static void nxt_router_engine_socket_count(nxt_queue_t *sockets); static void nxt_router_engine_socket_count(nxt_queue_t *sockets);
static nxt_int_t nxt_router_engine_joints_create(nxt_mp_t *mp, static nxt_int_t nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_t *recf, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
nxt_queue_t *sockets, nxt_array_t *array, nxt_work_handler_t handler); nxt_work_handler_t handler);
static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf, static nxt_int_t nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets); nxt_router_engine_conf_t *recf, nxt_queue_t *sockets);
@@ -81,8 +81,7 @@ static void nxt_router_apps_sort(nxt_router_t *router,
nxt_router_temp_conf_t *tmcf); nxt_router_temp_conf_t *tmcf);
static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf); static void nxt_router_engines_post(nxt_router_temp_conf_t *tmcf);
static void nxt_router_engine_post(nxt_router_temp_conf_t *tmcf, static void nxt_router_engine_post(nxt_router_engine_conf_t *recf);
nxt_router_engine_conf_t *recf);
static void nxt_router_thread_start(void *data); static void nxt_router_thread_start(void *data);
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj, static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
@@ -944,26 +943,17 @@ static nxt_int_t
nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf) nxt_router_engine_conf_t *recf)
{ {
nxt_mp_t *mp;
nxt_int_t ret; nxt_int_t ret;
nxt_thread_spinlock_t *lock; nxt_thread_spinlock_t *lock;
recf->creating = nxt_array_create(tmcf->mem_pool, 4, ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
sizeof(nxt_joint_job_t)); nxt_router_listen_socket_create);
if (nxt_slow_path(recf->creating == NULL)) {
return NXT_ERROR;
}
mp = tmcf->conf->mem_pool;
ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->creating,
recf->creating, nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
return ret; return ret;
} }
ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->updating, ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
recf->creating, nxt_router_listen_socket_create); nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
return ret; return ret;
} }
@@ -985,42 +975,21 @@ static nxt_int_t
nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf, nxt_router_engine_conf_update(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf) nxt_router_engine_conf_t *recf)
{ {
nxt_mp_t *mp;
nxt_int_t ret; nxt_int_t ret;
nxt_thread_spinlock_t *lock; nxt_thread_spinlock_t *lock;
recf->creating = nxt_array_create(tmcf->mem_pool, 4, ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->creating,
sizeof(nxt_joint_job_t)); nxt_router_listen_socket_create);
if (nxt_slow_path(recf->creating == NULL)) {
return NXT_ERROR;
}
mp = tmcf->conf->mem_pool;
ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->creating,
recf->creating, nxt_router_listen_socket_create);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
return ret; return ret;
} }
recf->updating = nxt_array_create(tmcf->mem_pool, 4, ret = nxt_router_engine_joints_create(tmcf, recf, &tmcf->updating,
sizeof(nxt_joint_job_t)); nxt_router_listen_socket_update);
if (nxt_slow_path(recf->updating == NULL)) {
return NXT_ERROR;
}
ret = nxt_router_engine_joints_create(mp, tmcf, recf, &tmcf->updating,
recf->updating, nxt_router_listen_socket_update);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
return ret; return ret;
} }
recf->deleting = nxt_array_create(tmcf->mem_pool, 4,
sizeof(nxt_joint_job_t));
if (nxt_slow_path(recf->deleting == NULL)) {
return NXT_ERROR;
}
ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting); ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->deleting);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
return ret; return ret;
@@ -1044,12 +1013,6 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
{ {
nxt_int_t ret; nxt_int_t ret;
recf->deleting = nxt_array_create(tmcf->mem_pool,
4, sizeof(nxt_joint_job_t));
if (nxt_slow_path(recf->deleting == NULL)) {
return NXT_ERROR;
}
ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating); ret = nxt_router_engine_joints_delete(tmcf, recf, &tmcf->updating);
if (nxt_slow_path(ret != NXT_OK)) { if (nxt_slow_path(ret != NXT_OK)) {
return ret; return ret;
@@ -1060,8 +1023,8 @@ nxt_router_engine_conf_delete(nxt_router_temp_conf_t *tmcf,
static nxt_int_t static nxt_int_t
nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_temp_conf_t *tmcf, nxt_router_engine_joints_create(nxt_router_temp_conf_t *tmcf,
nxt_router_engine_conf_t *recf, nxt_queue_t *sockets, nxt_array_t *array, nxt_router_engine_conf_t *recf, nxt_queue_t *sockets,
nxt_work_handler_t handler) nxt_work_handler_t handler)
{ {
nxt_joint_job_t *job; nxt_joint_job_t *job;
@@ -1072,19 +1035,24 @@ nxt_router_engine_joints_create(nxt_mp_t *mp, nxt_router_temp_conf_t *tmcf,
qlk != nxt_queue_tail(sockets); qlk != nxt_queue_tail(sockets);
qlk = nxt_queue_next(qlk)) qlk = nxt_queue_next(qlk))
{ {
job = nxt_array_add(array); job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
if (nxt_slow_path(job == NULL)) { if (nxt_slow_path(job == NULL)) {
return NXT_ERROR; return NXT_ERROR;
} }
job->work.next = recf->jobs;
recf->jobs = &job->work;
job->task = tmcf->engine->task; job->task = tmcf->engine->task;
job->work.next = NULL;
job->work.handler = handler; job->work.handler = handler;
job->work.task = &job->task; job->work.task = &job->task;
job->work.obj = job; job->work.obj = job;
job->tmcf = tmcf; job->tmcf = tmcf;
joint = nxt_mp_alloc(mp, sizeof(nxt_socket_conf_joint_t)); tmcf->count++;
joint = nxt_mp_alloc(tmcf->conf->mem_pool,
sizeof(nxt_socket_conf_joint_t));
if (nxt_slow_path(joint == NULL)) { if (nxt_slow_path(joint == NULL)) {
return NXT_ERROR; return NXT_ERROR;
} }
@@ -1129,18 +1097,22 @@ nxt_router_engine_joints_delete(nxt_router_temp_conf_t *tmcf,
qlk != nxt_queue_tail(sockets); qlk != nxt_queue_tail(sockets);
qlk = nxt_queue_next(qlk)) qlk = nxt_queue_next(qlk))
{ {
job = nxt_array_add(recf->deleting); job = nxt_mp_get(tmcf->mem_pool, sizeof(nxt_joint_job_t));
if (nxt_slow_path(job == NULL)) { if (nxt_slow_path(job == NULL)) {
return NXT_ERROR; return NXT_ERROR;
} }
job->work.next = recf->jobs;
recf->jobs = &job->work;
job->task = tmcf->engine->task; job->task = tmcf->engine->task;
job->work.next = NULL;
job->work.handler = nxt_router_listen_socket_delete; job->work.handler = nxt_router_listen_socket_delete;
job->work.task = &job->task; job->work.task = &job->task;
job->work.obj = job; job->work.obj = job;
job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link); job->work.data = nxt_queue_link_data(qlk, nxt_socket_conf_t, link);
job->tmcf = tmcf; job->tmcf = tmcf;
tmcf->count++;
} }
return NXT_OK; return NXT_OK;
@@ -1231,47 +1203,22 @@ nxt_router_engines_post(nxt_router_temp_conf_t *tmcf)
recf = tmcf->engines->elts; recf = tmcf->engines->elts;
for (n = tmcf->engines->nelts; n != 0; n--) { for (n = tmcf->engines->nelts; n != 0; n--) {
nxt_router_engine_post(tmcf, recf); nxt_router_engine_post(recf);
recf++; recf++;
} }
} }
static void static void
nxt_router_engine_post(nxt_router_temp_conf_t *tmcf, nxt_router_engine_post(nxt_router_engine_conf_t *recf)
nxt_router_engine_conf_t *recf)
{ {
nxt_uint_t n; nxt_work_t *work, *next;
nxt_joint_job_t *job;
if (recf->creating != NULL) { for (work = recf->jobs; work != NULL; work = next) {
job = recf->creating->elts; next = work->next;
work->next = NULL;
for (n = recf->creating->nelts; n != 0; n--) { nxt_event_engine_post(recf->engine, work);
nxt_event_engine_post(recf->engine, &job->work);
job++;
tmcf->count++;
}
}
if (recf->updating != NULL) {
job = recf->updating->elts;
for (n = recf->updating->nelts; n != 0; n--) {
nxt_event_engine_post(recf->engine, &job->work);
job++;
tmcf->count++;
}
}
if (recf->deleting != NULL) {
job = recf->deleting->elts;
for (n = recf->deleting->nelts; n != 0; n--) {
nxt_event_engine_post(recf->engine, &job->work);
job++;
tmcf->count++;
}
} }
} }

View File

@@ -35,9 +35,7 @@ typedef struct {
typedef struct { typedef struct {
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
nxt_array_t *creating; /* of nxt_joint_job_t */ nxt_work_t *jobs;
nxt_array_t *updating; /* of nxt_joint_job_t */
nxt_array_t *deleting; /* of nxt_joint_job_t */
} nxt_router_engine_conf_t; } nxt_router_engine_conf_t;