Changing router to application port exchange protocol.
The application process needs to request the port from the router instead of the latter pushing the port before sending a request to the application. This is required to simplify the communication between the router and the application and to prepare the router to use the application shared port and then the queue.
This commit is contained in:
@@ -25,6 +25,7 @@ struct nxt_port_handlers_s {
|
|||||||
/* File descriptor exchange. */
|
/* File descriptor exchange. */
|
||||||
nxt_port_handler_t change_file;
|
nxt_port_handler_t change_file;
|
||||||
nxt_port_handler_t new_port;
|
nxt_port_handler_t new_port;
|
||||||
|
nxt_port_handler_t get_port;
|
||||||
nxt_port_handler_t mmap;
|
nxt_port_handler_t mmap;
|
||||||
|
|
||||||
/* New process */
|
/* New process */
|
||||||
@@ -77,6 +78,7 @@ typedef enum {
|
|||||||
|
|
||||||
_NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
|
_NXT_PORT_MSG_CHANGE_FILE = nxt_port_handler_idx(change_file),
|
||||||
_NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
|
_NXT_PORT_MSG_NEW_PORT = nxt_port_handler_idx(new_port),
|
||||||
|
_NXT_PORT_MSG_GET_PORT = nxt_port_handler_idx(get_port),
|
||||||
_NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
|
_NXT_PORT_MSG_MMAP = nxt_port_handler_idx(mmap),
|
||||||
|
|
||||||
_NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
|
_NXT_PORT_MSG_PROCESS_CREATED = nxt_port_handler_idx(process_created),
|
||||||
@@ -107,6 +109,7 @@ typedef enum {
|
|||||||
NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG),
|
NXT_PORT_MSG_ACCESS_LOG = nxt_msg_last(_NXT_PORT_MSG_ACCESS_LOG),
|
||||||
NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
|
NXT_PORT_MSG_CHANGE_FILE = nxt_msg_last(_NXT_PORT_MSG_CHANGE_FILE),
|
||||||
NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
|
NXT_PORT_MSG_NEW_PORT = nxt_msg_last(_NXT_PORT_MSG_NEW_PORT),
|
||||||
|
NXT_PORT_MSG_GET_PORT = nxt_msg_last(_NXT_PORT_MSG_GET_PORT),
|
||||||
NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
|
NXT_PORT_MSG_MMAP = nxt_msg_last(_NXT_PORT_MSG_MMAP)
|
||||||
| NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
|
| NXT_PORT_MSG_CLOSE_FD | NXT_PORT_MSG_SYNC,
|
||||||
|
|
||||||
@@ -238,6 +241,12 @@ typedef struct {
|
|||||||
} nxt_port_msg_new_port_t;
|
} nxt_port_msg_new_port_t;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
nxt_port_id_t id;
|
||||||
|
nxt_pid_t pid;
|
||||||
|
} nxt_port_msg_get_port_t;
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* nxt_port_data_t size is allocation size
|
* nxt_port_data_t size is allocation size
|
||||||
* which enables effective reuse of memory pool cache.
|
* which enables effective reuse of memory pool cache.
|
||||||
|
|||||||
@@ -1107,43 +1107,6 @@ nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
|
||||||
nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port)
|
|
||||||
{
|
|
||||||
nxt_thread_mutex_lock(&process->cp_mutex);
|
|
||||||
|
|
||||||
nxt_port_hash_add(&process->connected_ports, port);
|
|
||||||
|
|
||||||
nxt_thread_mutex_unlock(&process->cp_mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void
|
|
||||||
nxt_process_connected_port_remove(nxt_process_t *process, nxt_port_t *port)
|
|
||||||
{
|
|
||||||
nxt_thread_mutex_lock(&process->cp_mutex);
|
|
||||||
|
|
||||||
nxt_port_hash_remove(&process->connected_ports, port);
|
|
||||||
|
|
||||||
nxt_thread_mutex_unlock(&process->cp_mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
nxt_port_t *
|
|
||||||
nxt_process_connected_port_find(nxt_process_t *process, nxt_port_t *port)
|
|
||||||
{
|
|
||||||
nxt_port_t *res;
|
|
||||||
|
|
||||||
nxt_thread_mutex_lock(&process->cp_mutex);
|
|
||||||
|
|
||||||
res = nxt_port_hash_find(&process->connected_ports, port->pid, port->id);
|
|
||||||
|
|
||||||
nxt_thread_mutex_unlock(&process->cp_mutex);
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status)
|
nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -95,7 +95,6 @@ typedef struct {
|
|||||||
nxt_port_mmaps_t outgoing;
|
nxt_port_mmaps_t outgoing;
|
||||||
|
|
||||||
nxt_thread_mutex_t cp_mutex;
|
nxt_thread_mutex_t cp_mutex;
|
||||||
nxt_lvlhsh_t connected_ports; /* of nxt_port_t */
|
|
||||||
|
|
||||||
uint32_t stream;
|
uint32_t stream;
|
||||||
|
|
||||||
@@ -172,14 +171,6 @@ void nxt_process_use(nxt_task_t *task, nxt_process_t *process, int i);
|
|||||||
|
|
||||||
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
|
void nxt_process_close_ports(nxt_task_t *task, nxt_process_t *process);
|
||||||
|
|
||||||
void nxt_process_connected_port_add(nxt_process_t *process, nxt_port_t *port);
|
|
||||||
|
|
||||||
void nxt_process_connected_port_remove(nxt_process_t *process,
|
|
||||||
nxt_port_t *port);
|
|
||||||
|
|
||||||
nxt_port_t *nxt_process_connected_port_find(nxt_process_t *process,
|
|
||||||
nxt_port_t *port);
|
|
||||||
|
|
||||||
void nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status);
|
void nxt_process_quit(nxt_task_t *task, nxt_uint_t exit_status);
|
||||||
void nxt_signal_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
void nxt_signal_quit_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
||||||
|
|
||||||
|
|||||||
100
src/nxt_router.c
100
src/nxt_router.c
@@ -182,6 +182,8 @@ static void nxt_router_engine_post(nxt_event_engine_t *engine,
|
|||||||
nxt_work_t *jobs);
|
nxt_work_t *jobs);
|
||||||
|
|
||||||
static void nxt_router_thread_start(void *data);
|
static void nxt_router_thread_start(void *data);
|
||||||
|
static void nxt_router_rt_add_port(nxt_task_t *task, void *obj,
|
||||||
|
void *data);
|
||||||
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
|
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
|
||||||
void *data);
|
void *data);
|
||||||
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
|
static void nxt_router_listen_socket_update(nxt_task_t *task, void *obj,
|
||||||
@@ -253,6 +255,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
|
|||||||
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
|
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
|
||||||
void *data);
|
void *data);
|
||||||
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
static void nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
||||||
|
static void nxt_router_get_port_handler(nxt_task_t *task,
|
||||||
|
nxt_port_recv_msg_t *msg);
|
||||||
|
|
||||||
extern const nxt_http_request_state_t nxt_http_websocket;
|
extern const nxt_http_request_state_t nxt_http_websocket;
|
||||||
|
|
||||||
@@ -274,6 +278,7 @@ static const nxt_str_t *nxt_app_msg_prefix[] = {
|
|||||||
static const nxt_port_handlers_t nxt_router_process_port_handlers = {
|
static const nxt_port_handlers_t nxt_router_process_port_handlers = {
|
||||||
.quit = nxt_signal_quit_handler,
|
.quit = nxt_signal_quit_handler,
|
||||||
.new_port = nxt_router_new_port_handler,
|
.new_port = nxt_router_new_port_handler,
|
||||||
|
.get_port = nxt_router_get_port_handler,
|
||||||
.change_file = nxt_port_change_log_file_handler,
|
.change_file = nxt_port_change_log_file_handler,
|
||||||
.mmap = nxt_port_mmap_handler,
|
.mmap = nxt_port_mmap_handler,
|
||||||
.data = nxt_router_conf_data_handler,
|
.data = nxt_router_conf_data_handler,
|
||||||
@@ -2944,6 +2949,7 @@ nxt_router_thread_start(void *data)
|
|||||||
nxt_int_t ret;
|
nxt_int_t ret;
|
||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_task_t *task;
|
nxt_task_t *task;
|
||||||
|
nxt_work_t *work;
|
||||||
nxt_thread_t *thread;
|
nxt_thread_t *thread;
|
||||||
nxt_thread_link_t *link;
|
nxt_thread_link_t *link;
|
||||||
nxt_event_engine_t *engine;
|
nxt_event_engine_t *engine;
|
||||||
@@ -2988,10 +2994,42 @@ nxt_router_thread_start(void *data)
|
|||||||
|
|
||||||
nxt_port_enable(task, port, &nxt_router_app_port_handlers);
|
nxt_port_enable(task, port, &nxt_router_app_port_handlers);
|
||||||
|
|
||||||
|
work = nxt_zalloc(sizeof(nxt_work_t));
|
||||||
|
if (nxt_slow_path(work == NULL)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
work->handler = nxt_router_rt_add_port;
|
||||||
|
work->task = link->work.task;
|
||||||
|
work->obj = work;
|
||||||
|
work->data = port;
|
||||||
|
|
||||||
|
nxt_event_engine_post(link->work.task->thread->engine, work);
|
||||||
|
|
||||||
nxt_event_engine_start(engine);
|
nxt_event_engine_start(engine);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_router_rt_add_port(nxt_task_t *task, void *obj, void *data)
|
||||||
|
{
|
||||||
|
nxt_int_t res;
|
||||||
|
nxt_port_t *port;
|
||||||
|
nxt_runtime_t *rt;
|
||||||
|
|
||||||
|
rt = task->thread->runtime;
|
||||||
|
port = data;
|
||||||
|
|
||||||
|
nxt_free(obj);
|
||||||
|
|
||||||
|
res = nxt_port_hash_add(&rt->ports, port);
|
||||||
|
|
||||||
|
if (nxt_fast_path(res == NXT_OK)) {
|
||||||
|
nxt_port_use(task, port, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
|
nxt_router_listen_socket_create(nxt_task_t *task, void *obj, void *data)
|
||||||
{
|
{
|
||||||
@@ -3281,7 +3319,6 @@ nxt_router_conf_release(nxt_task_t *task, nxt_socket_conf_joint_t *joint)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* TODO remove engine->port */
|
/* TODO remove engine->port */
|
||||||
/* TODO excude from connected ports */
|
|
||||||
|
|
||||||
if (rtcf != NULL) {
|
if (rtcf != NULL) {
|
||||||
nxt_debug(task, "old router conf is destroyed");
|
nxt_debug(task, "old router conf is destroyed");
|
||||||
@@ -4937,7 +4974,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
|
|||||||
{
|
{
|
||||||
nxt_buf_t *buf;
|
nxt_buf_t *buf;
|
||||||
nxt_int_t res;
|
nxt_int_t res;
|
||||||
nxt_port_t *port, *c_port, *reply_port;
|
nxt_port_t *port, *reply_port;
|
||||||
nxt_apr_action_t apr_action;
|
nxt_apr_action_t apr_action;
|
||||||
|
|
||||||
nxt_assert(req_app_link->app_port != NULL);
|
nxt_assert(req_app_link->app_port != NULL);
|
||||||
@@ -4947,21 +4984,6 @@ nxt_router_app_prepare_request(nxt_task_t *task,
|
|||||||
|
|
||||||
apr_action = NXT_APR_REQUEST_FAILED;
|
apr_action = NXT_APR_REQUEST_FAILED;
|
||||||
|
|
||||||
c_port = nxt_process_connected_port_find(port->process, reply_port);
|
|
||||||
|
|
||||||
if (nxt_slow_path(c_port != reply_port)) {
|
|
||||||
res = nxt_port_send_port(task, port, reply_port, 0);
|
|
||||||
|
|
||||||
if (nxt_slow_path(res != NXT_OK)) {
|
|
||||||
nxt_request_app_link_error(task, port->app, req_app_link,
|
|
||||||
"Failed to send reply port to application");
|
|
||||||
|
|
||||||
goto release_port;
|
|
||||||
}
|
|
||||||
|
|
||||||
nxt_process_connected_port_add(port->process, reply_port);
|
|
||||||
}
|
|
||||||
|
|
||||||
buf = nxt_router_prepare_msg(task, req_app_link->request, port,
|
buf = nxt_router_prepare_msg(task, req_app_link->request, port,
|
||||||
nxt_app_msg_prefix[port->app->type]);
|
nxt_app_msg_prefix[port->app->type]);
|
||||||
|
|
||||||
@@ -5531,3 +5553,47 @@ nxt_router_oosm_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
-1, 0, 0, NULL);
|
-1, 0, 0, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_router_get_port_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||||
|
{
|
||||||
|
nxt_port_t *port, *reply_port;
|
||||||
|
nxt_runtime_t *rt;
|
||||||
|
nxt_port_msg_get_port_t *get_port_msg;
|
||||||
|
|
||||||
|
rt = task->thread->runtime;
|
||||||
|
|
||||||
|
reply_port = nxt_runtime_port_find(rt, msg->port_msg.pid,
|
||||||
|
msg->port_msg.reply_port);
|
||||||
|
if (nxt_slow_path(reply_port == NULL)) {
|
||||||
|
nxt_alert(task, "get_port_handler: reply_port %PI:%d not found",
|
||||||
|
msg->port_msg.pid, msg->port_msg.reply_port);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nxt_slow_path(nxt_buf_used_size(msg->buf)
|
||||||
|
< (int) sizeof(nxt_port_msg_get_port_t)))
|
||||||
|
{
|
||||||
|
nxt_alert(task, "get_port_handler: message buffer too small (%d)",
|
||||||
|
(int) nxt_buf_used_size(msg->buf));
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
get_port_msg = (nxt_port_msg_get_port_t *) msg->buf->mem.pos;
|
||||||
|
|
||||||
|
port = nxt_runtime_port_find(rt, get_port_msg->pid, get_port_msg->id);
|
||||||
|
if (nxt_slow_path(port == NULL)) {
|
||||||
|
nxt_alert(task, "get_port_handler: port %PI:%d not found",
|
||||||
|
get_port_msg->pid, get_port_msg->id);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_debug(task, "get port %PI:%d found", get_port_msg->pid,
|
||||||
|
get_port_msg->id);
|
||||||
|
|
||||||
|
(void) nxt_port_send_port(task, reply_port, port, msg->port_msg.stream);
|
||||||
|
}
|
||||||
|
|||||||
@@ -1389,8 +1389,6 @@ nxt_runtime_process_new(nxt_runtime_t *rt)
|
|||||||
void
|
void
|
||||||
nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
|
nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
|
||||||
{
|
{
|
||||||
nxt_port_t *port;
|
|
||||||
|
|
||||||
if (process->registered == 1) {
|
if (process->registered == 1) {
|
||||||
nxt_runtime_process_remove(rt, process);
|
nxt_runtime_process_remove(rt, process);
|
||||||
}
|
}
|
||||||
@@ -1401,11 +1399,6 @@ nxt_runtime_process_release(nxt_runtime_t *rt, nxt_process_t *process)
|
|||||||
nxt_port_mmaps_destroy(&process->incoming, 1);
|
nxt_port_mmaps_destroy(&process->incoming, 1);
|
||||||
nxt_port_mmaps_destroy(&process->outgoing, 1);
|
nxt_port_mmaps_destroy(&process->outgoing, 1);
|
||||||
|
|
||||||
do {
|
|
||||||
port = nxt_port_hash_retrieve(&process->connected_ports);
|
|
||||||
|
|
||||||
} while (port != NULL);
|
|
||||||
|
|
||||||
nxt_thread_mutex_destroy(&process->incoming.mutex);
|
nxt_thread_mutex_destroy(&process->incoming.mutex);
|
||||||
nxt_thread_mutex_destroy(&process->outgoing.mutex);
|
nxt_thread_mutex_destroy(&process->outgoing.mutex);
|
||||||
nxt_thread_mutex_destroy(&process->cp_mutex);
|
nxt_thread_mutex_destroy(&process->cp_mutex);
|
||||||
|
|||||||
266
src/nxt_unit.c
266
src/nxt_unit.c
@@ -55,6 +55,8 @@ static int nxt_unit_process_new_port(nxt_unit_ctx_t *ctx,
|
|||||||
nxt_unit_recv_msg_t *recv_msg);
|
nxt_unit_recv_msg_t *recv_msg);
|
||||||
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
|
static int nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx,
|
||||||
nxt_unit_recv_msg_t *recv_msg);
|
nxt_unit_recv_msg_t *recv_msg);
|
||||||
|
static int nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
|
||||||
|
nxt_unit_port_id_t *port_id);
|
||||||
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
|
static int nxt_unit_process_websocket(nxt_unit_ctx_t *ctx,
|
||||||
nxt_unit_recv_msg_t *recv_msg);
|
nxt_unit_recv_msg_t *recv_msg);
|
||||||
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
|
static int nxt_unit_process_shm_ack(nxt_unit_ctx_t *ctx);
|
||||||
@@ -119,6 +121,7 @@ static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_impl_t *lib,
|
|||||||
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
|
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
|
||||||
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
|
static void nxt_unit_read_buf(nxt_unit_ctx_t *ctx,
|
||||||
nxt_unit_read_buf_t *rbuf);
|
nxt_unit_read_buf_t *rbuf);
|
||||||
|
static void nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl);
|
||||||
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
|
static void nxt_unit_ctx_free(nxt_unit_ctx_impl_t *ctx_impl);
|
||||||
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
|
static nxt_unit_port_t *nxt_unit_create_port(nxt_unit_ctx_t *ctx);
|
||||||
|
|
||||||
@@ -138,6 +141,7 @@ static void nxt_unit_remove_pid(nxt_unit_impl_t *lib, pid_t pid);
|
|||||||
static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
|
static void nxt_unit_remove_process(nxt_unit_impl_t *lib,
|
||||||
nxt_unit_process_t *process);
|
nxt_unit_process_t *process);
|
||||||
static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
|
static void nxt_unit_quit(nxt_unit_ctx_t *ctx);
|
||||||
|
static int nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
|
||||||
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
|
static ssize_t nxt_unit_port_send(nxt_unit_ctx_t *ctx,
|
||||||
nxt_unit_port_t *port, const void *buf, size_t buf_size,
|
nxt_unit_port_t *port, const void *buf, size_t buf_size,
|
||||||
const void *oob, size_t oob_size);
|
const void *oob, size_t oob_size);
|
||||||
@@ -215,7 +219,10 @@ struct nxt_unit_request_info_impl_s {
|
|||||||
nxt_unit_req_state_t state;
|
nxt_unit_req_state_t state;
|
||||||
uint8_t websocket;
|
uint8_t websocket;
|
||||||
|
|
||||||
|
/* for nxt_unit_ctx_impl_t.free_req or active_req */
|
||||||
nxt_queue_link_t link;
|
nxt_queue_link_t link;
|
||||||
|
/* for nxt_unit_port_impl_t.awaiting_req */
|
||||||
|
nxt_queue_link_t port_wait_link;
|
||||||
|
|
||||||
char extra_data[];
|
char extra_data[];
|
||||||
};
|
};
|
||||||
@@ -244,6 +251,7 @@ struct nxt_unit_ctx_impl_s {
|
|||||||
nxt_unit_ctx_t ctx;
|
nxt_unit_ctx_t ctx;
|
||||||
|
|
||||||
nxt_atomic_t use_count;
|
nxt_atomic_t use_count;
|
||||||
|
nxt_atomic_t wait_items;
|
||||||
|
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
|
|
||||||
@@ -265,6 +273,9 @@ struct nxt_unit_ctx_impl_s {
|
|||||||
/* of nxt_unit_request_info_impl_t */
|
/* of nxt_unit_request_info_impl_t */
|
||||||
nxt_lvlhsh_t requests;
|
nxt_lvlhsh_t requests;
|
||||||
|
|
||||||
|
/* of nxt_unit_request_info_impl_t */
|
||||||
|
nxt_queue_t ready_req;
|
||||||
|
|
||||||
nxt_unit_read_buf_t *pending_read_head;
|
nxt_unit_read_buf_t *pending_read_head;
|
||||||
nxt_unit_read_buf_t **pending_read_tail;
|
nxt_unit_read_buf_t **pending_read_tail;
|
||||||
nxt_unit_read_buf_t *free_read_buf;
|
nxt_unit_read_buf_t *free_read_buf;
|
||||||
@@ -309,6 +320,11 @@ struct nxt_unit_port_impl_s {
|
|||||||
|
|
||||||
nxt_queue_link_t link;
|
nxt_queue_link_t link;
|
||||||
nxt_unit_process_t *process;
|
nxt_unit_process_t *process;
|
||||||
|
|
||||||
|
/* of nxt_unit_request_info_impl_t */
|
||||||
|
nxt_queue_t awaiting_req;
|
||||||
|
|
||||||
|
int ready;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@@ -515,10 +531,12 @@ nxt_unit_ctx_init(nxt_unit_impl_t *lib, nxt_unit_ctx_impl_t *ctx_impl,
|
|||||||
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
|
nxt_queue_insert_tail(&lib->contexts, &ctx_impl->link);
|
||||||
|
|
||||||
ctx_impl->use_count = 1;
|
ctx_impl->use_count = 1;
|
||||||
|
ctx_impl->wait_items = 0;
|
||||||
|
|
||||||
nxt_queue_init(&ctx_impl->free_req);
|
nxt_queue_init(&ctx_impl->free_req);
|
||||||
nxt_queue_init(&ctx_impl->free_ws);
|
nxt_queue_init(&ctx_impl->free_ws);
|
||||||
nxt_queue_init(&ctx_impl->active_req);
|
nxt_queue_init(&ctx_impl->active_req);
|
||||||
|
nxt_queue_init(&ctx_impl->ready_req);
|
||||||
|
|
||||||
ctx_impl->free_buf = NULL;
|
ctx_impl->free_buf = NULL;
|
||||||
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
|
nxt_unit_mmap_buf_insert(&ctx_impl->free_buf, &ctx_impl->ctx_buf[1]);
|
||||||
@@ -973,8 +991,8 @@ nxt_unit_process_new_port(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
|||||||
static int
|
static int
|
||||||
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
||||||
{
|
{
|
||||||
|
int res;
|
||||||
nxt_unit_impl_t *lib;
|
nxt_unit_impl_t *lib;
|
||||||
nxt_unit_port_t *port;
|
|
||||||
nxt_unit_port_id_t port_id;
|
nxt_unit_port_id_t port_id;
|
||||||
nxt_unit_request_t *r;
|
nxt_unit_request_t *r;
|
||||||
nxt_unit_mmap_buf_t *b;
|
nxt_unit_mmap_buf_t *b;
|
||||||
@@ -1004,28 +1022,8 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
|||||||
return NXT_UNIT_ERROR;
|
return NXT_UNIT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
|
|
||||||
|
|
||||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
|
||||||
|
|
||||||
pthread_mutex_lock(&lib->mutex);
|
|
||||||
|
|
||||||
port = nxt_unit_port_hash_find(&lib->ports, &port_id, 0);
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&lib->mutex);
|
|
||||||
|
|
||||||
if (nxt_slow_path(port == NULL)) {
|
|
||||||
nxt_unit_alert(ctx, "#%"PRIu32": response port %d,%d not found",
|
|
||||||
recv_msg->stream,
|
|
||||||
(int) recv_msg->pid, (int) recv_msg->reply_port);
|
|
||||||
|
|
||||||
return NXT_UNIT_ERROR;
|
|
||||||
}
|
|
||||||
|
|
||||||
req = &req_impl->req;
|
req = &req_impl->req;
|
||||||
|
|
||||||
req->response_port = port;
|
|
||||||
|
|
||||||
req->request = recv_msg->start;
|
req->request = recv_msg->start;
|
||||||
|
|
||||||
b = recv_msg->incoming_buf;
|
b = recv_msg->incoming_buf;
|
||||||
@@ -1076,12 +1074,129 @@ nxt_unit_process_req_headers(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
|||||||
(char *) nxt_unit_sptr_get(&r->target),
|
(char *) nxt_unit_sptr_get(&r->target),
|
||||||
(int) r->content_length);
|
(int) r->content_length);
|
||||||
|
|
||||||
|
nxt_unit_port_id_init(&port_id, recv_msg->pid, recv_msg->reply_port);
|
||||||
|
|
||||||
|
res = nxt_unit_request_check_response_port(req, &port_id);
|
||||||
|
|
||||||
|
if (nxt_fast_path(res == NXT_UNIT_OK)) {
|
||||||
|
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||||
|
|
||||||
lib->callbacks.request_handler(req);
|
lib->callbacks.request_handler(req);
|
||||||
|
}
|
||||||
|
|
||||||
return NXT_UNIT_OK;
|
return NXT_UNIT_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
nxt_unit_request_check_response_port(nxt_unit_request_info_t *req,
|
||||||
|
nxt_unit_port_id_t *port_id)
|
||||||
|
{
|
||||||
|
int res;
|
||||||
|
nxt_unit_ctx_t *ctx;
|
||||||
|
nxt_unit_impl_t *lib;
|
||||||
|
nxt_unit_port_t *port;
|
||||||
|
nxt_unit_ctx_impl_t *ctx_impl;
|
||||||
|
nxt_unit_port_impl_t *port_impl;
|
||||||
|
nxt_unit_request_info_impl_t *req_impl;
|
||||||
|
|
||||||
|
ctx = req->ctx;
|
||||||
|
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||||
|
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&lib->mutex);
|
||||||
|
|
||||||
|
port = nxt_unit_port_hash_find(&lib->ports, port_id, 0);
|
||||||
|
port_impl = nxt_container_of(port, nxt_unit_port_impl_t, port);
|
||||||
|
|
||||||
|
if (nxt_fast_path(port != NULL)) {
|
||||||
|
req->response_port = port;
|
||||||
|
|
||||||
|
if (nxt_fast_path(port_impl->ready)) {
|
||||||
|
pthread_mutex_unlock(&lib->mutex);
|
||||||
|
|
||||||
|
nxt_unit_debug(ctx, "check_response_port: found port{%d,%d}",
|
||||||
|
(int) port->id.pid, (int) port->id.id);
|
||||||
|
|
||||||
|
return NXT_UNIT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_unit_debug(ctx, "check_response_port: "
|
||||||
|
"port{%d,%d} already requested",
|
||||||
|
(int) port->id.pid, (int) port->id.id);
|
||||||
|
|
||||||
|
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
|
||||||
|
|
||||||
|
nxt_queue_insert_tail(&port_impl->awaiting_req,
|
||||||
|
&req_impl->port_wait_link);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&lib->mutex);
|
||||||
|
|
||||||
|
nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
|
||||||
|
|
||||||
|
return NXT_UNIT_AGAIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
port_impl = malloc(sizeof(nxt_unit_port_impl_t));
|
||||||
|
if (nxt_slow_path(port_impl == NULL)) {
|
||||||
|
nxt_unit_alert(ctx, "check_response_port: malloc(%d) failed",
|
||||||
|
(int) sizeof(nxt_unit_port_impl_t));
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&lib->mutex);
|
||||||
|
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
port = &port_impl->port;
|
||||||
|
|
||||||
|
port->id = *port_id;
|
||||||
|
port->in_fd = -1;
|
||||||
|
port->out_fd = -1;
|
||||||
|
port->data = NULL;
|
||||||
|
|
||||||
|
res = nxt_unit_port_hash_add(&lib->ports, port);
|
||||||
|
if (nxt_slow_path(res != NXT_UNIT_OK)) {
|
||||||
|
nxt_unit_alert(ctx, "check_response_port: %d,%d hash_add failed",
|
||||||
|
port->id.pid, port->id.id);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&lib->mutex);
|
||||||
|
|
||||||
|
free(port);
|
||||||
|
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
req_impl = nxt_container_of(req, nxt_unit_request_info_impl_t, req);
|
||||||
|
|
||||||
|
nxt_queue_insert_tail(&req_impl->process->ports, &port_impl->link);
|
||||||
|
|
||||||
|
port_impl->process = req_impl->process;
|
||||||
|
|
||||||
|
|
||||||
|
nxt_queue_init(&port_impl->awaiting_req);
|
||||||
|
|
||||||
|
nxt_queue_insert_tail(&port_impl->awaiting_req, &req_impl->port_wait_link);
|
||||||
|
|
||||||
|
port_impl->use_count = 2;
|
||||||
|
port_impl->ready = 0;
|
||||||
|
|
||||||
|
req->response_port = port;
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&lib->mutex);
|
||||||
|
|
||||||
|
nxt_unit_process_use(port_impl->process);
|
||||||
|
|
||||||
|
res = nxt_unit_get_port(ctx, port_id);
|
||||||
|
if (nxt_slow_path(res == NXT_UNIT_ERROR)) {
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_atomic_fetch_add(&ctx_impl->wait_items, 1);
|
||||||
|
|
||||||
|
return NXT_UNIT_AGAIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int
|
static int
|
||||||
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
nxt_unit_process_websocket(nxt_unit_ctx_t *ctx, nxt_unit_recv_msg_t *recv_msg)
|
||||||
{
|
{
|
||||||
@@ -4041,6 +4156,8 @@ nxt_unit_run_once(nxt_unit_ctx_t *ctx)
|
|||||||
|
|
||||||
nxt_unit_read_buf_release(ctx, rbuf);
|
nxt_unit_read_buf_release(ctx, rbuf);
|
||||||
|
|
||||||
|
nxt_unit_process_ready_req(ctx_impl);
|
||||||
|
|
||||||
nxt_unit_ctx_release(ctx_impl);
|
nxt_unit_ctx_release(ctx_impl);
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
@@ -4062,6 +4179,39 @@ nxt_unit_read_buf(nxt_unit_ctx_t *ctx, nxt_unit_read_buf_t *rbuf)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_unit_process_ready_req(nxt_unit_ctx_impl_t *ctx_impl)
|
||||||
|
{
|
||||||
|
nxt_queue_t ready_req;
|
||||||
|
nxt_unit_impl_t *lib;
|
||||||
|
nxt_unit_request_info_impl_t *req_impl;
|
||||||
|
|
||||||
|
nxt_queue_init(&ready_req);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx_impl->mutex);
|
||||||
|
|
||||||
|
if (nxt_queue_is_empty(&ctx_impl->ready_req)) {
|
||||||
|
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_queue_add(&ready_req, &ctx_impl->ready_req);
|
||||||
|
nxt_queue_init(&ctx_impl->ready_req);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||||
|
|
||||||
|
nxt_queue_each(req_impl, &ready_req,
|
||||||
|
nxt_unit_request_info_impl_t, port_wait_link)
|
||||||
|
{
|
||||||
|
lib = nxt_container_of(ctx_impl->ctx.unit, nxt_unit_impl_t, unit);
|
||||||
|
|
||||||
|
lib->callbacks.request_handler(&req_impl->req);
|
||||||
|
|
||||||
|
} nxt_queue_loop;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void
|
void
|
||||||
nxt_unit_done(nxt_unit_ctx_t *ctx)
|
nxt_unit_done(nxt_unit_ctx_t *ctx)
|
||||||
{
|
{
|
||||||
@@ -4372,10 +4522,13 @@ static nxt_unit_port_t *
|
|||||||
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
|
nxt_queue_t awaiting_req;
|
||||||
nxt_unit_impl_t *lib;
|
nxt_unit_impl_t *lib;
|
||||||
nxt_unit_port_t *old_port;
|
nxt_unit_port_t *old_port;
|
||||||
nxt_unit_process_t *process;
|
nxt_unit_process_t *process;
|
||||||
nxt_unit_port_impl_t *new_port;
|
nxt_unit_ctx_impl_t *ctx_impl;
|
||||||
|
nxt_unit_port_impl_t *new_port, *old_port_impl;
|
||||||
|
nxt_unit_request_info_impl_t *req_impl;
|
||||||
|
|
||||||
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||||
|
|
||||||
@@ -4415,6 +4568,17 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
|||||||
|
|
||||||
*port = *old_port;
|
*port = *old_port;
|
||||||
|
|
||||||
|
nxt_queue_init(&awaiting_req);
|
||||||
|
|
||||||
|
old_port_impl = nxt_container_of(old_port, nxt_unit_port_impl_t, port);
|
||||||
|
|
||||||
|
if (!nxt_queue_is_empty(&old_port_impl->awaiting_req)) {
|
||||||
|
nxt_queue_add(&awaiting_req, &old_port_impl->awaiting_req);
|
||||||
|
nxt_queue_init(&old_port_impl->awaiting_req);
|
||||||
|
}
|
||||||
|
|
||||||
|
old_port_impl->ready = (port->in_fd != -1 || port->out_fd != -1);
|
||||||
|
|
||||||
pthread_mutex_unlock(&lib->mutex);
|
pthread_mutex_unlock(&lib->mutex);
|
||||||
|
|
||||||
if (lib->callbacks.add_port != NULL
|
if (lib->callbacks.add_port != NULL
|
||||||
@@ -4423,6 +4587,25 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
|||||||
lib->callbacks.add_port(ctx, old_port);
|
lib->callbacks.add_port(ctx, old_port);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_queue_each(req_impl, &awaiting_req,
|
||||||
|
nxt_unit_request_info_impl_t, port_wait_link)
|
||||||
|
{
|
||||||
|
nxt_queue_remove(&req_impl->port_wait_link);
|
||||||
|
|
||||||
|
ctx_impl = nxt_container_of(req_impl->req.ctx, nxt_unit_ctx_impl_t,
|
||||||
|
ctx);
|
||||||
|
|
||||||
|
pthread_mutex_lock(&ctx_impl->mutex);
|
||||||
|
|
||||||
|
nxt_queue_insert_tail(&ctx_impl->ready_req,
|
||||||
|
&req_impl->port_wait_link);
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&ctx_impl->mutex);
|
||||||
|
|
||||||
|
nxt_atomic_fetch_add(&ctx_impl->wait_items, -1);
|
||||||
|
|
||||||
|
} nxt_queue_loop;
|
||||||
|
|
||||||
return old_port;
|
return old_port;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -4464,6 +4647,9 @@ nxt_unit_add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
|
|||||||
|
|
||||||
new_port->use_count = 2;
|
new_port->use_count = 2;
|
||||||
new_port->process = process;
|
new_port->process = process;
|
||||||
|
new_port->ready = (port->in_fd != -1 || port->out_fd != -1);
|
||||||
|
|
||||||
|
nxt_queue_init(&new_port->awaiting_req);
|
||||||
|
|
||||||
process = NULL;
|
process = NULL;
|
||||||
|
|
||||||
@@ -4608,6 +4794,42 @@ nxt_unit_quit(nxt_unit_ctx_t *ctx)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int
|
||||||
|
nxt_unit_get_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
|
||||||
|
{
|
||||||
|
ssize_t res;
|
||||||
|
nxt_unit_impl_t *lib;
|
||||||
|
nxt_unit_ctx_impl_t *ctx_impl;
|
||||||
|
|
||||||
|
struct {
|
||||||
|
nxt_port_msg_t msg;
|
||||||
|
nxt_port_msg_get_port_t get_port;
|
||||||
|
} m;
|
||||||
|
|
||||||
|
lib = nxt_container_of(ctx->unit, nxt_unit_impl_t, unit);
|
||||||
|
ctx_impl = nxt_container_of(ctx, nxt_unit_ctx_impl_t, ctx);
|
||||||
|
|
||||||
|
memset(&m.msg, 0, sizeof(nxt_port_msg_t));
|
||||||
|
|
||||||
|
m.msg.pid = lib->pid;
|
||||||
|
m.msg.reply_port = ctx_impl->read_port->id.id;
|
||||||
|
m.msg.type = _NXT_PORT_MSG_GET_PORT;
|
||||||
|
|
||||||
|
m.get_port.id = port_id->id;
|
||||||
|
m.get_port.pid = port_id->pid;
|
||||||
|
|
||||||
|
nxt_unit_debug(ctx, "get_port: %d %d", (int) port_id->pid,
|
||||||
|
(int) port_id->id);
|
||||||
|
|
||||||
|
res = nxt_unit_port_send(ctx, lib->router_port, &m, sizeof(m), NULL, 0);
|
||||||
|
if (nxt_slow_path(res != sizeof(m))) {
|
||||||
|
return NXT_UNIT_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
return NXT_UNIT_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static ssize_t
|
static ssize_t
|
||||||
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
nxt_unit_port_send(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port,
|
||||||
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
|
const void *buf, size_t buf_size, const void *oob, size_t oob_size)
|
||||||
|
|||||||
@@ -19,6 +19,7 @@
|
|||||||
enum {
|
enum {
|
||||||
NXT_UNIT_OK = 0,
|
NXT_UNIT_OK = 0,
|
||||||
NXT_UNIT_ERROR = 1,
|
NXT_UNIT_ERROR = 1,
|
||||||
|
NXT_UNIT_AGAIN = 2,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
|||||||
Reference in New Issue
Block a user