Libunit refactoring: port management.

- Changed the port management callbacks to notifications, which e. g. avoids
the need to call the libunit function
- Added context and library instance reference counts for a safer resource
release
- Added the router main port initialization
This commit is contained in:
Max Romanov
2020-08-11 19:19:55 +03:00
parent 3a721e1d96
commit ec3389b63b
8 changed files with 380 additions and 337 deletions

View File

@@ -14,7 +14,7 @@ static void nxt_cgo_request_handler(nxt_unit_request_info_t *req);
static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst, static nxt_cgo_str_t *nxt_cgo_str_init(nxt_cgo_str_t *dst,
nxt_unit_sptr_t *sptr, uint32_t length); nxt_unit_sptr_t *sptr, uint32_t length);
static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port); static int nxt_cgo_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
static void nxt_cgo_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); static void nxt_cgo_remove_port(nxt_unit_t *, nxt_unit_port_t *port);
static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, static ssize_t nxt_cgo_port_send(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
const void *buf, size_t buf_size, const void *oob, size_t oob_size); const void *buf, size_t buf_size, const void *oob, size_t oob_size);
static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, static ssize_t nxt_cgo_port_recv(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
@@ -108,16 +108,17 @@ nxt_cgo_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
nxt_go_add_port(port->id.pid, port->id.id, nxt_go_add_port(port->id.pid, port->id.id,
port->in_fd, port->out_fd); port->in_fd, port->out_fd);
return nxt_unit_add_port(ctx, port); port->in_fd = -1;
port->out_fd = -1;
return NXT_UNIT_OK;
} }
static void static void
nxt_cgo_remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) nxt_cgo_remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
{ {
nxt_go_remove_port(port_id->pid, port_id->id); nxt_go_remove_port(port->id.pid, port->id.id);
nxt_unit_remove_port(ctx, port_id);
} }

View File

@@ -18,7 +18,8 @@ static void delete_port_data(uv_handle_t* handle);
napi_ref Unit::constructor_; napi_ref Unit::constructor_;
struct nxt_nodejs_ctx_t { struct port_data_t {
nxt_unit_ctx_t *ctx;
nxt_unit_port_id_t port_id; nxt_unit_port_id_t port_id;
uv_poll_t poll; uv_poll_t poll;
}; };
@@ -360,8 +361,8 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
int err; int err;
Unit *obj; Unit *obj;
uv_loop_t *loop; uv_loop_t *loop;
port_data_t *data;
napi_status status; napi_status status;
nxt_nodejs_ctx_t *node_ctx;
if (port->in_fd != -1) { if (port->in_fd != -1) {
obj = reinterpret_cast<Unit *>(ctx->unit->data); obj = reinterpret_cast<Unit *>(ctx->unit->data);
@@ -378,27 +379,28 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
node_ctx = new nxt_nodejs_ctx_t; data = new port_data_t;
err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); err = uv_poll_init(loop, &data->poll, port->in_fd);
if (err < 0) { if (err < 0) {
nxt_unit_warn(ctx, "Failed to init uv.poll"); nxt_unit_warn(ctx, "Failed to init uv.poll");
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); err = uv_poll_start(&data->poll, UV_READABLE, nxt_uv_read_callback);
if (err < 0) { if (err < 0) {
nxt_unit_warn(ctx, "Failed to start uv.poll"); nxt_unit_warn(ctx, "Failed to start uv.poll");
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
ctx->data = node_ctx; port->data = data;
node_ctx->port_id = port->id; data->ctx = ctx;
node_ctx->poll.data = ctx; data->port_id = port->id;
data->poll.data = ctx;
} }
return nxt_unit_add_port(ctx, port); return NXT_UNIT_OK;
} }
@@ -410,35 +412,31 @@ operator == (const nxt_unit_port_id_t &p1, const nxt_unit_port_id_t &p2)
void void
Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id) Unit::remove_port(nxt_unit_t *unit, nxt_unit_port_t *port)
{ {
nxt_nodejs_ctx_t *node_ctx; port_data_t *data;
if (ctx->data != NULL) { if (port->data != NULL) {
node_ctx = (nxt_nodejs_ctx_t *) ctx->data; data = (port_data_t *) port->data;
if (node_ctx->port_id == *port_id) { if (data->port_id == port->id) {
uv_poll_stop(&node_ctx->poll); uv_poll_stop(&data->poll);
node_ctx->poll.data = node_ctx; data->poll.data = data;
uv_close((uv_handle_t *) &node_ctx->poll, delete_port_data); uv_close((uv_handle_t *) &data->poll, delete_port_data);
ctx->data = NULL;
} }
} }
nxt_unit_remove_port(ctx, port_id);
} }
static void static void
delete_port_data(uv_handle_t* handle) delete_port_data(uv_handle_t* handle)
{ {
nxt_nodejs_ctx_t *node_ctx; port_data_t *data;
node_ctx = (nxt_nodejs_ctx_t *) handle->data; data = (port_data_t *) handle->data;
delete node_ctx; delete data;
} }

