Mem pool cleanup introduced.
Used for connection mem pool cleanup, which can be used by buffers. Used for port mem pool to safely destroy linked process.
This commit is contained in:
@@ -10,6 +10,20 @@
|
||||
|
||||
#include <nxt_auto_config.h>
|
||||
|
||||
typedef struct nxt_port_s nxt_port_t;
|
||||
typedef struct nxt_task_s nxt_task_t;
|
||||
typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t;
|
||||
typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
||||
typedef struct nxt_sig_event_s nxt_sig_event_t;
|
||||
typedef struct nxt_runtime_s nxt_runtime_t;
|
||||
|
||||
typedef struct nxt_thread_s nxt_thread_t;
|
||||
typedef struct nxt_event_engine_s nxt_event_engine_t;
|
||||
typedef struct nxt_log_s nxt_log_t;
|
||||
typedef struct nxt_thread_pool_s nxt_thread_pool_t;
|
||||
|
||||
typedef void (*nxt_work_handler_t)(nxt_task_t *task, void *obj, void *data);
|
||||
|
||||
#include <nxt_unix.h>
|
||||
#include <nxt_clang.h>
|
||||
#include <nxt_types.h>
|
||||
@@ -17,19 +31,8 @@
|
||||
#include <nxt_mp.h>
|
||||
#include <nxt_array.h>
|
||||
|
||||
typedef struct nxt_port_s nxt_port_t;
|
||||
typedef struct nxt_task_s nxt_task_t;
|
||||
typedef struct nxt_port_recv_msg_s nxt_port_recv_msg_t;
|
||||
typedef void (*nxt_port_handler_t)(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
||||
typedef struct nxt_sig_event_s nxt_sig_event_t;
|
||||
typedef struct nxt_runtime_s nxt_runtime_t;
|
||||
typedef uint16_t nxt_port_id_t;
|
||||
|
||||
typedef struct nxt_thread_s nxt_thread_t;
|
||||
typedef struct nxt_event_engine_s nxt_event_engine_t;
|
||||
typedef struct nxt_log_s nxt_log_t;
|
||||
typedef struct nxt_thread_pool_s nxt_thread_pool_t;
|
||||
|
||||
#include <nxt_queue.h>
|
||||
|
||||
#include <nxt_thread_id.h>
|
||||
|
||||
@@ -212,13 +212,13 @@ nxt_master_process_port_create(nxt_task_t *task, nxt_runtime_t *rt)
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
nxt_process_port_add(process, port);
|
||||
|
||||
ret = nxt_port_socket_init(task, port, 0);
|
||||
if (nxt_slow_path(ret != NXT_OK)) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
nxt_process_port_add(task, process, port);
|
||||
|
||||
nxt_runtime_port_add(rt, port);
|
||||
|
||||
/*
|
||||
@@ -380,12 +380,11 @@ nxt_master_create_worker_process(nxt_task_t *task, nxt_runtime_t *rt,
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
nxt_process_port_add(process, port);
|
||||
nxt_process_port_add(task, process, port);
|
||||
|
||||
ret = nxt_port_socket_init(task, port, 0);
|
||||
if (nxt_slow_path(ret != NXT_OK)) {
|
||||
nxt_mp_release(port->mem_pool, port);
|
||||
nxt_runtime_process_destroy(rt, process);
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
36
src/nxt_mp.c
36
src/nxt_mp.c
@@ -114,6 +114,8 @@ struct nxt_mp_s {
|
||||
nxt_tid_t tid;
|
||||
#endif
|
||||
|
||||
nxt_work_t *cleanup;
|
||||
|
||||
/* Lists of nxt_mp_page_t. */
|
||||
nxt_queue_t free_pages;
|
||||
nxt_queue_t nget_pages;
|
||||
@@ -283,6 +285,7 @@ void
|
||||
nxt_mp_destroy(nxt_mp_t *mp)
|
||||
{
|
||||
void *p;
|
||||
nxt_work_t *work, *next_work;
|
||||
nxt_mp_block_t *block;
|
||||
nxt_rbtree_node_t *node, *next;
|
||||
|
||||
@@ -290,6 +293,15 @@ nxt_mp_destroy(nxt_mp_t *mp)
|
||||
|
||||
nxt_mp_thread_assert(mp);
|
||||
|
||||
while (mp->cleanup != NULL) {
|
||||
work = mp->cleanup;
|
||||
next_work = work->next;
|
||||
|
||||
work->handler(work->task, work->obj, work->data);
|
||||
|
||||
mp->cleanup = next_work;
|
||||
}
|
||||
|
||||
next = nxt_rbtree_root(&mp->blocks);
|
||||
|
||||
while (next != nxt_rbtree_sentinel(&mp->blocks)) {
|
||||
@@ -1022,3 +1034,27 @@ nxt_mp_zget(nxt_mp_t *mp, size_t size)
|
||||
|
||||
return p;
|
||||
}
|
||||
|
||||
|
||||
nxt_int_t
|
||||
nxt_mp_cleanup(nxt_mp_t *mp, nxt_work_handler_t handler,
|
||||
nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_work_t *work;
|
||||
|
||||
work = nxt_mp_get(mp, sizeof(nxt_work_t));
|
||||
|
||||
if (nxt_slow_path(work == NULL)) {
|
||||
return NXT_ERROR;
|
||||
}
|
||||
|
||||
work->next = mp->cleanup;
|
||||
work->handler = handler;
|
||||
work->task = task;
|
||||
work->obj = obj;
|
||||
work->data = data;
|
||||
|
||||
mp->cleanup = work;
|
||||
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
@@ -109,6 +109,10 @@ NXT_EXPORT void *nxt_mp_zget(nxt_mp_t *mp, size_t size)
|
||||
NXT_MALLOC_LIKE;
|
||||
|
||||
|
||||
NXT_EXPORT nxt_int_t nxt_mp_cleanup(nxt_mp_t *mp, nxt_work_handler_t handler,
|
||||
nxt_task_t *task, void *obj, void *data);
|
||||
|
||||
|
||||
NXT_EXPORT void nxt_mp_thread_adopt(nxt_mp_t *mp);
|
||||
|
||||
#endif /* _NXT_MP_H_INCLUDED_ */
|
||||
|
||||
@@ -245,7 +245,7 @@ nxt_port_new_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||
return;
|
||||
}
|
||||
|
||||
nxt_process_port_add(process, port);
|
||||
nxt_process_port_add(task, process, port);
|
||||
|
||||
port->pair[0] = -1;
|
||||
port->pair[1] = msg->fd;
|
||||
|
||||
@@ -563,11 +563,31 @@ nxt_user_cred_set(nxt_task_t *task, nxt_user_cred_t *uc)
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_process_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_runtime_t *rt;
|
||||
nxt_process_t *process;
|
||||
|
||||
process = obj;
|
||||
rt = data;
|
||||
|
||||
process->port_cleanups--;
|
||||
|
||||
if (process->port_cleanups == 0) {
|
||||
nxt_runtime_process_destroy(rt, process);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
nxt_process_port_add(nxt_process_t *process, nxt_port_t *port)
|
||||
nxt_process_port_add(nxt_task_t *task, nxt_process_t *process, nxt_port_t *port)
|
||||
{
|
||||
port->process = process;
|
||||
nxt_queue_insert_tail(&process->ports, &port->link);
|
||||
|
||||
nxt_mp_cleanup(port->mem_pool, nxt_process_port_mp_cleanup, task, process,
|
||||
task->thread->runtime);
|
||||
process->port_cleanups++;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -57,6 +57,7 @@ typedef struct {
|
||||
nxt_pid_t pid;
|
||||
nxt_queue_t ports; /* of nxt_port_t */
|
||||
nxt_bool_t ready;
|
||||
nxt_uint_t port_cleanups;
|
||||
|
||||
nxt_process_init_t *init;
|
||||
|
||||
@@ -88,7 +89,8 @@ NXT_EXPORT void nxt_process_arguments(nxt_task_t *task, char **orig_argv,
|
||||
#define nxt_process_port_first(process) \
|
||||
nxt_queue_link_data(nxt_queue_first(&process->ports), nxt_port_t, link)
|
||||
|
||||
NXT_EXPORT void nxt_process_port_add(nxt_process_t *process, nxt_port_t *port);
|
||||
NXT_EXPORT void nxt_process_port_add(nxt_task_t *task, nxt_process_t *process,
|
||||
nxt_port_t *port);
|
||||
|
||||
#define nxt_process_port_each(process, port) \
|
||||
nxt_queue_each(port, &process->ports, nxt_port_t, link)
|
||||
|
||||
@@ -27,7 +27,6 @@ struct nxt_start_worker_s {
|
||||
nxt_app_t *app;
|
||||
nxt_req_conn_link_t *rc;
|
||||
nxt_mp_t *mem_pool;
|
||||
void *joint;
|
||||
|
||||
nxt_work_t work;
|
||||
};
|
||||
@@ -161,16 +160,12 @@ static void
|
||||
nxt_router_sw_release(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_start_worker_t *sw;
|
||||
nxt_socket_conf_joint_t *joint;
|
||||
|
||||
sw = obj;
|
||||
joint = sw->joint;
|
||||
|
||||
nxt_debug(task, "sw #%uxD release", sw->stream);
|
||||
|
||||
if (nxt_mp_release(sw->mem_pool, sw) == 0) {
|
||||
nxt_router_conf_release(task, joint);
|
||||
}
|
||||
nxt_mp_release(sw->mem_pool, sw);
|
||||
}
|
||||
|
||||
|
||||
@@ -1861,7 +1856,6 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
|
||||
nxt_app_t *app;
|
||||
nxt_port_t *port;
|
||||
nxt_work_t *work;
|
||||
nxt_process_t *process;
|
||||
nxt_queue_link_t *lnk;
|
||||
nxt_req_conn_link_t *rc;
|
||||
|
||||
@@ -1913,14 +1907,9 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
|
||||
nxt_router_app_free(app);
|
||||
|
||||
port->app = NULL;
|
||||
process = port->process;
|
||||
|
||||
nxt_port_release(port);
|
||||
|
||||
if (nxt_queue_is_empty(&process->ports)) {
|
||||
nxt_runtime_process_destroy(task->thread->runtime, process);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -2031,7 +2020,6 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_conn_link_t *rc)
|
||||
sw->app = app;
|
||||
sw->rc = rc;
|
||||
sw->mem_pool = c->mem_pool;
|
||||
sw->joint = c->listen->socket.data;
|
||||
|
||||
sw->work.handler = nxt_router_send_sw_request;
|
||||
sw->work.task = task;
|
||||
@@ -2349,6 +2337,17 @@ nxt_router_conn_close(nxt_task_t *task, void *obj, void *data)
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_conn_mp_cleanup(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
nxt_socket_conf_joint_t *joint;
|
||||
|
||||
joint = obj;
|
||||
|
||||
nxt_router_conf_release(task, joint);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
|
||||
{
|
||||
@@ -2380,9 +2379,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
|
||||
|
||||
task = &task->thread->engine->task;
|
||||
|
||||
if (nxt_mp_release(c->mem_pool, c) == 0) {
|
||||
nxt_router_conf_release(task, joint);
|
||||
}
|
||||
nxt_mp_cleanup(c->mem_pool, nxt_router_conn_mp_cleanup, task, joint, NULL);
|
||||
|
||||
nxt_mp_release(c->mem_pool, c);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -1491,6 +1491,8 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
|
||||
void
|
||||
nxt_runtime_process_destroy(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
{
|
||||
nxt_assert(process->port_cleanups == 0);
|
||||
|
||||
nxt_port_mmaps_destroy(process->incoming, 1);
|
||||
nxt_port_mmaps_destroy(process->outgoing, 1);
|
||||
|
||||
@@ -1657,6 +1659,10 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
case NXT_OK:
|
||||
rt->nprocesses--;
|
||||
|
||||
if (process->port_cleanups == 0) {
|
||||
nxt_runtime_process_destroy(rt, process);
|
||||
}
|
||||
|
||||
nxt_process_port_each(process, port) {
|
||||
|
||||
nxt_runtime_port_remove(rt, port);
|
||||
@@ -1665,10 +1671,6 @@ nxt_runtime_process_remove(nxt_runtime_t *rt, nxt_process_t *process)
|
||||
|
||||
} nxt_process_port_loop;
|
||||
|
||||
if (nxt_queue_is_empty(&process->ports)) {
|
||||
nxt_runtime_process_destroy(rt, process);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
|
||||
Reference in New Issue
Block a user