Circular queues implementations and a test.
- naive circular queue, described in the article "A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue" by Ruslan Nikolaev: https://drops.dagstuhl.de/opus/volltexte/2019/11335/pdf/LIPIcs-DISC-2019-28.pdf - circular queue, proposed by Valentin Bartenev in the "Unit router application IPC" design draft
This commit is contained in:
52
auto/make
52
auto/make
@@ -130,6 +130,42 @@ END
|
||||
|
||||
done
|
||||
|
||||
nxt_src=src/test/nxt_cq_test.c
|
||||
nxt_obj=src/test/nxt_ncq_test.o
|
||||
nxt_dep=src/test/nxt_ncq_test.dep
|
||||
nxt_dep_flags=`nxt_gen_dep_flags`
|
||||
nxt_dep_post=`nxt_gen_dep_post`
|
||||
cat << END >> $NXT_MAKEFILE
|
||||
|
||||
$NXT_BUILD_DIR/$nxt_obj: $nxt_src $NXT_VERSION_H
|
||||
\$(CC) -c \$(CFLAGS) -DNXT_NCQ_TEST=1 \$(NXT_LIB_INCS) $NXT_LIB_AUX_CFLAGS \\
|
||||
-o $NXT_BUILD_DIR/$nxt_obj \\
|
||||
$nxt_dep_flags \\
|
||||
$nxt_src
|
||||
$nxt_dep_post
|
||||
|
||||
-include $NXT_BUILD_DIR/$nxt_dep
|
||||
|
||||
END
|
||||
|
||||
nxt_src=src/test/nxt_cq_test.c
|
||||
nxt_obj=src/test/nxt_vbcq_test.o
|
||||
nxt_dep=src/test/nxt_vbcq_test.dep
|
||||
nxt_dep_flags=`nxt_gen_dep_flags`
|
||||
nxt_dep_post=`nxt_gen_dep_post`
|
||||
cat << END >> $NXT_MAKEFILE
|
||||
|
||||
$NXT_BUILD_DIR/$nxt_obj: $nxt_src $NXT_VERSION_H
|
||||
\$(CC) -c \$(CFLAGS) -DNXT_NCQ_TEST=0 \$(NXT_LIB_INCS) $NXT_LIB_AUX_CFLAGS \\
|
||||
-o $NXT_BUILD_DIR/$nxt_obj \\
|
||||
$nxt_dep_flags \\
|
||||
$nxt_src
|
||||
$nxt_dep_post
|
||||
|
||||
-include $NXT_BUILD_DIR/$nxt_dep
|
||||
|
||||
END
|
||||
|
||||
$echo >> $NXT_MAKEFILE
|
||||
|
||||
|
||||
@@ -151,6 +187,8 @@ if [ $NXT_TESTS = YES ]; then
|
||||
|
||||
.PHONY: tests
|
||||
tests: $NXT_BUILD_DIR/tests $NXT_BUILD_DIR/utf8_file_name_test \\
|
||||
$NXT_BUILD_DIR/ncq_test \\
|
||||
$NXT_BUILD_DIR/vbcq_test \\
|
||||
$NXT_BUILD_DIR/unit_app_test $NXT_BUILD_DIR/unit_websocket_chat \\
|
||||
$NXT_BUILD_DIR/unit_websocket_echo
|
||||
|
||||
@@ -169,6 +207,20 @@ $NXT_BUILD_DIR/utf8_file_name_test: $NXT_LIB_UTF8_FILE_NAME_TEST_SRCS \\
|
||||
$NXT_BUILD_DIR/$NXT_LIB_STATIC \\
|
||||
$NXT_LD_OPT $NXT_LIBM $NXT_LIBS $NXT_LIB_AUX_LIBS
|
||||
|
||||
$NXT_BUILD_DIR/ncq_test: $NXT_BUILD_DIR/src/test/nxt_ncq_test.o \\
|
||||
$NXT_BUILD_DIR/$NXT_LIB_STATIC
|
||||
\$(NXT_EXEC_LINK) -o $NXT_BUILD_DIR/ncq_test \\
|
||||
\$(CFLAGS) $NXT_BUILD_DIR/src/test/nxt_ncq_test.o \\
|
||||
$NXT_BUILD_DIR/$NXT_LIB_STATIC \\
|
||||
$NXT_LD_OPT $NXT_LIBM $NXT_LIBS $NXT_LIB_AUX_LIBS
|
||||
|
||||
$NXT_BUILD_DIR/vbcq_test: $NXT_BUILD_DIR/src/test/nxt_vbcq_test.o \\
|
||||
$NXT_BUILD_DIR/$NXT_LIB_STATIC
|
||||
\$(NXT_EXEC_LINK) -o $NXT_BUILD_DIR/vbcq_test \\
|
||||
\$(CFLAGS) $NXT_BUILD_DIR/src/test/nxt_vbcq_test.o \\
|
||||
$NXT_BUILD_DIR/$NXT_LIB_STATIC \\
|
||||
$NXT_LD_OPT $NXT_LIBM $NXT_LIBS $NXT_LIB_AUX_LIBS
|
||||
|
||||
$NXT_BUILD_DIR/unit_app_test: $NXT_BUILD_DIR/src/test/nxt_unit_app_test.o \\
|
||||
$NXT_BUILD_DIR/$NXT_LIB_UNIT_STATIC
|
||||
\$(NXT_EXEC_LINK) -o $NXT_BUILD_DIR/unit_app_test \\
|
||||
|
||||
162
src/nxt_nncq.h
Normal file
162
src/nxt_nncq.h
Normal file
@@ -0,0 +1,162 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef _NXT_NNCQ_H_INCLUDED_
|
||||
#define _NXT_NNCQ_H_INCLUDED_
|
||||
|
||||
|
||||
/* Numeric Naive Circular Queue */
|
||||
|
||||
#define NXT_NNCQ_SIZE 16384
|
||||
|
||||
typedef uint32_t nxt_nncq_atomic_t;
|
||||
typedef uint16_t nxt_nncq_cycle_t;
|
||||
|
||||
typedef struct {
|
||||
nxt_nncq_atomic_t head;
|
||||
nxt_nncq_atomic_t entries[NXT_NNCQ_SIZE];
|
||||
nxt_nncq_atomic_t tail;
|
||||
} nxt_nncq_t;
|
||||
|
||||
|
||||
static inline nxt_nncq_atomic_t
|
||||
nxt_nncq_head(nxt_nncq_t const volatile *q)
|
||||
{
|
||||
return q->head;
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nncq_atomic_t
|
||||
nxt_nncq_tail(nxt_nncq_t const volatile *q)
|
||||
{
|
||||
return q->tail;
|
||||
}
|
||||
|
||||
|
||||
static inline void
|
||||
nxt_nncq_tail_cmp_inc(nxt_nncq_t volatile *q, nxt_nncq_atomic_t t)
|
||||
{
|
||||
nxt_atomic_cmp_set(&q->tail, t, t + 1);
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nncq_atomic_t
|
||||
nxt_nncq_index(nxt_nncq_t const volatile *q, nxt_nncq_atomic_t i)
|
||||
{
|
||||
return i % NXT_NNCQ_SIZE;
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nncq_atomic_t
|
||||
nxt_nncq_map(nxt_nncq_t const volatile *q, nxt_nncq_atomic_t i)
|
||||
{
|
||||
return i % NXT_NNCQ_SIZE;
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nncq_cycle_t
|
||||
nxt_nncq_cycle(nxt_nncq_t const volatile *q, nxt_nncq_atomic_t i)
|
||||
{
|
||||
return i / NXT_NNCQ_SIZE;
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nncq_cycle_t
|
||||
nxt_nncq_next_cycle(nxt_nncq_t const volatile *q, nxt_nncq_cycle_t i)
|
||||
{
|
||||
return i + 1;
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nncq_atomic_t
|
||||
nxt_nncq_new_entry(nxt_nncq_t const volatile *q, nxt_nncq_cycle_t cycle,
|
||||
nxt_nncq_atomic_t i)
|
||||
{
|
||||
return cycle * NXT_NNCQ_SIZE + (i % NXT_NNCQ_SIZE);
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nncq_atomic_t
|
||||
nxt_nncq_empty(nxt_nncq_t const volatile *q)
|
||||
{
|
||||
return NXT_NNCQ_SIZE;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_nncq_init(nxt_nncq_t volatile *q)
|
||||
{
|
||||
q->head = NXT_NNCQ_SIZE;
|
||||
nxt_memzero((void *) q->entries, NXT_NNCQ_SIZE * sizeof(nxt_nncq_atomic_t));
|
||||
q->tail = NXT_NNCQ_SIZE;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_nncq_enqueue(nxt_nncq_t volatile *q, nxt_nncq_atomic_t val)
|
||||
{
|
||||
nxt_nncq_cycle_t e_cycle, t_cycle;
|
||||
nxt_nncq_atomic_t n, t, e, j;
|
||||
|
||||
for ( ;; ) {
|
||||
t = nxt_nncq_tail(q);
|
||||
j = nxt_nncq_map(q, t);
|
||||
e = q->entries[j];
|
||||
|
||||
e_cycle = nxt_nncq_cycle(q, e);
|
||||
t_cycle = nxt_nncq_cycle(q, t);
|
||||
|
||||
if (e_cycle == t_cycle) {
|
||||
nxt_nncq_tail_cmp_inc(q, t);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nxt_nncq_next_cycle(q, e_cycle) != t_cycle) {
|
||||
continue;
|
||||
}
|
||||
|
||||
n = nxt_nncq_new_entry(q, t_cycle, val);
|
||||
|
||||
if (nxt_atomic_cmp_set(&q->entries[j], e, n)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
nxt_nncq_tail_cmp_inc(q, t);
|
||||
}
|
||||
|
||||
|
||||
static nxt_nncq_atomic_t
|
||||
nxt_nncq_dequeue(nxt_nncq_t volatile *q)
|
||||
{
|
||||
nxt_nncq_cycle_t e_cycle, h_cycle;
|
||||
nxt_nncq_atomic_t h, j, e;
|
||||
|
||||
for ( ;; ) {
|
||||
h = nxt_nncq_head(q);
|
||||
j = nxt_nncq_map(q, h);
|
||||
e = q->entries[j];
|
||||
|
||||
e_cycle = nxt_nncq_cycle(q, e);
|
||||
h_cycle = nxt_nncq_cycle(q, h);
|
||||
|
||||
if (e_cycle != h_cycle) {
|
||||
if (nxt_nncq_next_cycle(q, e_cycle) == h_cycle) {
|
||||
return nxt_nncq_empty(q);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
if (nxt_atomic_cmp_set(&q->head, h, h + 1)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return nxt_nncq_index(q, e);
|
||||
}
|
||||
|
||||
|
||||
#endif /* _NXT_NNCQ_H_INCLUDED_ */
|
||||
146
src/nxt_nvbcq.h
Normal file
146
src/nxt_nvbcq.h
Normal file
@@ -0,0 +1,146 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#ifndef _NXT_NVBCQ_H_INCLUDED_
|
||||
#define _NXT_NVBCQ_H_INCLUDED_
|
||||
|
||||
|
||||
/* Numeric VBart Circular Queue */
|
||||
|
||||
#define NXT_NVBCQ_SIZE 16384
|
||||
|
||||
typedef uint32_t nxt_nvbcq_atomic_t;
|
||||
|
||||
struct nxt_nvbcq_s {
|
||||
nxt_nvbcq_atomic_t head;
|
||||
nxt_nvbcq_atomic_t entries[NXT_NVBCQ_SIZE];
|
||||
nxt_nvbcq_atomic_t tail;
|
||||
};
|
||||
|
||||
typedef struct nxt_nvbcq_s nxt_nvbcq_t;
|
||||
|
||||
|
||||
static inline nxt_nvbcq_atomic_t
|
||||
nxt_nvbcq_head(nxt_nvbcq_t const volatile *q)
|
||||
{
|
||||
return q->head;
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nvbcq_atomic_t
|
||||
nxt_nvbcq_tail(nxt_nvbcq_t const volatile *q)
|
||||
{
|
||||
return q->tail;
|
||||
}
|
||||
|
||||
|
||||
static inline void
|
||||
nxt_nvbcq_tail_cmp_inc(nxt_nvbcq_t volatile *q, nxt_nvbcq_atomic_t t)
|
||||
{
|
||||
nxt_atomic_cmp_set(&q->tail, t, t + 1);
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nvbcq_atomic_t
|
||||
nxt_nvbcq_index(nxt_nvbcq_t const volatile *q, nxt_nvbcq_atomic_t i)
|
||||
{
|
||||
return i % NXT_NVBCQ_SIZE;
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nvbcq_atomic_t
|
||||
nxt_nvbcq_map(nxt_nvbcq_t const volatile *q, nxt_nvbcq_atomic_t i)
|
||||
{
|
||||
return i % NXT_NVBCQ_SIZE;
|
||||
}
|
||||
|
||||
|
||||
static inline nxt_nvbcq_atomic_t
|
||||
nxt_nvbcq_empty(nxt_nvbcq_t const volatile *q)
|
||||
{
|
||||
return NXT_NVBCQ_SIZE;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_nvbcq_init(nxt_nvbcq_t volatile *q)
|
||||
{
|
||||
nxt_nvbcq_atomic_t i;
|
||||
|
||||
q->head = 0;
|
||||
|
||||
for (i = 0; i < NXT_NVBCQ_SIZE; i++) {
|
||||
q->entries[i] = NXT_NVBCQ_SIZE;
|
||||
}
|
||||
|
||||
q->tail = NXT_NVBCQ_SIZE;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_nvbcq_enqueue(nxt_nvbcq_t volatile *q, nxt_nvbcq_atomic_t val)
|
||||
{
|
||||
nxt_nvbcq_atomic_t t, h, i;
|
||||
|
||||
t = nxt_nvbcq_tail(q);
|
||||
h = t - NXT_NVBCQ_SIZE;
|
||||
|
||||
for ( ;; ) {
|
||||
i = nxt_nvbcq_map(q, t);
|
||||
|
||||
if (q->entries[i] == NXT_NVBCQ_SIZE
|
||||
&& nxt_atomic_cmp_set(&q->entries[i], NXT_NVBCQ_SIZE, val))
|
||||
{
|
||||
nxt_nvbcq_tail_cmp_inc(q, t);
|
||||
return;
|
||||
}
|
||||
|
||||
if ((t - h) == NXT_NVBCQ_SIZE) {
|
||||
h = nxt_nvbcq_head(q);
|
||||
|
||||
if ((t - h) == NXT_NVBCQ_SIZE) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
t++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static nxt_nvbcq_atomic_t
|
||||
nxt_nvbcq_dequeue(nxt_nvbcq_t volatile *q)
|
||||
{
|
||||
nxt_nvbcq_atomic_t h, t, i, e;
|
||||
|
||||
h = nxt_nvbcq_head(q);
|
||||
t = h + NXT_NVBCQ_SIZE;
|
||||
|
||||
for ( ;; ) {
|
||||
i = nxt_nvbcq_map(q, h);
|
||||
e = q->entries[i];
|
||||
|
||||
if (e < NXT_NVBCQ_SIZE
|
||||
&& nxt_atomic_cmp_set(&q->entries[i], e, NXT_NVBCQ_SIZE))
|
||||
{
|
||||
nxt_atomic_cmp_set(&q->head, h, h + 1);
|
||||
|
||||
return e;
|
||||
}
|
||||
|
||||
if ((t - h) == NXT_NVBCQ_SIZE) {
|
||||
t = nxt_nvbcq_tail(q);
|
||||
|
||||
if ((t - h) == NXT_NVBCQ_SIZE) {
|
||||
return NXT_NVBCQ_SIZE;
|
||||
}
|
||||
}
|
||||
|
||||
h++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#endif /* _NXT_NVBCQ_H_INCLUDED_ */
|
||||
578
src/test/nxt_cq_test.c
Normal file
578
src/test/nxt_cq_test.c
Normal file
@@ -0,0 +1,578 @@
|
||||
|
||||
/*
|
||||
* Copyright (C) NGINX, Inc.
|
||||
*/
|
||||
|
||||
#include <nxt_main.h>
|
||||
#include <math.h>
|
||||
#include <inttypes.h>
|
||||
|
||||
#ifndef NXT_NCQ_TEST
|
||||
#define NXT_NCQ_TEST 1
|
||||
#endif
|
||||
|
||||
#define NXT_QTEST_USE_THREAD 0
|
||||
|
||||
#if NXT_NCQ_TEST
|
||||
#include <nxt_nncq.h>
|
||||
#else
|
||||
#include <nxt_nvbcq.h>
|
||||
#endif
|
||||
|
||||
|
||||
#define MAX_ITER 20
|
||||
#define STAT_ITER 5
|
||||
#define MIN_COV 0.02
|
||||
|
||||
extern char **environ;
|
||||
static uintptr_t nops = 10000000;
|
||||
|
||||
static uintptr_t nprocs_enq = 0;
|
||||
static uintptr_t nprocs_deq = 0;
|
||||
static uintptr_t nprocs_wenq = 0;
|
||||
static uintptr_t nprocs_wdeq = 0;
|
||||
static uintptr_t nprocs_enq_deq = 0;
|
||||
static uintptr_t nprocs_cas = 0;
|
||||
static uintptr_t nprocs_faa = 0;
|
||||
|
||||
static uintptr_t nprocs = 1;
|
||||
|
||||
|
||||
static size_t
|
||||
elapsed_time(size_t us)
|
||||
{
|
||||
struct timeval t;
|
||||
|
||||
gettimeofday(&t, NULL);
|
||||
|
||||
return t.tv_sec * 1000000 + t.tv_usec - us;
|
||||
}
|
||||
|
||||
|
||||
static double
|
||||
mean(const double *times, int n)
|
||||
{
|
||||
int i;
|
||||
double sum;
|
||||
|
||||
sum = 0;
|
||||
|
||||
for (i = 0; i < n; i++) {
|
||||
sum += times[i];
|
||||
}
|
||||
|
||||
return sum / n;
|
||||
}
|
||||
|
||||
|
||||
static double
|
||||
cov(const double *times, double mean, int n)
|
||||
{
|
||||
int i;
|
||||
double variance;
|
||||
|
||||
variance = 0;
|
||||
|
||||
for (i = 0; i < n; i++) {
|
||||
variance += (times[i] - mean) * (times[i] - mean);
|
||||
}
|
||||
|
||||
variance /= n;
|
||||
|
||||
return sqrt(variance) / mean;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
#if NXT_NCQ_TEST
|
||||
nxt_nncq_t free_queue;
|
||||
nxt_nncq_t active_queue;
|
||||
#else
|
||||
nxt_nvbcq_t free_queue;
|
||||
nxt_nvbcq_t active_queue;
|
||||
#endif
|
||||
uint32_t counter;
|
||||
} nxt_cq_t;
|
||||
|
||||
|
||||
static nxt_cq_t *pgq;
|
||||
|
||||
|
||||
#if NXT_NCQ_TEST
|
||||
#define nxt_cq_enqueue nxt_nncq_enqueue
|
||||
#define nxt_cq_dequeue nxt_nncq_dequeue
|
||||
#define nxt_cq_empty nxt_nncq_empty
|
||||
#define nxt_cq_init nxt_nncq_init
|
||||
#define NXT_CQ_SIZE NXT_NNCQ_SIZE
|
||||
#else
|
||||
#define nxt_cq_enqueue nxt_nvbcq_enqueue
|
||||
#define nxt_cq_dequeue nxt_nvbcq_dequeue
|
||||
#define nxt_cq_empty nxt_nvbcq_empty
|
||||
#define nxt_cq_init nxt_nvbcq_init
|
||||
#define NXT_CQ_SIZE NXT_NVBCQ_SIZE
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
int id;
|
||||
uint64_t enq;
|
||||
uint64_t deq;
|
||||
uint64_t wait_enq;
|
||||
uint64_t wait_deq;
|
||||
uint64_t own_res;
|
||||
uint64_t cas;
|
||||
uint64_t faa;
|
||||
|
||||
#if NXT_QTEST_USE_THREAD
|
||||
nxt_thread_handle_t handle;
|
||||
#else
|
||||
nxt_pid_t pid;
|
||||
int status;
|
||||
#endif
|
||||
} nxt_worker_info_t;
|
||||
|
||||
|
||||
static void
|
||||
cas_worker(void *p)
|
||||
{
|
||||
nxt_cq_t *q;
|
||||
uint32_t c;
|
||||
uintptr_t i;
|
||||
nxt_worker_info_t *wi;
|
||||
|
||||
q = pgq;
|
||||
wi = p;
|
||||
|
||||
for (i = 0; i < nops / nprocs_cas; i++) {
|
||||
c = q->counter;
|
||||
|
||||
if (nxt_atomic_cmp_set(&q->counter, c, c + 1)) {
|
||||
++wi->cas;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
faa_worker(void *p)
|
||||
{
|
||||
nxt_cq_t *q;
|
||||
uintptr_t i;
|
||||
nxt_worker_info_t *wi;
|
||||
|
||||
q = pgq;
|
||||
wi = p;
|
||||
|
||||
for (i = 0; i < nops / nprocs_faa; i++) {
|
||||
nxt_atomic_fetch_add(&q->counter, 1);
|
||||
wi->faa++;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
enq_deq_worker(void *p)
|
||||
{
|
||||
nxt_cq_t *q;
|
||||
uintptr_t i, v;
|
||||
nxt_worker_info_t *wi;
|
||||
|
||||
q = pgq;
|
||||
wi = p;
|
||||
|
||||
for (i = 0; i < nops / nprocs_enq_deq; i++) {
|
||||
v = nxt_cq_dequeue(&q->free_queue);
|
||||
|
||||
if (v != nxt_cq_empty(&q->free_queue)) {
|
||||
nxt_cq_enqueue(&q->active_queue, wi->id);
|
||||
wi->enq++;
|
||||
}
|
||||
|
||||
v = nxt_cq_dequeue(&q->active_queue);
|
||||
|
||||
if (v != nxt_cq_empty(&q->active_queue)) {
|
||||
nxt_cq_enqueue(&q->free_queue, v);
|
||||
wi->deq++;
|
||||
|
||||
if ((int) v == wi->id) {
|
||||
wi->own_res++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
enq_worker(void *p)
|
||||
{
|
||||
nxt_cq_t *q;
|
||||
uintptr_t i, v;
|
||||
nxt_worker_info_t *wi;
|
||||
|
||||
q = pgq;
|
||||
wi = p;
|
||||
|
||||
for (i = 0; i < nops / nprocs_enq; i++) {
|
||||
v = nxt_cq_dequeue(&q->free_queue);
|
||||
|
||||
if (v != nxt_cq_empty(&q->free_queue)) {
|
||||
nxt_cq_enqueue(&q->active_queue, v);
|
||||
wi->enq++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
deq_worker(void *p)
|
||||
{
|
||||
nxt_cq_t *q;
|
||||
uintptr_t i, v;
|
||||
nxt_worker_info_t *wi;
|
||||
|
||||
q = pgq;
|
||||
wi = p;
|
||||
|
||||
for (i = 0; i < nops / nprocs_deq; i++) {
|
||||
v = nxt_cq_dequeue(&q->active_queue);
|
||||
|
||||
if (v != nxt_cq_empty(&q->active_queue)) {
|
||||
nxt_cq_enqueue(&q->free_queue, v);
|
||||
++wi->deq;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
wenq_worker(void *p)
|
||||
{
|
||||
nxt_cq_t *q;
|
||||
uintptr_t i, v;
|
||||
nxt_worker_info_t *wi;
|
||||
|
||||
q = pgq;
|
||||
wi = p;
|
||||
|
||||
for (i = 0; i < nops / nprocs_wenq; i++) {
|
||||
|
||||
do {
|
||||
wi->wait_enq++;
|
||||
v = nxt_cq_dequeue(&q->free_queue);
|
||||
} while (v == nxt_cq_empty(&q->free_queue));
|
||||
|
||||
nxt_cq_enqueue(&q->active_queue, v);
|
||||
|
||||
wi->enq++;
|
||||
wi->wait_enq--;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
wdeq_worker(void *p)
|
||||
{
|
||||
nxt_cq_t *q;
|
||||
uintptr_t i, v;
|
||||
nxt_worker_info_t *wi;
|
||||
|
||||
q = pgq;
|
||||
wi = p;
|
||||
|
||||
for (i = 0; i < nops / nprocs_wdeq; i++) {
|
||||
|
||||
do {
|
||||
wi->wait_deq++;
|
||||
v = nxt_cq_dequeue(&q->active_queue);
|
||||
} while (v == nxt_cq_empty(&q->active_queue));
|
||||
|
||||
nxt_cq_enqueue(&q->free_queue, v);
|
||||
|
||||
wi->deq++;
|
||||
wi->wait_deq--;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static nxt_int_t
|
||||
worker_create(nxt_worker_info_t *wi, int id, nxt_thread_start_t start)
|
||||
{
|
||||
wi->id = id;
|
||||
|
||||
#if NXT_QTEST_USE_THREAD
|
||||
nxt_thread_link_t *link;
|
||||
|
||||
link = nxt_zalloc(sizeof(nxt_thread_link_t));
|
||||
|
||||
link->start = start;
|
||||
link->work.data = wi;
|
||||
|
||||
return nxt_thread_create(&wi->handle, link);
|
||||
|
||||
#else
|
||||
pid_t pid = fork();
|
||||
|
||||
if (pid == 0) {
|
||||
start(wi);
|
||||
exit(0);
|
||||
|
||||
} else {
|
||||
wi->pid = pid;
|
||||
}
|
||||
|
||||
return NXT_OK;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
worker_wait(nxt_worker_info_t *wi)
|
||||
{
|
||||
#if NXT_QTEST_USE_THREAD
|
||||
pthread_join(wi->handle, NULL);
|
||||
|
||||
#else
|
||||
waitpid(wi->pid, &wi->status, 0);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
int nxt_cdecl
|
||||
main(int argc, char **argv)
|
||||
{
|
||||
int i, k, id, verbose, objective, rk;
|
||||
char *a;
|
||||
size_t start, elapsed;
|
||||
double *stats, m, c;
|
||||
uint64_t total_ops;
|
||||
uintptr_t j;
|
||||
nxt_task_t task;
|
||||
nxt_thread_t *thr;
|
||||
nxt_worker_info_t *wi;
|
||||
double times[MAX_ITER], mopsec[MAX_ITER];
|
||||
|
||||
verbose = 0;
|
||||
objective = 0;
|
||||
|
||||
for (i = 1; i < argc; i++) {
|
||||
a = argv[i];
|
||||
|
||||
if (strcmp(a, "-v") == 0) {
|
||||
verbose++;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(a, "-n") == 0 && (i + 1) < argc) {
|
||||
nops = atoi(argv[++i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(a, "--enq") == 0 && (i + 1) < argc) {
|
||||
nprocs_enq = atoi(argv[++i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(a, "--deq") == 0 && (i + 1) < argc) {
|
||||
nprocs_deq = atoi(argv[++i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(a, "--wenq") == 0 && (i + 1) < argc) {
|
||||
nprocs_wenq = atoi(argv[++i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(a, "--wdeq") == 0 && (i + 1) < argc) {
|
||||
nprocs_wdeq = atoi(argv[++i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(a, "--ed") == 0 && (i + 1) < argc) {
|
||||
nprocs_enq_deq = atoi(argv[++i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(a, "--cas") == 0 && (i + 1) < argc) {
|
||||
nprocs_cas = atoi(argv[++i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(a, "--faa") == 0 && (i + 1) < argc) {
|
||||
nprocs_faa = atoi(argv[++i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (strcmp(a, "--obj") == 0 && (i + 1) < argc) {
|
||||
objective = atoi(argv[++i]);
|
||||
continue;
|
||||
}
|
||||
|
||||
printf("unknown option %s", a);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (nxt_lib_start("ncq_test", argv, &environ) != NXT_OK) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
nprocs = nprocs_enq + nprocs_deq + nprocs_wenq + nprocs_wdeq
|
||||
+ nprocs_enq_deq + nprocs_cas + nprocs_faa;
|
||||
|
||||
if (nprocs == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
nxt_main_log.level = NXT_LOG_INFO;
|
||||
task.log = &nxt_main_log;
|
||||
|
||||
thr = nxt_thread();
|
||||
thr->task = &task;
|
||||
|
||||
pgq = mmap(NULL, sizeof(nxt_cq_t), PROT_READ | PROT_WRITE,
|
||||
MAP_ANON | MAP_SHARED, -1, 0);
|
||||
if (pgq == MAP_FAILED) {
|
||||
return 2;
|
||||
}
|
||||
|
||||
nxt_cq_init(&pgq->free_queue);
|
||||
nxt_cq_init(&pgq->active_queue);
|
||||
|
||||
for(i = 0; i < NXT_CQ_SIZE; i++) {
|
||||
nxt_cq_enqueue(&pgq->free_queue, i);
|
||||
}
|
||||
|
||||
if (verbose >= 1) {
|
||||
printf("number of workers: %d\n", (int) nprocs);
|
||||
printf("number of ops: %d\n", (int) nops);
|
||||
}
|
||||
|
||||
wi = mmap(NULL, nprocs * sizeof(nxt_worker_info_t), PROT_READ | PROT_WRITE,
|
||||
MAP_ANON | MAP_SHARED, -1, 0);
|
||||
if (wi == MAP_FAILED) {
|
||||
return 3;
|
||||
}
|
||||
|
||||
for (k = 0; k < MAX_ITER; k++) {
|
||||
nxt_memzero(wi, nprocs * sizeof(nxt_worker_info_t));
|
||||
|
||||
nxt_cq_init(&pgq->free_queue);
|
||||
nxt_cq_init(&pgq->active_queue);
|
||||
|
||||
for(i = 0; i < NXT_CQ_SIZE; i++) {
|
||||
nxt_cq_enqueue(&pgq->free_queue, i);
|
||||
}
|
||||
|
||||
start = elapsed_time(0);
|
||||
|
||||
id = 0;
|
||||
|
||||
for (j = 0; j < nprocs_enq; j++, id++) {
|
||||
worker_create(wi + id, id, enq_worker);
|
||||
}
|
||||
|
||||
for (j = 0; j < nprocs_deq; j++, id++) {
|
||||
worker_create(wi + id, id, deq_worker);
|
||||
}
|
||||
|
||||
for (j = 0; j < nprocs_wenq; j++, id++) {
|
||||
worker_create(wi + id, id, wenq_worker);
|
||||
}
|
||||
|
||||
for (j = 0; j < nprocs_wdeq; j++, id++) {
|
||||
worker_create(wi + id, id, wdeq_worker);
|
||||
}
|
||||
|
||||
for (j = 0; j < nprocs_enq_deq; j++, id++) {
|
||||
worker_create(wi + id, id, enq_deq_worker);
|
||||
}
|
||||
|
||||
for (j = 0; j < nprocs_cas; j++, id++) {
|
||||
worker_create(wi + id, id, cas_worker);
|
||||
}
|
||||
|
||||
for (j = 0; j < nprocs_faa; j++, id++) {
|
||||
worker_create(wi + id, id, faa_worker);
|
||||
}
|
||||
|
||||
for (j = 0; j < nprocs; j++) {
|
||||
worker_wait(wi + j);
|
||||
}
|
||||
|
||||
elapsed = elapsed_time(start);
|
||||
|
||||
for (j = 1; j < nprocs; j++) {
|
||||
wi[0].enq += wi[j].enq;
|
||||
wi[0].deq += wi[j].deq;
|
||||
wi[0].wait_enq += wi[j].wait_enq;
|
||||
wi[0].wait_deq += wi[j].wait_deq;
|
||||
wi[0].own_res += wi[j].own_res;
|
||||
wi[0].cas += wi[j].cas;
|
||||
wi[0].faa += wi[j].faa;
|
||||
}
|
||||
|
||||
total_ops = wi[0].enq + wi[0].deq + wi[0].cas + wi[0].faa;
|
||||
|
||||
if (total_ops == 0) {
|
||||
total_ops = nops;
|
||||
}
|
||||
|
||||
times[k] = elapsed / 1000.0;
|
||||
mopsec[k] = (double) total_ops / elapsed;
|
||||
|
||||
if (verbose >= 2) {
|
||||
printf("enq %10"PRIu64"\n", wi[0].enq);
|
||||
printf("deq %10"PRIu64"\n", wi[0].deq);
|
||||
printf("wait_enq %10"PRIu64"\n", wi[0].wait_enq);
|
||||
printf("wait_deq %10"PRIu64"\n", wi[0].wait_deq);
|
||||
printf("own_res %10"PRIu64"\n", wi[0].own_res);
|
||||
printf("cas %10"PRIu64"\n", wi[0].cas);
|
||||
printf("faa %10"PRIu64"\n", wi[0].faa);
|
||||
printf("total ops %10"PRIu64"\n", total_ops);
|
||||
printf("Mops/sec %13.2f\n", mopsec[k]);
|
||||
|
||||
printf("elapsed %10d us\n", (int) elapsed);
|
||||
printf("per op %10d ns\n", (int) ((1000 * elapsed) / total_ops));
|
||||
}
|
||||
|
||||
if (k >= STAT_ITER) {
|
||||
stats = (objective == 0) ? times : mopsec;
|
||||
|
||||
m = mean(stats + k - STAT_ITER, STAT_ITER);
|
||||
c = cov(stats + k - STAT_ITER, m, STAT_ITER);
|
||||
|
||||
if (verbose >= 1) {
|
||||
if (objective == 0) {
|
||||
printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f; "
|
||||
"mean time %.2f ms; cov %.4f\n",
|
||||
(int) k + 1, times[k], mopsec[k], m, c);
|
||||
|
||||
} else {
|
||||
printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f; "
|
||||
"mean Mop/sec %.2f; cov %.4f\n",
|
||||
(int) k + 1, times[k], mopsec[k], m, c);
|
||||
}
|
||||
}
|
||||
|
||||
if (c < MIN_COV) {
|
||||
rk = k - STAT_ITER;
|
||||
|
||||
for (i = rk + 1; i <= k; i++) {
|
||||
if (fabs(stats[i] - m) < fabs(stats[rk] - m)) {
|
||||
rk = i;
|
||||
}
|
||||
}
|
||||
|
||||
printf("#%d %.2f ms; %.2f\n", rk, times[rk], mopsec[rk]);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
} else {
|
||||
if (verbose >= 1) {
|
||||
printf(" #%02d elapsed time: %.2f ms; Mops/sec %.2f\n",
|
||||
(int) k + 1, times[k], mopsec[k]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
Reference in New Issue
Block a user