Memory pool thread safety checks in DEBUG build and usage fixes.
This commit is contained in:
56
src/nxt_mp.c
56
src/nxt_mp.c
@@ -109,6 +109,11 @@ struct nxt_mp_s {
|
||||
uint32_t cluster_size;
|
||||
uint32_t retain;
|
||||
|
||||
#if (NXT_DEBUG)
|
||||
nxt_pid_t pid;
|
||||
nxt_tid_t tid;
|
||||
#endif
|
||||
|
||||
/* Lists of nxt_mp_page_t. */
|
||||
nxt_queue_t free_pages;
|
||||
nxt_queue_t nget_pages;
|
||||
@@ -186,6 +191,49 @@ nxt_lg2(uint64_t v)
|
||||
#endif
|
||||
|
||||
|
||||
#if (NXT_DEBUG)
|
||||
|
||||
nxt_inline void
|
||||
nxt_mp_thread_assert(nxt_mp_t *mp)
|
||||
{
|
||||
nxt_tid_t tid;
|
||||
nxt_thread_t *thread;
|
||||
|
||||
thread = nxt_thread();
|
||||
tid = nxt_thread_tid(thread);
|
||||
|
||||
if (nxt_fast_path(mp->tid == tid)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (nxt_slow_path(nxt_pid != mp->pid)) {
|
||||
mp->pid = nxt_pid;
|
||||
mp->tid = tid;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_log_alert(thread->log, "mem_pool locked by thread %PT", mp->tid);
|
||||
nxt_abort();
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
#define nxt_mp_thread_assert(mp)
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
void
|
||||
nxt_mp_thread_adopt(nxt_mp_t *mp)
|
||||
{
|
||||
#if (NXT_DEBUG)
|
||||
mp->pid = nxt_pid;
|
||||
mp->tid = nxt_thread_tid(NULL);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
nxt_mp_t *
|
||||
nxt_mp_create(size_t cluster_size, size_t page_alignment, size_t page_size,
|
||||
size_t min_chunk_size)
|
||||
@@ -417,6 +465,8 @@ nxt_mp_alloc_small(nxt_mp_t *mp, size_t size)
|
||||
nxt_mp_page_t *page;
|
||||
nxt_queue_link_t *link;
|
||||
|
||||
nxt_mp_thread_assert(mp);
|
||||
|
||||
p = NULL;
|
||||
|
||||
if (size <= mp->page_size / 2) {
|
||||
@@ -489,6 +539,8 @@ nxt_mp_get_small(nxt_mp_t *mp, nxt_queue_t *pages, size_t size)
|
||||
nxt_mp_page_t *page;
|
||||
nxt_queue_link_t *link, *next;
|
||||
|
||||
nxt_mp_thread_assert(mp);
|
||||
|
||||
for (link = nxt_queue_first(pages);
|
||||
link != nxt_queue_tail(pages);
|
||||
link = next)
|
||||
@@ -604,6 +656,8 @@ nxt_mp_alloc_large(nxt_mp_t *mp, size_t alignment, size_t size)
|
||||
uint8_t type;
|
||||
nxt_mp_block_t *block;
|
||||
|
||||
nxt_mp_thread_assert(mp);
|
||||
|
||||
/* Allocation must be less than 4G. */
|
||||
if (nxt_slow_path(size >= 0xFFFFFFFF)) {
|
||||
return NULL;
|
||||
@@ -664,6 +718,8 @@ nxt_mp_free(nxt_mp_t *mp, void *p)
|
||||
nxt_thread_t *thread;
|
||||
nxt_mp_block_t *block;
|
||||
|
||||
nxt_mp_thread_assert(mp);
|
||||
|
||||
nxt_debug_alloc("mp free %p", p);
|
||||
|
||||
block = nxt_mp_find_block(&mp->blocks, p);
|
||||
|
||||
@@ -109,4 +109,6 @@ NXT_EXPORT void *nxt_mp_zget(nxt_mp_t *mp, size_t size)
|
||||
NXT_MALLOC_LIKE;
|
||||
|
||||
|
||||
NXT_EXPORT void nxt_mp_thread_adopt(nxt_mp_t *mp);
|
||||
|
||||
#endif /* _NXT_MP_H_INCLUDED_ */
|
||||
|
||||
@@ -103,7 +103,8 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
nxt_thread_mutex_lock(&process->incoming_mutex);
|
||||
|
||||
if (process->incoming == NULL) {
|
||||
process->incoming = nxt_array_create(process->mem_pool, 1,
|
||||
process->incoming_mp = nxt_mp_create(1024, 128, 256, 32);
|
||||
process->incoming = nxt_array_create(process->incoming_mp, 1,
|
||||
sizeof(nxt_port_mmap_t));
|
||||
}
|
||||
|
||||
@@ -113,6 +114,8 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
goto fail;
|
||||
}
|
||||
|
||||
nxt_mp_thread_adopt(process->incoming_mp);
|
||||
|
||||
port_mmap = nxt_array_zero_add(process->incoming);
|
||||
if (nxt_slow_path(port_mmap == NULL)) {
|
||||
nxt_log(task, NXT_LOG_WARN, "failed to add mmap to incoming array");
|
||||
@@ -192,7 +195,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
port_mmap = NULL;
|
||||
|
||||
if (process->outgoing == NULL) {
|
||||
process->outgoing = nxt_array_create(process->mem_pool, 1,
|
||||
process->outgoing_mp = nxt_mp_create(1024, 128, 256, 32);
|
||||
process->outgoing = nxt_array_create(process->outgoing_mp, 1,
|
||||
sizeof(nxt_port_mmap_t));
|
||||
}
|
||||
|
||||
@@ -202,6 +206,8 @@ nxt_port_new_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
||||
return NULL;
|
||||
}
|
||||
|
||||
nxt_mp_thread_adopt(process->outgoing_mp);
|
||||
|
||||
port_mmap = nxt_array_zero_add(process->outgoing);
|
||||
if (nxt_slow_path(port_mmap == NULL)) {
|
||||
nxt_log(task, NXT_LOG_WARN,
|
||||
|
||||
@@ -57,8 +57,10 @@ typedef struct {
|
||||
|
||||
nxt_process_init_t *init;
|
||||
nxt_thread_mutex_t incoming_mutex;
|
||||
nxt_mp_t *incoming_mp;
|
||||
nxt_array_t *incoming; /* of nxt_port_mmap_t */
|
||||
nxt_thread_mutex_t outgoing_mutex;
|
||||
nxt_mp_t *outgoing_mp;
|
||||
nxt_array_t *outgoing; /* of nxt_port_mmap_t */
|
||||
|
||||
nxt_lvlhsh_t connected_ports; /* of nxt_port_t */
|
||||
|
||||
@@ -926,6 +926,8 @@ nxt_router_thread_start(void *data)
|
||||
thread->task = &engine->task;
|
||||
thread->fiber = &engine->fibers->fiber;
|
||||
|
||||
nxt_mp_thread_adopt(engine->port->mem_pool);
|
||||
|
||||
engine->port->socket.task = task;
|
||||
nxt_port_create(task, engine->port, nxt_router_app_port_handlers);
|
||||
|
||||
|
||||
@@ -1522,8 +1522,6 @@ nxt_runtime_process_find(nxt_runtime_t *rt, nxt_pid_t pid)
|
||||
lhq.key.start = (u_char *) &pid;
|
||||
lhq.proto = &lvlhsh_processes_proto;
|
||||
|
||||
/* TODO lock processes */
|
||||
|
||||
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
|
||||
nxt_thread_log_debug("process %PI found", pid);
|
||||
return lhq.value;
|
||||
@@ -1546,8 +1544,6 @@ nxt_runtime_process_get(nxt_runtime_t *rt, nxt_pid_t pid)
|
||||
lhq.key.start = (u_char *) &pid;
|
||||
lhq.proto = &lvlhsh_processes_proto;
|
||||
|
||||
/* TODO lock processes */
|
||||
|
||||
if (nxt_lvlhsh_find(&rt->processes, &lhq) == NXT_OK) {
|
||||
nxt_thread_log_debug("process %PI found", pid);
|
||||
return lhq.value;
|
||||
@@ -1599,8 +1595,6 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
lhq.value = process;
|
||||
lhq.pool = rt->mem_pool;
|
||||
|
||||
/* TODO lock processes */
|
||||
|
||||
switch (nxt_lvlhsh_insert(&rt->processes, &lhq)) {
|
||||
|
||||
case NXT_OK:
|
||||
@@ -1627,7 +1621,9 @@ nxt_runtime_process_add(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
void
|
||||
nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
{
|
||||
uint32_t i;
|
||||
nxt_port_t *port;
|
||||
nxt_port_mmap_t *port_mmap;
|
||||
nxt_lvlhsh_query_t lhq;
|
||||
|
||||
lhq.key_hash = nxt_murmur_hash2(&process->pid, sizeof(process->pid));
|
||||
@@ -1638,8 +1634,6 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
lhq.value = process;
|
||||
lhq.pool = rt->mem_pool;
|
||||
|
||||
/* TODO lock processes */
|
||||
|
||||
switch (nxt_lvlhsh_delete(&rt->processes, &lhq)) {
|
||||
|
||||
case NXT_OK:
|
||||
@@ -1651,6 +1645,34 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
|
||||
} nxt_process_port_loop;
|
||||
|
||||
if (process->incoming) {
|
||||
nxt_mp_thread_adopt(process->incoming_mp);
|
||||
|
||||
port_mmap = process->incoming->elts;
|
||||
|
||||
for (i = 0; i < process->incoming->nelts; i++) {
|
||||
nxt_port_mmap_destroy(port_mmap);
|
||||
}
|
||||
|
||||
nxt_thread_mutex_destroy(&process->incoming_mutex);
|
||||
|
||||
nxt_mp_destroy(process->incoming_mp);
|
||||
}
|
||||
|
||||
if (process->outgoing) {
|
||||
nxt_mp_thread_adopt(process->outgoing_mp);
|
||||
|
||||
port_mmap = process->outgoing->elts;
|
||||
|
||||
for (i = 0; i < process->outgoing->nelts; i++) {
|
||||
nxt_port_mmap_destroy(port_mmap);
|
||||
}
|
||||
|
||||
nxt_thread_mutex_destroy(&process->outgoing_mutex);
|
||||
|
||||
nxt_mp_destroy(process->outgoing_mp);
|
||||
}
|
||||
|
||||
nxt_mp_free(rt->mem_pool, process);
|
||||
break;
|
||||
|
||||
@@ -1681,8 +1703,6 @@ nxt_runtime_port_first(nxt_runtime_t *rt, nxt_lvlhsh_each_t *lhe)
|
||||
void
|
||||
nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port)
|
||||
{
|
||||
/* TODO lock ports */
|
||||
|
||||
nxt_port_hash_add(&rt->ports, rt->mem_pool, port);
|
||||
}
|
||||
|
||||
@@ -1690,8 +1710,6 @@ nxt_runtime_port_add(nxt_runtime_t *rt, nxt_port_t *port)
|
||||
void
|
||||
nxt_runtime_port_remove(nxt_runtime_t *rt, nxt_port_t *port)
|
||||
{
|
||||
/* TODO lock ports */
|
||||
|
||||
nxt_port_hash_remove(&rt->ports, rt->mem_pool, port);
|
||||
|
||||
if (port->pair[0] != -1) {
|
||||
@@ -1714,7 +1732,5 @@ nxt_port_t *
|
||||
nxt_runtime_port_find(nxt_runtime_t *rt, nxt_pid_t pid,
|
||||
nxt_port_id_t port_id)
|
||||
{
|
||||
/* TODO lock ports */
|
||||
|
||||
return nxt_port_hash_find(&rt->ports, pid, port_id);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user