View File

@@ -40,7 +40,7 @@ private:
void shm_ack_handler(nxt_unit_ctx_t *ctx); void shm_ack_handler(nxt_unit_ctx_t *ctx);
static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static void remove_port(nxt_unit_t *unit, nxt_unit_port_t *port);
static void quit_cb(nxt_unit_ctx_t *ctx); static void quit_cb(nxt_unit_ctx_t *ctx);
void quit(nxt_unit_ctx_t *ctx); void quit(nxt_unit_ctx_t *ctx);

View File

@@ -1263,7 +1263,7 @@ nxt_app_parse_type(u_char *p, size_t length)
nxt_int_t nxt_int_t
nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init) nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
{ {
nxt_port_t *my_port, *main_port; nxt_port_t *my_port, *main_port, *router_port;
nxt_runtime_t *rt; nxt_runtime_t *rt;
nxt_memzero(init, sizeof(nxt_unit_init_t)); nxt_memzero(init, sizeof(nxt_unit_init_t));
@@ -1275,6 +1275,11 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
return NXT_ERROR; return NXT_ERROR;
} }
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
if (nxt_slow_path(router_port == NULL)) {
return NXT_ERROR;
}
my_port = nxt_runtime_port_find(rt, nxt_pid, 0); my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
if (nxt_slow_path(my_port == NULL)) { if (nxt_slow_path(my_port == NULL)) {
return NXT_ERROR; return NXT_ERROR;
@@ -1289,6 +1294,13 @@ nxt_unit_default_init(nxt_task_t *task, nxt_unit_init_t *init)
init->ready_stream = my_port->process->stream; init->ready_stream = my_port->process->stream;
init->router_port.id.pid = router_port->pid;
init->router_port.id.id = router_port->id;
init->router_port.in_fd = -1;
init->router_port.out_fd = router_port->pair[1];
nxt_fd_blocking(task, router_port->pair[1]);
init->read_port.id.pid = my_port->pid; init->read_port.id.pid = my_port->pid;
init->read_port.id.id = my_port->id; init->read_port.id.id = my_port->id;
init->read_port.in_fd = my_port->pair[0]; init->read_port.in_fd = my_port->pair[0];

View File

@@ -69,7 +69,7 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
nxt_str_t str; nxt_str_t str;
nxt_int_t rc; nxt_int_t rc;
nxt_uint_t i, argc; nxt_uint_t i, argc;
nxt_port_t *my_port, *main_port; nxt_port_t *my_port, *main_port, *router_port;
nxt_runtime_t *rt; nxt_runtime_t *rt;
nxt_conf_value_t *value; nxt_conf_value_t *value;
nxt_common_app_conf_t *conf; nxt_common_app_conf_t *conf;
@@ -79,9 +79,12 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
conf = data->app; conf = data->app;
main_port = rt->port_by_type[NXT_PROCESS_MAIN]; main_port = rt->port_by_type[NXT_PROCESS_MAIN];
router_port = rt->port_by_type[NXT_PROCESS_ROUTER];
my_port = nxt_runtime_port_find(rt, nxt_pid, 0); my_port = nxt_runtime_port_find(rt, nxt_pid, 0);
if (nxt_slow_path(main_port == NULL || my_port == NULL)) { if (nxt_slow_path(main_port == NULL || my_port == NULL
|| router_port == NULL))
{
return NXT_ERROR; return NXT_ERROR;
} }
@@ -90,6 +93,11 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
return NXT_ERROR; return NXT_ERROR;
} }
rc = nxt_external_fd_no_cloexec(task, router_port->pair[1]);
if (nxt_slow_path(rc != NXT_OK)) {
return NXT_ERROR;
}
rc = nxt_external_fd_no_cloexec(task, my_port->pair[0]); rc = nxt_external_fd_no_cloexec(task, my_port->pair[0]);
if (nxt_slow_path(rc != NXT_OK)) { if (nxt_slow_path(rc != NXT_OK)) {
return NXT_ERROR; return NXT_ERROR;
@@ -101,9 +109,11 @@ nxt_external_start(nxt_task_t *task, nxt_process_data_t *data)
"%s;%uD;" "%s;%uD;"
"%PI,%ud,%d;" "%PI,%ud,%d;"
"%PI,%ud,%d;" "%PI,%ud,%d;"
"%PI,%ud,%d;"
"%d,%z,%Z", "%d,%z,%Z",
NXT_VERSION, my_port->process->stream, NXT_VERSION, my_port->process->stream,
main_port->pid, main_port->id, main_port->pair[1], main_port->pid, main_port->id, main_port->pair[1],
router_port->pid, router_port->id, router_port->pair[1],
my_port->pid, my_port->id, my_port->pair[0], my_port->pid, my_port->id, my_port->pair[0],
2, conf->shm_limit); 2, conf->shm_limit);

