Using engine memiory pool for port write allocations.

To allow use port from different threads, the first step is to avoid using
port's memory pool for temporary allocations required to send data through
the port.  Including but not limited by:
  - buffers for data;
  - send message structures;
  - new mmap fd notifications;

It is still safe to use port memory pool for incoming buffers allocations
because recieve operation bound to single thread.
This commit is contained in:
Max Romanov
2017-10-04 14:58:13 +03:00
parent ba31199786
commit 414d508e04
7 changed files with 16 additions and 16 deletions

View File

@@ -1032,7 +1032,7 @@ nxt_controller_conf_store(nxt_task_t *task, nxt_conf_value_t *conf)
size = nxt_conf_json_length(conf, NULL); size = nxt_conf_json_length(conf, NULL);
b = nxt_buf_mem_alloc(main_port->mem_pool, size, 0); b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
if (nxt_fast_path(b != NULL)) { if (nxt_fast_path(b != NULL)) {
b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL); b->mem.free = nxt_conf_json_print(b->mem.free, conf, NULL);

View File

@@ -769,7 +769,8 @@ nxt_main_cleanup_worker_process(nxt_task_t *task, nxt_pid_t pid)
port = nxt_process_port_first(process); port = nxt_process_port_first(process);
buf = nxt_buf_mem_alloc(port->mem_pool, sizeof(pid), 0); buf = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
sizeof(pid));
buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid)); buf->mem.free = nxt_cpymem(buf->mem.free, &pid, sizeof(pid));
nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID, nxt_port_socket_write(task, port, NXT_PORT_MSG_REMOVE_PID,
@@ -836,7 +837,8 @@ nxt_main_port_socket_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
nxt_log(task, NXT_LOG_CRIT, "%*s", size, ls.start); nxt_log(task, NXT_LOG_CRIT, "%*s", size, ls.start);
out = nxt_buf_mem_alloc(port->mem_pool, size + 1, 0); out = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
size + 1);
if (nxt_slow_path(out == NULL)) { if (nxt_slow_path(out == NULL)) {
return; return;
} }

View File

@@ -195,7 +195,8 @@ nxt_port_send_port(nxt_task_t *task, nxt_port_t *port, nxt_port_t *new_port,
nxt_buf_t *b; nxt_buf_t *b;
nxt_port_msg_new_port_t *msg; nxt_port_msg_new_port_t *msg;
b = nxt_buf_mem_ts_alloc(task, port->mem_pool, sizeof(nxt_port_data_t)); b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
sizeof(nxt_port_data_t));
if (nxt_slow_path(b == NULL)) { if (nxt_slow_path(b == NULL)) {
return NXT_ERROR; return NXT_ERROR;
} }
@@ -347,7 +348,8 @@ nxt_port_change_log_file(nxt_task_t *task, nxt_runtime_t *rt, nxt_uint_t slot,
port = nxt_process_port_first(process); port = nxt_process_port_first(process);
b = nxt_buf_mem_alloc(port->mem_pool, sizeof(nxt_port_data_t), 0); b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool,
sizeof(nxt_port_data_t));
if (nxt_slow_path(b == NULL)) { if (nxt_slow_path(b == NULL)) {
continue; continue;
} }

View File

@@ -426,7 +426,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
nxt_debug(task, "request %z bytes shm buffer", size); nxt_debug(task, "request %z bytes shm buffer", size);
b = nxt_buf_mem_ts_alloc(task, port->mem_pool, 0); b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, 0);
if (nxt_slow_path(b == NULL)) { if (nxt_slow_path(b == NULL)) {
return NULL; return NULL;
} }
@@ -436,7 +436,7 @@ nxt_port_mmap_get_buf(nxt_task_t *task, nxt_port_t *port, size_t size)
hdr = nxt_port_mmap_get(task, port, &c, size); hdr = nxt_port_mmap_get(task, port, &c, size);
if (nxt_slow_path(hdr == NULL)) { if (nxt_slow_path(hdr == NULL)) {
nxt_mp_release(port->mem_pool, b); nxt_mp_release(task->thread->engine->mem_pool, b);
return NULL; return NULL;
} }

View File

@@ -194,7 +194,8 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
} nxt_queue_loop; } nxt_queue_loop;
msg = nxt_mp_retain(port->mem_pool, sizeof(nxt_port_send_msg_t)); msg = nxt_mp_retain(task->thread->engine->mem_pool,
sizeof(nxt_port_send_msg_t));
if (nxt_slow_path(msg == NULL)) { if (nxt_slow_path(msg == NULL)) {
return NXT_ERROR; return NXT_ERROR;
} }
@@ -215,7 +216,7 @@ nxt_port_socket_write(nxt_task_t *task, nxt_port_t *port, nxt_uint_t type,
msg->work.data = task->thread->engine; msg->work.data = task->thread->engine;
msg->engine = task->thread->engine; msg->engine = task->thread->engine;
msg->mem_pool = port->mem_pool; msg->mem_pool = msg->engine->mem_pool;
msg->port_msg.stream = stream; msg->port_msg.stream = stream;
msg->port_msg.pid = nxt_pid; msg->port_msg.pid = nxt_pid;

View File

@@ -2498,7 +2498,7 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
size = app->name.length + 1 + app->conf.length; size = app->name.length + 1 + app->conf.length;
b = nxt_buf_mem_alloc(main_port->mem_pool, size, 0); b = nxt_buf_mem_ts_alloc(task, task->thread->engine->mem_pool, size);
nxt_buf_cpystr(b, &app->name); nxt_buf_cpystr(b, &app->name);
*b->mem.free++ = '\0'; *b->mem.free++ = '\0';
@@ -2964,7 +2964,6 @@ static void
nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c, nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_app_parse_ctx_t *ap) nxt_app_parse_ctx_t *ap)
{ {
nxt_mp_t *port_mp;
nxt_int_t res; nxt_int_t res;
nxt_port_t *port; nxt_port_t *port;
nxt_event_engine_t *engine; nxt_event_engine_t *engine;
@@ -3012,13 +3011,8 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid); nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
port_mp = port->mem_pool;
port->mem_pool = c->mem_pool;
nxt_router_process_http_request_mp(task, ra, port); nxt_router_process_http_request_mp(task, ra, port);
port->mem_pool = port_mp;
nxt_router_ra_release(task, ra, ra->work.data); nxt_router_ra_release(task, ra, ra->work.data);
} }

View File

@@ -297,6 +297,7 @@ nxt_runtime_event_engines(nxt_task_t *task, nxt_runtime_t *rt)
#endif #endif
engine->id = rt->last_engine_id++; engine->id = rt->last_engine_id++;
engine->mem_pool = nxt_mp_create(1024, 128, 256, 32);
nxt_queue_init(&rt->engines); nxt_queue_init(&rt->engines);
nxt_queue_insert_tail(&rt->engines, &engine->link); nxt_queue_insert_tail(&rt->engines, &engine->link);