From 414d508e04d26ebef0e3e1ba4ed518b11d3af1a0 Mon Sep 17 00:00:00 2001 From: Max Romanov Date: Wed, 4 Oct 2017 14:58:13 +0300 Subject: [PATCH] Using engine memiory pool for port write allocations. To allow use port from different threads, the first step is to avoid using port's memory pool for temporary allocations required to send data through the port. Including but not limited by: - buffers for data; - send message structures; - new mmap fd notifications; It is still safe to use port memory pool for incoming buffers allocations because recieve operation bound to single thread. --- src/nxt_controller.c | 2 +- src/nxt_main_process.c | 6 ++++-- src/nxt_port.c | 6 ++++-- src/nxt_port_memory.c | 4 ++-- src/nxt_port_socket.c | 5 +++-- src/nxt_router.c | 8 +------- src/nxt_runtime.c | 1 + 7 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/nxt_controller.c b/src/nxt_controller.c index d7a3d1a7..4a0d065b 100644 --- a/src/nxt_controller.c +++ b/src/nxt_controller.c @@ -1032,7 +1032,7 @@ nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf) size = nxt_conf_json_length(conf, NULL); - b = nxt_buf_mem_alloc(main_port->mem_pool, size, 0); + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); if (nxt_fast_path(b != NULL)) { b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); diff --git a/src/nxt_main_process.c b/src/nxt_main_process.c index 7de643ca..90c06a05 100644 --- a/src/nxt_main_process.c +++ b/src/nxt_main_process.c @@ -769,7 +769,8 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid) port = nxt_process_port_first(process); - buf = nxt_buf_mem_alloc(port->mem_pool, sizeof(pid), 0); + buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(pid)); buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, @@ -836,7 +837,8 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg) nxt_log(task, NXT_LOG_CRIT, "%*s", size, ls.start); - out = nxt_buf_mem_alloc(port->mem_pool, size + 1, 0); + out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + size + 1); if (nxt_slow_path(out == NULL)) { return; } diff --git a/src/nxt_port.c b/src/nxt_port.c index bec08477..d7f42012 100644 --- a/src/nxt_port.c +++ b/src/nxt_port.c @@ -195,7 +195,8 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port, nxt_buf_t *b; nxt_port_msg_new_port_t *msg; - b = nxt_buf_mem_ts_alloc(task, port->mem_pool, sizeof(nxt_port_data_t)); + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(nxt_port_data_t)); if (nxt_slow_path(b == NULL)) { return NXT_ERROR; } @@ -347,7 +348,8 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot, port = nxt_process_port_first(process); - b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, + sizeof(nxt_port_data_t)); if (nxt_slow_path(b == NULL)) { continue; } diff --git a/src/nxt_port_memory.c b/src/nxt_port_memory.c index ec4bbdf7..677421e3 100644 --- a/src/nxt_port_memory.c +++ b/src/nxt_port_memory.c @@ -426,7 +426,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) nxt_debug(task, "request %z bytes shm buffer", size); - b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0); if (nxt_slow_path(b == NULL)) { return NULL; } @@ -436,7 +436,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size) hdr = nxt_port_mmap_get(task, port, &c, size); if (nxt_slow_path(hdr == NULL)) { - nxt_mp_release(port->mem_pool, b); + nxt_mp_release(task->thread->engine->mem_pool, b); return NULL; } diff --git a/src/nxt_port_socket.c b/src/nxt_port_socket.c index 3f7ad954..75706459 100644 --- a/src/nxt_port_socket.c +++ b/src/nxt_port_socket.c @@ -194,7 +194,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, } nxt_queue_loop; - msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t)); + msg = nxt_mp_retain(task->thread->engine->mem_pool, + sizeof(nxt_port_send_msg_t)); if (nxt_slow_path(msg == NULL)) { return NXT_ERROR; } @@ -215,7 +216,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type, msg->work.data = task->thread->engine; msg->engine = task->thread->engine; - msg->mem_pool = port->mem_pool; + msg->mem_pool = msg->engine->mem_pool; msg->port_msg.stream = stream; msg->port_msg.pid = nxt_pid; diff --git a/src/nxt_router.c b/src/nxt_router.c index f923d72c..df3caf82 100644 --- a/src/nxt_router.c +++ b/src/nxt_router.c @@ -2498,7 +2498,7 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data) size = app->name.length + 1 + app->conf.length; - b = nxt_buf_mem_alloc(main_port->mem_pool, size, 0); + b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size); nxt_buf_cpystr(b, &app->name); *b->mem.free++ = '\0'; @@ -2964,7 +2964,6 @@ static void nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_app_parse_ctx_t *ap) { - nxt_mp_t *port_mp; nxt_int_t res; nxt_port_t *port; nxt_event_engine_t *engine; @@ -3012,13 +3011,8 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); - port_mp = port->mem_pool; - port->mem_pool = c->mem_pool; - nxt_router_process_http_request_mp(task, ra, port); - port->mem_pool = port_mp; - nxt_router_ra_release(task, ra, ra->work.data); } diff --git a/src/nxt_runtime.c b/src/nxt_runtime.c index bfe62045..deab980e 100644 --- a/src/nxt_runtime.c +++ b/src/nxt_runtime.c @@ -297,6 +297,7 @@ nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt) #endif engine->id = rt->last_engine_id++; + engine->mem_pool = nxt_mp_create(1024, 128, 256, 32); nxt_queue_init(&rt->engines); nxt_queue_insert_tail(&rt->engines, &engine->link);