View File

@@ -61,7 +61,7 @@ nxt_bool_t nxt_proc_conn_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {
{ 1, 0, 0, 0, 0 }, { 1, 0, 0, 0, 0 },
{ 1, 0, 0, 1, 0 }, { 1, 0, 0, 1, 0 },
{ 1, 0, 1, 0, 1 }, { 1, 0, 1, 0, 1 },
{ 1, 0, 0, 0, 0 }, { 1, 0, 0, 1, 0 },
}; };
nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = { nxt_bool_t nxt_proc_remove_notify_matrix[NXT_PROCESS_MAX][NXT_PROCESS_MAX] = {

File diff suppressed because it is too large Load Diff

View File

@@ -130,15 +130,15 @@ struct nxt_unit_callbacks_s {
int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port); int (*add_port)(nxt_unit_ctx_t *, nxt_unit_port_t *port);
/* Remove previously added port. Optional. */ /* Remove previously added port. Optional. */
void (*remove_port)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id); void (*remove_port)(nxt_unit_t *, nxt_unit_port_t *port);
/* Remove all data associated with process pid including ports. Optional. */ /* Remove all data associated with process pid including ports. Optional. */
void (*remove_pid)(nxt_unit_ctx_t *, pid_t pid); void (*remove_pid)(nxt_unit_t *, pid_t pid);
/* Gracefully quit the application. Optional. */ /* Gracefully quit the application. Optional. */
void (*quit)(nxt_unit_ctx_t *); void (*quit)(nxt_unit_ctx_t *);
/* Shared memory release acknowledgement. */ /* Shared memory release acknowledgement. Optional. */
void (*shm_ack_handler)(nxt_unit_ctx_t *); void (*shm_ack_handler)(nxt_unit_ctx_t *);
/* Send data and control to process pid using port id. Optional. */ /* Send data and control to process pid using port id. Optional. */
@@ -149,7 +149,6 @@ struct nxt_unit_callbacks_s {
/* Receive data on port id. Optional. */ /* Receive data on port id. Optional. */
ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id, ssize_t (*port_recv)(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
void *buf, size_t buf_size, void *oob, size_t oob_size); void *buf, size_t buf_size, void *oob, size_t oob_size);
}; };
@@ -165,6 +164,7 @@ struct nxt_unit_init_s {
nxt_unit_port_t ready_port; nxt_unit_port_t ready_port;
uint32_t ready_stream; uint32_t ready_stream;
nxt_unit_port_t router_port;
nxt_unit_port_t read_port; nxt_unit_port_t read_port;
int log_fd; int log_fd;
}; };
@@ -222,45 +222,9 @@ void nxt_unit_done(nxt_unit_ctx_t *);
*/ */
nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *); nxt_unit_ctx_t *nxt_unit_ctx_alloc(nxt_unit_ctx_t *, void *);
/* Free unused context. It is not required to free main context. */
void nxt_unit_ctx_free(nxt_unit_ctx_t *);
/* Initialize port_id, calculate hash. */ /* Initialize port_id, calculate hash. */
void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id); void nxt_unit_port_id_init(nxt_unit_port_id_t *port_id, pid_t pid, uint16_t id);
/*
* Create extra incoming port, perform all required actions to propogate
* the port to Unit server. Fills structure referenced by port_id with
* current pid and new port id.
*/
int nxt_unit_create_send_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *dst,
nxt_unit_port_id_t *port_id);
/* Default 'add_port' implementation. */
int nxt_unit_add_port(nxt_unit_ctx_t *, nxt_unit_port_t *port);
/* Find previously added port. */
nxt_unit_port_t *nxt_unit_find_port(nxt_unit_ctx_t *,
nxt_unit_port_id_t *port_id);
/* Find, fill output 'port' and remove port from storage. */
void nxt_unit_find_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
nxt_unit_port_t *port);
/* Default 'remove_port' implementation. */
void nxt_unit_remove_port(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id);
/* Default 'remove_pid' implementation. */
void nxt_unit_remove_pid(nxt_unit_ctx_t *, pid_t pid);
/* Default 'quit' implementation. */
void nxt_unit_quit(nxt_unit_ctx_t *);
/* Default 'port_send' implementation. */
ssize_t nxt_unit_port_send(nxt_unit_ctx_t *, int fd,
const void *buf, size_t buf_size,
const void *oob, size_t oob_size);
/* Default 'port_recv' implementation. */ /* Default 'port_recv' implementation. */
ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size, ssize_t nxt_unit_port_recv(nxt_unit_ctx_t *, int fd, void *buf, size_t buf_size,
void *oob, size_t oob_size); void *oob, size_t oob_size);