Work queue thread safety checks for DEBUG build.

This commit is contained in:
Max Romanov
2017-07-07 16:01:34 +03:00
parent 5529e9f0d1
commit e15b975e9d
3 changed files with 44 additions and 7 deletions

View File

@@ -235,6 +235,7 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
ssize_t n; ssize_t n;
nxt_port_t *port; nxt_port_t *port;
struct iovec iov[NXT_IOBUF_MAX * 10]; struct iovec iov[NXT_IOBUF_MAX * 10];
nxt_work_queue_t *wq;
nxt_queue_link_t *link; nxt_queue_link_t *link;
nxt_port_method_t m; nxt_port_method_t m;
nxt_port_send_msg_t *msg; nxt_port_send_msg_t *msg;
@@ -301,22 +302,19 @@ nxt_port_write_handler(nxt_task_t *task, void *obj, void *data)
goto fail; goto fail;
} }
wq = &task->thread->engine->fast_work_queue;
if (msg->buf != plain_buf) { if (msg->buf != plain_buf) {
/* /*
* Complete crafted mmap_msgs buf and restore msg->buf * Complete crafted mmap_msgs buf and restore msg->buf
* for regular completion call. * for regular completion call.
*/ */
nxt_port_mmap_completion(task, nxt_port_mmap_completion(task, wq, msg->buf);
port->socket.write_work_queue,
msg->buf);
msg->buf = plain_buf; msg->buf = plain_buf;
} }
msg->buf = nxt_sendbuf_completion(task, msg->buf = nxt_sendbuf_completion(task, wq, msg->buf, plain_size);
port->socket.write_work_queue,
msg->buf,
plain_size);
if (msg->buf != NULL) { if (msg->buf != NULL) {
/* /*

View File

@@ -32,6 +32,39 @@ static void nxt_work_queue_allocate(nxt_work_queue_cache_t *cache);
static nxt_uint_t nxt_work_queue_bucket_items = 409; static nxt_uint_t nxt_work_queue_bucket_items = 409;
#if (NXT_DEBUG)
nxt_inline void
nxt_work_queue_thread_assert(nxt_work_queue_t *wq)
{
nxt_tid_t tid;
nxt_thread_t *thread;
thread = nxt_thread();
tid = nxt_thread_tid(thread);
if (nxt_fast_path(wq->tid == tid)) {
return;
}
if (nxt_slow_path(nxt_pid != wq->pid)) {
wq->pid = nxt_pid;
wq->tid = tid;
return;
}
nxt_log_alert(thread->log, "work queue locked by thread %PT", wq->tid);
nxt_abort();
}
#else
#define nxt_work_queue_thread_assert(wq)
#endif
void void
nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size) nxt_work_queue_cache_create(nxt_work_queue_cache_t *cache, size_t chunk_size)
{ {
@@ -109,6 +142,8 @@ nxt_work_queue_add(nxt_work_queue_t *wq, nxt_work_handler_t handler,
{ {
nxt_work_t *work; nxt_work_t *work;
nxt_work_queue_thread_assert(wq);
for ( ;; ) { for ( ;; ) {
work = wq->cache->next; work = wq->cache->next;
@@ -144,6 +179,8 @@ nxt_work_queue_pop(nxt_work_queue_t *wq, nxt_task_t **task, void **obj,
{ {
nxt_work_t *work; nxt_work_t *work;
nxt_work_queue_thread_assert(wq);
work = wq->head; work = wq->head;
wq->head = work->next; wq->head = work->next;

View File

@@ -68,6 +68,8 @@ struct nxt_work_queue_s {
nxt_work_queue_cache_t *cache; nxt_work_queue_cache_t *cache;
#if (NXT_DEBUG) #if (NXT_DEBUG)
const char *name; const char *name;
int32_t pid;
nxt_tid_t tid;
#endif #endif
}; };