Incoming and outgoing port_mmap arrays are protected with mutexes.
This commit is contained in:
@@ -25,9 +25,13 @@ typedef struct nxt_sig_event_s nxt_sig_event_t;
|
|||||||
typedef struct nxt_runtime_s nxt_runtime_t;
|
typedef struct nxt_runtime_s nxt_runtime_t;
|
||||||
typedef uint16_t nxt_port_id_t;
|
typedef uint16_t nxt_port_id_t;
|
||||||
|
|
||||||
|
typedef struct nxt_thread_s nxt_thread_t;
|
||||||
|
typedef struct nxt_event_engine_s nxt_event_engine_t;
|
||||||
|
typedef struct nxt_log_s nxt_log_t;
|
||||||
|
typedef struct nxt_thread_pool_s nxt_thread_pool_t;
|
||||||
|
|
||||||
#include <nxt_queue.h>
|
#include <nxt_queue.h>
|
||||||
|
|
||||||
typedef struct nxt_thread_s nxt_thread_t;
|
|
||||||
#include <nxt_thread_id.h>
|
#include <nxt_thread_id.h>
|
||||||
|
|
||||||
#include <nxt_errno.h>
|
#include <nxt_errno.h>
|
||||||
@@ -36,16 +40,19 @@ typedef struct nxt_thread_s nxt_thread_t;
|
|||||||
#include <nxt_random.h>
|
#include <nxt_random.h>
|
||||||
#include <nxt_string.h>
|
#include <nxt_string.h>
|
||||||
#include <nxt_lvlhsh.h>
|
#include <nxt_lvlhsh.h>
|
||||||
|
#include <nxt_atomic.h>
|
||||||
|
#include <nxt_spinlock.h>
|
||||||
|
#include <nxt_work_queue.h>
|
||||||
|
#include <nxt_log.h>
|
||||||
|
#include <nxt_thread_time.h>
|
||||||
|
#include <nxt_rbtree.h>
|
||||||
|
#include <nxt_timer.h>
|
||||||
|
#include <nxt_fiber.h>
|
||||||
|
#include <nxt_thread.h>
|
||||||
#include <nxt_process.h>
|
#include <nxt_process.h>
|
||||||
#include <nxt_utf8.h>
|
#include <nxt_utf8.h>
|
||||||
#include <nxt_file_name.h>
|
#include <nxt_file_name.h>
|
||||||
|
|
||||||
typedef struct nxt_log_s nxt_log_t;
|
|
||||||
#include <nxt_log.h>
|
|
||||||
|
|
||||||
|
|
||||||
#include <nxt_atomic.h>
|
|
||||||
#include <nxt_rbtree.h>
|
|
||||||
#include <nxt_sprintf.h>
|
#include <nxt_sprintf.h>
|
||||||
#include <nxt_parse.h>
|
#include <nxt_parse.h>
|
||||||
|
|
||||||
@@ -59,11 +66,8 @@ typedef struct nxt_sockaddr_s nxt_sockaddr_t;
|
|||||||
#include <nxt_malloc.h>
|
#include <nxt_malloc.h>
|
||||||
#include <nxt_mem_map.h>
|
#include <nxt_mem_map.h>
|
||||||
#include <nxt_socket.h>
|
#include <nxt_socket.h>
|
||||||
#include <nxt_spinlock.h>
|
|
||||||
#include <nxt_dyld.h>
|
#include <nxt_dyld.h>
|
||||||
|
|
||||||
#include <nxt_work_queue.h>
|
|
||||||
|
|
||||||
|
|
||||||
typedef void *(*nxt_mem_proto_alloc_t)(void *pool, size_t size);
|
typedef void *(*nxt_mem_proto_alloc_t)(void *pool, size_t size);
|
||||||
typedef void (*nxt_mem_proto_free_t)(void *pool, void *p);
|
typedef void (*nxt_mem_proto_free_t)(void *pool, void *p);
|
||||||
@@ -75,15 +79,6 @@ typedef struct {
|
|||||||
|
|
||||||
|
|
||||||
#include <nxt_mem_zone.h>
|
#include <nxt_mem_zone.h>
|
||||||
#include <nxt_thread_time.h>
|
|
||||||
|
|
||||||
typedef struct nxt_event_engine_s nxt_event_engine_t;
|
|
||||||
#include <nxt_timer.h>
|
|
||||||
#include <nxt_fiber.h>
|
|
||||||
|
|
||||||
typedef struct nxt_thread_pool_s nxt_thread_pool_t;
|
|
||||||
#include <nxt_thread.h>
|
|
||||||
|
|
||||||
#include <nxt_signal.h>
|
#include <nxt_signal.h>
|
||||||
#if (NXT_THREADS)
|
#if (NXT_THREADS)
|
||||||
#include <nxt_semaphore.h>
|
#include <nxt_semaphore.h>
|
||||||
|
|||||||
@@ -96,6 +96,8 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_thread_mutex_lock(&process->incoming_mutex);
|
||||||
|
|
||||||
if (process->incoming == NULL) {
|
if (process->incoming == NULL) {
|
||||||
process->incoming = nxt_array_create(process->mem_pool, 1,
|
process->incoming = nxt_array_create(process->mem_pool, 1,
|
||||||
sizeof(nxt_port_mmap_t));
|
sizeof(nxt_port_mmap_t));
|
||||||
@@ -135,6 +137,8 @@ nxt_port_incoming_port_mmap(nxt_task_t *task, nxt_process_t *process,
|
|||||||
|
|
||||||
fail:
|
fail:
|
||||||
|
|
||||||
|
nxt_thread_mutex_unlock(&process->incoming_mutex);
|
||||||
|
|
||||||
return hdr;
|
return hdr;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -309,6 +313,8 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
|
|||||||
port_mmap = NULL;
|
port_mmap = NULL;
|
||||||
hdr = NULL;
|
hdr = NULL;
|
||||||
|
|
||||||
|
nxt_thread_mutex_lock(&process->outgoing_mutex);
|
||||||
|
|
||||||
if (process->outgoing == NULL) {
|
if (process->outgoing == NULL) {
|
||||||
hdr = nxt_port_new_port_mmap(task, process, port);
|
hdr = nxt_port_new_port_mmap(task, process, port);
|
||||||
|
|
||||||
@@ -336,6 +342,8 @@ nxt_port_mmap_get(nxt_task_t *task, nxt_port_t *port, nxt_chunk_id_t *c,
|
|||||||
|
|
||||||
unlock_return:
|
unlock_return:
|
||||||
|
|
||||||
|
nxt_thread_mutex_unlock(&process->outgoing_mutex);
|
||||||
|
|
||||||
return hdr;
|
return hdr;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -355,6 +363,8 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
|
|||||||
|
|
||||||
hdr = NULL;
|
hdr = NULL;
|
||||||
|
|
||||||
|
nxt_thread_mutex_lock(&process->incoming_mutex);
|
||||||
|
|
||||||
incoming = process->incoming;
|
incoming = process->incoming;
|
||||||
|
|
||||||
if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) {
|
if (nxt_fast_path(incoming != NULL && incoming->nelts > id)) {
|
||||||
@@ -365,6 +375,8 @@ nxt_port_get_port_incoming_mmap(nxt_task_t *task, nxt_pid_t spid, uint32_t id)
|
|||||||
"failed to get incoming mmap #%d for process %PI", id, spid);
|
"failed to get incoming mmap #%d for process %PI", id, spid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_thread_mutex_unlock(&process->incoming_mutex);
|
||||||
|
|
||||||
return hdr;
|
return hdr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -56,8 +56,10 @@ typedef struct {
|
|||||||
nxt_port_id_t last_port_id;
|
nxt_port_id_t last_port_id;
|
||||||
|
|
||||||
nxt_process_init_t *init;
|
nxt_process_init_t *init;
|
||||||
nxt_array_t *incoming; /* of nxt_mmap_t */
|
nxt_thread_mutex_t incoming_mutex;
|
||||||
nxt_array_t *outgoing; /* of nxt_mmap_t */
|
nxt_array_t *incoming; /* of nxt_port_mmap_t */
|
||||||
|
nxt_thread_mutex_t outgoing_mutex;
|
||||||
|
nxt_array_t *outgoing; /* of nxt_port_mmap_t */
|
||||||
|
|
||||||
nxt_lvlhsh_t connected_ports; /* of nxt_port_t */
|
nxt_lvlhsh_t connected_ports; /* of nxt_port_t */
|
||||||
} nxt_process_t;
|
} nxt_process_t;
|
||||||
|
|||||||
@@ -1473,6 +1473,9 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
|
|||||||
/* TODO each process should have it's own mem_pool for ports allocation */
|
/* TODO each process should have it's own mem_pool for ports allocation */
|
||||||
process->mem_pool = rt->mem_pool;
|
process->mem_pool = rt->mem_pool;
|
||||||
|
|
||||||
|
nxt_thread_mutex_create(&process->incoming_mutex);
|
||||||
|
nxt_thread_mutex_create(&process->outgoing_mutex);
|
||||||
|
|
||||||
return process;
|
return process;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user