Introducing application timeout.
This commit is contained in:
@@ -495,7 +495,7 @@ nxt_conf_map_object(nxt_mp_t *mp, nxt_conf_value_t *value, nxt_conf_map_t *map,
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case NXT_CONF_MAP_MSEC:
|
case NXT_CONF_MAP_MSEC:
|
||||||
ptr->msec = v->u.integer;
|
ptr->msec = v->u.integer * 1000;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
|||||||
@@ -66,6 +66,21 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_listener_members[] = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static nxt_conf_vldt_object_t nxt_conf_vldt_app_limits_members[] = {
|
||||||
|
{ nxt_string("timeout"),
|
||||||
|
NXT_CONF_INTEGER,
|
||||||
|
NULL,
|
||||||
|
NULL },
|
||||||
|
|
||||||
|
{ nxt_string("requests"),
|
||||||
|
NXT_CONF_INTEGER,
|
||||||
|
NULL,
|
||||||
|
NULL },
|
||||||
|
|
||||||
|
{ nxt_null_string, 0, NULL, NULL }
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = {
|
static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = {
|
||||||
{ nxt_string("type"),
|
{ nxt_string("type"),
|
||||||
NXT_CONF_STRING,
|
NXT_CONF_STRING,
|
||||||
@@ -77,6 +92,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = {
|
|||||||
NULL,
|
NULL,
|
||||||
NULL },
|
NULL },
|
||||||
|
|
||||||
|
{ nxt_string("limits"),
|
||||||
|
NXT_CONF_OBJECT,
|
||||||
|
&nxt_conf_vldt_object,
|
||||||
|
(void *) &nxt_conf_vldt_app_limits_members },
|
||||||
|
|
||||||
{ nxt_string("user"),
|
{ nxt_string("user"),
|
||||||
NXT_CONF_STRING,
|
NXT_CONF_STRING,
|
||||||
nxt_conf_vldt_system,
|
nxt_conf_vldt_system,
|
||||||
@@ -117,6 +137,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_php_members[] = {
|
|||||||
NULL,
|
NULL,
|
||||||
NULL },
|
NULL },
|
||||||
|
|
||||||
|
{ nxt_string("limits"),
|
||||||
|
NXT_CONF_OBJECT,
|
||||||
|
&nxt_conf_vldt_object,
|
||||||
|
(void *) &nxt_conf_vldt_app_limits_members },
|
||||||
|
|
||||||
{ nxt_string("user"),
|
{ nxt_string("user"),
|
||||||
NXT_CONF_STRING,
|
NXT_CONF_STRING,
|
||||||
nxt_conf_vldt_system,
|
nxt_conf_vldt_system,
|
||||||
@@ -162,6 +187,11 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_go_members[] = {
|
|||||||
NULL,
|
NULL,
|
||||||
NULL },
|
NULL },
|
||||||
|
|
||||||
|
{ nxt_string("limits"),
|
||||||
|
NXT_CONF_OBJECT,
|
||||||
|
&nxt_conf_vldt_object,
|
||||||
|
(void *) &nxt_conf_vldt_app_limits_members },
|
||||||
|
|
||||||
{ nxt_string("user"),
|
{ nxt_string("user"),
|
||||||
NXT_CONF_STRING,
|
NXT_CONF_STRING,
|
||||||
nxt_conf_vldt_system,
|
nxt_conf_vldt_system,
|
||||||
|
|||||||
@@ -152,35 +152,3 @@ nxt_conn_work_queue_set(nxt_conn_t *c, nxt_work_queue_t *wq)
|
|||||||
c->read_timer.work_queue = wq;
|
c->read_timer.work_queue = wq;
|
||||||
c->write_timer.work_queue = wq;
|
c->write_timer.work_queue = wq;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
nxt_req_conn_link_t *
|
|
||||||
nxt_conn_request_add(nxt_conn_t *c, nxt_req_id_t req_id)
|
|
||||||
{
|
|
||||||
nxt_req_conn_link_t *rc;
|
|
||||||
|
|
||||||
rc = nxt_mp_zalloc(c->mem_pool, sizeof(nxt_req_conn_link_t));
|
|
||||||
if (nxt_slow_path(rc == NULL)) {
|
|
||||||
nxt_thread_log_error(NXT_LOG_WARN, "failed to allocate req %08uxD "
|
|
||||||
"to conn", req_id);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
rc->req_id = req_id;
|
|
||||||
rc->conn = c;
|
|
||||||
|
|
||||||
nxt_queue_insert_tail(&c->requests, &rc->link);
|
|
||||||
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void
|
|
||||||
nxt_conn_request_remove(nxt_conn_t *c, nxt_req_conn_link_t *rc)
|
|
||||||
{
|
|
||||||
nxt_queue_remove(&rc->link);
|
|
||||||
|
|
||||||
nxt_mp_free(c->mem_pool, rc);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -175,17 +175,6 @@ struct nxt_conn_s {
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
typedef uint32_t nxt_req_id_t;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
nxt_req_id_t req_id;
|
|
||||||
nxt_conn_t *conn;
|
|
||||||
nxt_port_t *app_port;
|
|
||||||
|
|
||||||
nxt_queue_link_t link; /* for nxt_conn_t.requests */
|
|
||||||
} nxt_req_conn_link_t;
|
|
||||||
|
|
||||||
|
|
||||||
#define nxt_conn_timer_init(ev, c, wq) \
|
#define nxt_conn_timer_init(ev, c, wq) \
|
||||||
do { \
|
do { \
|
||||||
(ev)->work_queue = (wq); \
|
(ev)->work_queue = (wq); \
|
||||||
@@ -353,10 +342,4 @@ NXT_EXPORT void nxt_conn_proxy(nxt_task_t *task, nxt_conn_proxy_t *p);
|
|||||||
#define nxt_event_conn_close nxt_conn_close
|
#define nxt_event_conn_close nxt_conn_close
|
||||||
|
|
||||||
|
|
||||||
NXT_EXPORT nxt_req_conn_link_t *nxt_conn_request_add(nxt_conn_t *c,
|
|
||||||
nxt_req_id_t req_id);
|
|
||||||
NXT_EXPORT void nxt_conn_request_remove(nxt_conn_t *c,
|
|
||||||
nxt_req_conn_link_t *rc);
|
|
||||||
|
|
||||||
|
|
||||||
#endif /* _NXT_CONN_H_INCLUDED_ */
|
#endif /* _NXT_CONN_H_INCLUDED_ */
|
||||||
|
|||||||
@@ -232,6 +232,12 @@ nxt_event_engine_post(nxt_event_engine_t *engine, nxt_work_t *work)
|
|||||||
{
|
{
|
||||||
nxt_debug(&engine->task, "event engine post");
|
nxt_debug(&engine->task, "event engine post");
|
||||||
|
|
||||||
|
#if (NXT_DEBUG)
|
||||||
|
if (nxt_slow_path(work->next != NULL)) {
|
||||||
|
nxt_debug(&engine->task, "event engine post multiple works");
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
nxt_locked_work_queue_add(&engine->locked_work_queue, work);
|
nxt_locked_work_queue_add(&engine->locked_work_queue, work);
|
||||||
|
|
||||||
nxt_event_engine_signal(engine, 0);
|
nxt_event_engine_signal(engine, 0);
|
||||||
@@ -530,117 +536,6 @@ nxt_event_engine_start(nxt_event_engine_t *engine)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static nxt_int_t
|
|
||||||
nxt_req_conn_test(nxt_lvlhsh_query_t *lhq, void *data)
|
|
||||||
{
|
|
||||||
return NXT_OK;
|
|
||||||
}
|
|
||||||
|
|
||||||
static const nxt_lvlhsh_proto_t lvlhsh_req_conn_proto nxt_aligned(64) = {
|
|
||||||
NXT_LVLHSH_DEFAULT,
|
|
||||||
nxt_req_conn_test,
|
|
||||||
nxt_lvlhsh_alloc,
|
|
||||||
nxt_lvlhsh_free,
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
void
|
|
||||||
nxt_event_engine_request_add(nxt_event_engine_t *engine,
|
|
||||||
nxt_req_conn_link_t *rc)
|
|
||||||
{
|
|
||||||
nxt_lvlhsh_query_t lhq;
|
|
||||||
|
|
||||||
lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
|
|
||||||
lhq.key.length = sizeof(rc->req_id);
|
|
||||||
lhq.key.start = (u_char *) &rc->req_id;
|
|
||||||
lhq.proto = &lvlhsh_req_conn_proto;
|
|
||||||
lhq.replace = 0;
|
|
||||||
lhq.value = rc;
|
|
||||||
lhq.pool = engine->mem_pool;
|
|
||||||
|
|
||||||
switch (nxt_lvlhsh_insert(&engine->requests, &lhq)) {
|
|
||||||
|
|
||||||
case NXT_OK:
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn add failed",
|
|
||||||
rc->req_id);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
nxt_req_conn_link_t *
|
|
||||||
nxt_event_engine_request_find(nxt_event_engine_t *engine, nxt_req_id_t req_id)
|
|
||||||
{
|
|
||||||
nxt_lvlhsh_query_t lhq;
|
|
||||||
|
|
||||||
lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
|
|
||||||
lhq.key.length = sizeof(req_id);
|
|
||||||
lhq.key.start = (u_char *) &req_id;
|
|
||||||
lhq.proto = &lvlhsh_req_conn_proto;
|
|
||||||
|
|
||||||
if (nxt_lvlhsh_find(&engine->requests, &lhq) == NXT_OK) {
|
|
||||||
return lhq.value;
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void
|
|
||||||
nxt_event_engine_request_remove(nxt_event_engine_t *engine,
|
|
||||||
nxt_req_conn_link_t *rc)
|
|
||||||
{
|
|
||||||
nxt_lvlhsh_query_t lhq;
|
|
||||||
|
|
||||||
lhq.key_hash = nxt_murmur_hash2(&rc->req_id, sizeof(rc->req_id));
|
|
||||||
lhq.key.length = sizeof(rc->req_id);
|
|
||||||
lhq.key.start = (u_char *) &rc->req_id;
|
|
||||||
lhq.proto = &lvlhsh_req_conn_proto;
|
|
||||||
lhq.pool = engine->mem_pool;
|
|
||||||
|
|
||||||
switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {
|
|
||||||
|
|
||||||
case NXT_OK:
|
|
||||||
break;
|
|
||||||
|
|
||||||
default:
|
|
||||||
nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
|
|
||||||
rc->req_id);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
nxt_req_conn_link_t *
|
|
||||||
nxt_event_engine_request_find_remove(nxt_event_engine_t *engine,
|
|
||||||
nxt_req_id_t req_id)
|
|
||||||
{
|
|
||||||
nxt_lvlhsh_query_t lhq;
|
|
||||||
|
|
||||||
lhq.key_hash = nxt_murmur_hash2(&req_id, sizeof(req_id));
|
|
||||||
lhq.key.length = sizeof(req_id);
|
|
||||||
lhq.key.start = (u_char *) &req_id;
|
|
||||||
lhq.proto = &lvlhsh_req_conn_proto;
|
|
||||||
lhq.pool = engine->mem_pool;
|
|
||||||
|
|
||||||
switch (nxt_lvlhsh_delete(&engine->requests, &lhq)) {
|
|
||||||
|
|
||||||
case NXT_OK:
|
|
||||||
return lhq.value;
|
|
||||||
|
|
||||||
default:
|
|
||||||
nxt_thread_log_error(NXT_LOG_WARN, "req %08uxD to conn remove failed",
|
|
||||||
req_id);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#if (NXT_DEBUG)
|
#if (NXT_DEBUG)
|
||||||
|
|
||||||
void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
|
void nxt_event_engine_thread_adopt(nxt_event_engine_t *engine)
|
||||||
|
|||||||
@@ -491,7 +491,6 @@ struct nxt_event_engine_s {
|
|||||||
nxt_queue_t joints;
|
nxt_queue_t joints;
|
||||||
nxt_queue_t listen_connections;
|
nxt_queue_t listen_connections;
|
||||||
nxt_queue_t idle_connections;
|
nxt_queue_t idle_connections;
|
||||||
nxt_lvlhsh_t requests; /* req_id to nxt_req_conn_link_t */
|
|
||||||
|
|
||||||
nxt_queue_link_t link;
|
nxt_queue_link_t link;
|
||||||
// STUB: router link
|
// STUB: router link
|
||||||
@@ -512,15 +511,6 @@ NXT_EXPORT void nxt_event_engine_post(nxt_event_engine_t *engine,
|
|||||||
NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine,
|
NXT_EXPORT void nxt_event_engine_signal(nxt_event_engine_t *engine,
|
||||||
nxt_uint_t signo);
|
nxt_uint_t signo);
|
||||||
|
|
||||||
NXT_EXPORT void nxt_event_engine_request_add(nxt_event_engine_t *engine,
|
|
||||||
nxt_req_conn_link_t *rc);
|
|
||||||
NXT_EXPORT nxt_req_conn_link_t *nxt_event_engine_request_find(
|
|
||||||
nxt_event_engine_t *engine, nxt_req_id_t req_id);
|
|
||||||
NXT_EXPORT void nxt_event_engine_request_remove(nxt_event_engine_t *engine,
|
|
||||||
nxt_req_conn_link_t *rc);
|
|
||||||
NXT_EXPORT nxt_req_conn_link_t *nxt_event_engine_request_find_remove(
|
|
||||||
nxt_event_engine_t *engine, nxt_req_id_t req_id);
|
|
||||||
|
|
||||||
|
|
||||||
nxt_inline nxt_event_engine_t *
|
nxt_inline nxt_event_engine_t *
|
||||||
nxt_thread_event_engine(void)
|
nxt_thread_event_engine(void)
|
||||||
|
|||||||
@@ -171,11 +171,14 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
nxt_mp_t *mp;
|
nxt_mp_t *mp;
|
||||||
nxt_int_t ret;
|
nxt_int_t ret;
|
||||||
nxt_buf_t *b;
|
nxt_buf_t *b;
|
||||||
|
nxt_port_t *port;
|
||||||
nxt_conf_value_t *conf;
|
nxt_conf_value_t *conf;
|
||||||
nxt_common_app_conf_t app_conf;
|
nxt_common_app_conf_t app_conf;
|
||||||
|
|
||||||
static nxt_str_t nobody = nxt_string("nobody");
|
static nxt_str_t nobody = nxt_string("nobody");
|
||||||
|
|
||||||
|
ret = NXT_ERROR;
|
||||||
|
|
||||||
b = msg->buf;
|
b = msg->buf;
|
||||||
|
|
||||||
nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos,
|
nxt_debug(task, "main start worker: %*s", b->mem.free - b->mem.pos,
|
||||||
@@ -196,7 +199,8 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
|
|
||||||
if (conf == NULL) {
|
if (conf == NULL) {
|
||||||
nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
|
nxt_log(task, NXT_LOG_CRIT, "configuration parsing error");
|
||||||
return;
|
|
||||||
|
goto failed;
|
||||||
}
|
}
|
||||||
|
|
||||||
app_conf.user = nobody;
|
app_conf.user = nobody;
|
||||||
@@ -205,12 +209,24 @@ nxt_port_main_start_worker_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
nxt_nitems(nxt_common_app_conf), &app_conf);
|
nxt_nitems(nxt_common_app_conf), &app_conf);
|
||||||
if (ret != NXT_OK) {
|
if (ret != NXT_OK) {
|
||||||
nxt_log(task, NXT_LOG_CRIT, "root map error");
|
nxt_log(task, NXT_LOG_CRIT, "root map error");
|
||||||
return;
|
|
||||||
|
goto failed;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = nxt_main_start_worker_process(task, task->thread->runtime,
|
ret = nxt_main_start_worker_process(task, task->thread->runtime,
|
||||||
&app_conf, msg->port_msg.stream);
|
&app_conf, msg->port_msg.stream);
|
||||||
|
|
||||||
|
failed:
|
||||||
|
|
||||||
|
if (ret == NXT_ERROR) {
|
||||||
|
port = nxt_runtime_port_find(task->thread->runtime, msg->port_msg.pid,
|
||||||
|
msg->port_msg.reply_port);
|
||||||
|
if (nxt_fast_path(port != NULL)) {
|
||||||
|
nxt_port_socket_write(task, port, NXT_PORT_MSG_RPC_ERROR,
|
||||||
|
-1, msg->port_msg.stream, 0, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
nxt_mp_destroy(mp);
|
nxt_mp_destroy(mp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ nxt_port_mp_cleanup(nxt_task_t *task, void *obj, void *data)
|
|||||||
nxt_assert(port->pair[0] == -1);
|
nxt_assert(port->pair[0] == -1);
|
||||||
nxt_assert(port->pair[1] == -1);
|
nxt_assert(port->pair[1] == -1);
|
||||||
|
|
||||||
nxt_assert(port->app_req_id == 0);
|
nxt_assert(port->app_stream == 0);
|
||||||
nxt_assert(port->app_link.next == NULL);
|
nxt_assert(port->app_link.next == NULL);
|
||||||
|
|
||||||
nxt_assert(nxt_queue_is_empty(&port->messages));
|
nxt_assert(nxt_queue_is_empty(&port->messages));
|
||||||
@@ -58,7 +58,6 @@ nxt_port_new(nxt_task_t *task, nxt_port_id_t id, nxt_pid_t pid,
|
|||||||
port->pid = pid;
|
port->pid = pid;
|
||||||
port->type = type;
|
port->type = type;
|
||||||
port->mem_pool = mp;
|
port->mem_pool = mp;
|
||||||
port->next_stream = 1;
|
|
||||||
|
|
||||||
nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
|
nxt_mp_cleanup(mp, nxt_port_mp_cleanup, task, port, mp);
|
||||||
|
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ struct nxt_port_s {
|
|||||||
uint32_t max_size;
|
uint32_t max_size;
|
||||||
/* Maximum interleave of message parts. */
|
/* Maximum interleave of message parts. */
|
||||||
uint32_t max_share;
|
uint32_t max_share;
|
||||||
uint32_t app_req_id;
|
uint32_t app_stream;
|
||||||
|
|
||||||
nxt_port_handler_t handler;
|
nxt_port_handler_t handler;
|
||||||
nxt_port_handler_t *data;
|
nxt_port_handler_t *data;
|
||||||
@@ -122,7 +122,6 @@ struct nxt_port_s {
|
|||||||
|
|
||||||
nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
|
nxt_lvlhsh_t rpc_streams; /* stream to nxt_port_rpc_reg_t */
|
||||||
nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
|
nxt_lvlhsh_t rpc_peers; /* peer to queue of nxt_port_rpc_reg_t */
|
||||||
uint32_t next_stream;
|
|
||||||
|
|
||||||
nxt_process_type_t type;
|
nxt_process_type_t type;
|
||||||
nxt_work_t work;
|
nxt_work_t work;
|
||||||
|
|||||||
@@ -8,6 +8,8 @@
|
|||||||
#include <nxt_port_rpc.h>
|
#include <nxt_port_rpc.h>
|
||||||
|
|
||||||
|
|
||||||
|
static nxt_atomic_t nxt_stream_ident = 1;
|
||||||
|
|
||||||
typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
|
typedef struct nxt_port_rpc_reg_s nxt_port_rpc_reg_t;
|
||||||
|
|
||||||
struct nxt_port_rpc_reg_s {
|
struct nxt_port_rpc_reg_s {
|
||||||
@@ -15,6 +17,7 @@ struct nxt_port_rpc_reg_s {
|
|||||||
|
|
||||||
nxt_pid_t peer;
|
nxt_pid_t peer;
|
||||||
nxt_queue_link_t link;
|
nxt_queue_link_t link;
|
||||||
|
nxt_bool_t link_first;
|
||||||
|
|
||||||
nxt_port_rpc_handler_t ready_handler;
|
nxt_port_rpc_handler_t ready_handler;
|
||||||
nxt_port_rpc_handler_t error_handler;
|
nxt_port_rpc_handler_t error_handler;
|
||||||
@@ -61,31 +64,58 @@ uint32_t
|
|||||||
nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
|
nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
|
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
|
||||||
nxt_pid_t peer, void *data)
|
nxt_pid_t peer, void *data)
|
||||||
|
{
|
||||||
|
void *ex;
|
||||||
|
nxt_port_rpc_reg_t *reg;
|
||||||
|
|
||||||
|
ex = nxt_port_rpc_register_handler_ex(task, port, ready_handler,
|
||||||
|
error_handler, 0);
|
||||||
|
|
||||||
|
if (ex == NULL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (peer != -1) {
|
||||||
|
nxt_port_rpc_ex_set_peer(task, port, ex, peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
|
||||||
|
|
||||||
|
nxt_assert(reg->data == ex);
|
||||||
|
|
||||||
|
reg->data = data;
|
||||||
|
|
||||||
|
return reg->stream;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void *
|
||||||
|
nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
|
||||||
|
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
|
||||||
|
size_t ex_size)
|
||||||
{
|
{
|
||||||
uint32_t stream;
|
uint32_t stream;
|
||||||
nxt_queue_link_t *peer_link;
|
|
||||||
nxt_port_rpc_reg_t *reg;
|
nxt_port_rpc_reg_t *reg;
|
||||||
nxt_lvlhsh_query_t lhq;
|
nxt_lvlhsh_query_t lhq;
|
||||||
|
|
||||||
nxt_assert(port->pair[0] != -1);
|
nxt_assert(port->pair[0] != -1);
|
||||||
|
|
||||||
stream = port->next_stream++;
|
stream =
|
||||||
|
(uint32_t) nxt_atomic_fetch_add(&nxt_stream_ident, 1) & 0x3fffffff;
|
||||||
|
|
||||||
reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t));
|
reg = nxt_mp_zalloc(port->mem_pool, sizeof(nxt_port_rpc_reg_t) + ex_size);
|
||||||
|
|
||||||
if (nxt_slow_path(reg == NULL)) {
|
if (nxt_slow_path(reg == NULL)) {
|
||||||
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to allocate "
|
nxt_debug(task, "rpc: stream #%uD failed to allocate reg", stream);
|
||||||
"reg for stream #%uD", stream);
|
|
||||||
|
|
||||||
return 0;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
reg->stream = stream;
|
reg->stream = stream;
|
||||||
reg->peer = peer;
|
reg->peer = -1;
|
||||||
reg->ready_handler = ready_handler;
|
reg->ready_handler = ready_handler;
|
||||||
reg->error_handler = error_handler;
|
reg->error_handler = error_handler;
|
||||||
reg->data = data;
|
reg->data = reg + 1;
|
||||||
|
|
||||||
|
|
||||||
nxt_port_rpc_lhq_stream(&lhq, &stream);
|
nxt_port_rpc_lhq_stream(&lhq, &stream);
|
||||||
lhq.replace = 0;
|
lhq.replace = 0;
|
||||||
@@ -98,39 +128,140 @@ nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add handler "
|
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
|
||||||
"for stream #%uD", stream);
|
"reg ", stream);
|
||||||
|
|
||||||
nxt_mp_free(port->mem_pool, reg);
|
nxt_mp_free(port->mem_pool, reg);
|
||||||
|
|
||||||
return 0;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (peer != -1) {
|
nxt_debug(task, "rpc: stream #%uD registered", stream);
|
||||||
nxt_port_rpc_lhq_peer(&lhq, &peer);
|
|
||||||
lhq.replace = 0;
|
return reg->data;
|
||||||
lhq.value = ®->link;
|
}
|
||||||
|
|
||||||
|
|
||||||
|
uint32_t
|
||||||
|
nxt_port_rpc_ex_stream(void *ex)
|
||||||
|
{
|
||||||
|
nxt_port_rpc_reg_t *reg;
|
||||||
|
|
||||||
|
reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
|
||||||
|
|
||||||
|
nxt_assert(reg->data == ex);
|
||||||
|
|
||||||
|
return reg->stream;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void
|
||||||
|
nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
|
||||||
|
void *ex, nxt_pid_t peer)
|
||||||
|
{
|
||||||
|
nxt_int_t ret;
|
||||||
|
nxt_queue_link_t *peer_link;
|
||||||
|
nxt_port_rpc_reg_t *reg;
|
||||||
|
nxt_lvlhsh_query_t lhq;
|
||||||
|
|
||||||
|
reg = nxt_pointer_to(ex, -sizeof(nxt_port_rpc_reg_t));
|
||||||
|
|
||||||
|
nxt_assert(reg->data == ex);
|
||||||
|
|
||||||
|
if (peer == -1 || reg->peer != -1) {
|
||||||
|
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to "
|
||||||
|
"change peer %PI->%PI", reg->stream, reg->peer, peer);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
reg->peer = peer;
|
||||||
|
|
||||||
|
nxt_port_rpc_lhq_peer(&lhq, &peer);
|
||||||
|
lhq.replace = 0;
|
||||||
|
lhq.value = ®->link;
|
||||||
|
lhq.pool = port->mem_pool;
|
||||||
|
|
||||||
|
ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
|
||||||
|
|
||||||
|
switch (ret) {
|
||||||
|
|
||||||
|
case NXT_OK:
|
||||||
|
reg->link_first = 1;
|
||||||
|
nxt_queue_self(®->link);
|
||||||
|
|
||||||
|
nxt_debug(task, "rpc: stream #%uD assigned uniq pid %PI (%p)",
|
||||||
|
reg->stream, reg->peer, reg->link.next);
|
||||||
|
break;
|
||||||
|
|
||||||
|
case NXT_DECLINED:
|
||||||
|
reg->link_first = 0;
|
||||||
|
peer_link = lhq.value;
|
||||||
|
nxt_queue_insert_after(peer_link, ®->link);
|
||||||
|
|
||||||
|
nxt_debug(task, "rpc: stream #%uD assigned duplicate pid %PI (%p)",
|
||||||
|
reg->stream, reg->peer, reg->link.next);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed to add "
|
||||||
|
"peer for stream #%uD (%d)", reg->stream, ret);
|
||||||
|
|
||||||
|
reg->peer = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_port_rpc_remove_from_peers(nxt_task_t *task, nxt_port_t *port,
|
||||||
|
nxt_port_rpc_reg_t *reg)
|
||||||
|
{
|
||||||
|
uint32_t stream;
|
||||||
|
nxt_int_t ret;
|
||||||
|
nxt_lvlhsh_query_t lhq;
|
||||||
|
nxt_port_rpc_reg_t *r;
|
||||||
|
|
||||||
|
stream = reg->stream;
|
||||||
|
|
||||||
|
if (reg->link_first != 0) {
|
||||||
|
nxt_port_rpc_lhq_peer(&lhq, ®->peer);
|
||||||
lhq.pool = port->mem_pool;
|
lhq.pool = port->mem_pool;
|
||||||
|
|
||||||
switch (nxt_lvlhsh_insert(&port->rpc_peers, &lhq)) {
|
if (reg->link.next == ®->link) {
|
||||||
|
nxt_assert(reg->link.prev == ®->link);
|
||||||
|
|
||||||
case NXT_OK:
|
nxt_debug(task, "rpc: stream #%uD remove first and last pid %PI "
|
||||||
nxt_queue_self(®->link);
|
"registration (%p)", stream, reg->peer, reg->link.next);
|
||||||
break;
|
|
||||||
|
|
||||||
case NXT_DECLINED:
|
ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
|
||||||
peer_link = lhq.value;
|
} else {
|
||||||
nxt_queue_insert_before(peer_link, ®->link);
|
nxt_debug(task, "rpc: stream #%uD remove first pid %PI "
|
||||||
break;
|
"registration (%p)", stream, reg->peer, reg->link.next);
|
||||||
|
|
||||||
default:
|
lhq.replace = 1;
|
||||||
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to add peer "
|
lhq.value = reg->link.next;
|
||||||
"for stream #%uD", stream);
|
|
||||||
break;
|
r = nxt_queue_link_data(reg->link.next, nxt_port_rpc_reg_t, link);
|
||||||
|
r->link_first = 1;
|
||||||
|
|
||||||
|
nxt_queue_remove(®->link);
|
||||||
|
|
||||||
|
ret = nxt_lvlhsh_insert(&port->rpc_peers, &lhq);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
nxt_debug(task, "rpc: stream #%uD remove pid %PI "
|
||||||
|
"registration (%p)", stream, reg->peer, reg->link.next);
|
||||||
|
|
||||||
|
nxt_queue_remove(®->link);
|
||||||
|
ret = NXT_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
return stream;
|
if (nxt_slow_path(ret != NXT_OK)) {
|
||||||
|
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: stream #%uD failed"
|
||||||
|
" to delete peer %PI (%d)", stream, reg->peer, ret);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -150,8 +281,6 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
last = msg->port_msg.last;
|
last = msg->port_msg.last;
|
||||||
type = msg->port_msg.type;
|
type = msg->port_msg.type;
|
||||||
|
|
||||||
nxt_debug(task, "rpc: handler for stream #%uD, type %d", stream, type);
|
|
||||||
|
|
||||||
nxt_port_rpc_lhq_stream(&lhq, &stream);
|
nxt_port_rpc_lhq_stream(&lhq, &stream);
|
||||||
lhq.pool = port->mem_pool;
|
lhq.pool = port->mem_pool;
|
||||||
|
|
||||||
@@ -163,11 +292,14 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (ret != NXT_OK) {
|
if (ret != NXT_OK) {
|
||||||
nxt_debug(task, "rpc: no handler found for stream #%uD", stream);
|
nxt_debug(task, "rpc: stream #%uD no handler found", stream);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_debug(task, "rpc: stream #%uD %shandler, type %d", stream,
|
||||||
|
(last ? "last " : ""), type);
|
||||||
|
|
||||||
reg = lhq.value;
|
reg = lhq.value;
|
||||||
|
|
||||||
if (reg->peer != -1) {
|
if (reg->peer != -1) {
|
||||||
@@ -182,28 +314,15 @@ nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (last == 0) {
|
if (last == 0) {
|
||||||
nxt_debug(task, "rpc: keep handler for stream #%uD", stream);
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reg->peer != -1) {
|
if (reg->peer != -1) {
|
||||||
if (reg->link.next == ®->link) {
|
nxt_port_rpc_remove_from_peers(task, port, reg);
|
||||||
nxt_port_rpc_lhq_peer(&lhq, ®->peer);
|
|
||||||
lhq.pool = port->mem_pool;
|
|
||||||
|
|
||||||
ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
|
|
||||||
|
|
||||||
if (nxt_slow_path(ret != NXT_OK)) {
|
|
||||||
nxt_log_error(NXT_LOG_ERR, task->log, "rpc: failed to delete "
|
|
||||||
"peer %PI", reg->peer);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
nxt_queue_remove(®->link);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_debug(task, "rpc: stream #%uD free registration", stream);
|
||||||
|
|
||||||
nxt_mp_free(port->mem_pool, reg);
|
nxt_mp_free(port->mem_pool, reg);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -215,7 +334,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
|
|||||||
uint32_t stream;
|
uint32_t stream;
|
||||||
nxt_int_t ret;
|
nxt_int_t ret;
|
||||||
nxt_buf_t buf;
|
nxt_buf_t buf;
|
||||||
nxt_queue_link_t *peer_link;
|
nxt_queue_link_t *peer_link, *next_link;
|
||||||
nxt_port_rpc_reg_t *reg;
|
nxt_port_rpc_reg_t *reg;
|
||||||
nxt_lvlhsh_query_t lhq;
|
nxt_lvlhsh_query_t lhq;
|
||||||
nxt_port_recv_msg_t msg;
|
nxt_port_recv_msg_t msg;
|
||||||
@@ -226,7 +345,7 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
|
|||||||
ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
|
ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
|
||||||
|
|
||||||
if (nxt_slow_path(ret != NXT_OK)) {
|
if (nxt_slow_path(ret != NXT_OK)) {
|
||||||
nxt_debug(task, "rpc: no handler found for peer %PI", peer);
|
nxt_debug(task, "rpc: no reg found for peer %PI", peer);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -249,15 +368,16 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
|
|||||||
|
|
||||||
reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
|
reg = nxt_queue_link_data(peer_link, nxt_port_rpc_reg_t, link);
|
||||||
|
|
||||||
nxt_debug(task, "rpc: trigger error for stream #%uD", reg->stream);
|
nxt_assert(reg->peer == peer);
|
||||||
|
|
||||||
msg.port_msg.stream = reg->stream;
|
|
||||||
|
|
||||||
reg->error_handler(task, &msg, reg->data);
|
|
||||||
|
|
||||||
|
|
||||||
stream = reg->stream;
|
stream = reg->stream;
|
||||||
|
|
||||||
|
nxt_debug(task, "rpc: stream #%uD trigger error", stream);
|
||||||
|
|
||||||
|
msg.port_msg.stream = stream;
|
||||||
|
|
||||||
|
reg->error_handler(task, &msg, reg->data);
|
||||||
|
|
||||||
nxt_port_rpc_lhq_stream(&lhq, &stream);
|
nxt_port_rpc_lhq_stream(&lhq, &stream);
|
||||||
lhq.pool = port->mem_pool;
|
lhq.pool = port->mem_pool;
|
||||||
|
|
||||||
@@ -265,18 +385,24 @@ nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port, nxt_pid_t peer)
|
|||||||
|
|
||||||
if (nxt_slow_path(ret != NXT_OK)) {
|
if (nxt_slow_path(ret != NXT_OK)) {
|
||||||
nxt_log_error(NXT_LOG_ERR, task->log,
|
nxt_log_error(NXT_LOG_ERR, task->log,
|
||||||
"rpc: failed to delete handler for stream #%uD",
|
"rpc: stream #%uD failed to delete handler", stream);
|
||||||
stream);
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (peer_link == peer_link->next) {
|
if (peer_link == peer_link->next) {
|
||||||
|
nxt_assert(peer_link->prev == peer_link);
|
||||||
|
|
||||||
last = 1;
|
last = 1;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
peer_link = peer_link->next;
|
nxt_assert(peer_link->next->prev == peer_link);
|
||||||
nxt_queue_remove(peer_link->prev);
|
nxt_assert(peer_link->prev->next == peer_link);
|
||||||
|
|
||||||
|
next_link = peer_link->next;
|
||||||
|
nxt_queue_remove(peer_link);
|
||||||
|
|
||||||
|
peer_link = next_link;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_mp_free(port->mem_pool, reg);
|
nxt_mp_free(port->mem_pool, reg);
|
||||||
@@ -297,7 +423,7 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
|
|||||||
ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
|
ret = nxt_lvlhsh_delete(&port->rpc_streams, &lhq);
|
||||||
|
|
||||||
if (ret != NXT_OK) {
|
if (ret != NXT_OK) {
|
||||||
nxt_debug(task, "rpc: no handler found for stream %uxD", stream);
|
nxt_debug(task, "rpc: stream #%uD no handler found", stream);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -305,21 +431,10 @@ nxt_port_rpc_cancel(nxt_task_t *task, nxt_port_t *port, uint32_t stream)
|
|||||||
reg = lhq.value;
|
reg = lhq.value;
|
||||||
|
|
||||||
if (reg->peer != -1) {
|
if (reg->peer != -1) {
|
||||||
if (reg->link.next == ®->link) {
|
nxt_port_rpc_remove_from_peers(task, port, reg);
|
||||||
nxt_port_rpc_lhq_peer(&lhq, ®->peer);
|
|
||||||
lhq.pool = port->mem_pool;
|
|
||||||
|
|
||||||
ret = nxt_lvlhsh_delete(&port->rpc_peers, &lhq);
|
|
||||||
|
|
||||||
if (nxt_slow_path(ret != NXT_OK)) {
|
|
||||||
nxt_log_error(NXT_LOG_ERR, task->log,
|
|
||||||
"rpc: failed to delete peer %PI", reg->peer);
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
nxt_queue_remove(®->link);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_debug(task, "rpc: stream #%uD cancel registration", stream);
|
||||||
|
|
||||||
nxt_mp_free(port->mem_pool, reg);
|
nxt_mp_free(port->mem_pool, reg);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,14 @@ typedef void (*nxt_port_rpc_handler_t)(nxt_task_t *task,
|
|||||||
uint32_t nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
|
uint32_t nxt_port_rpc_register_handler(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
|
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
|
||||||
nxt_pid_t peer, void *data);
|
nxt_pid_t peer, void *data);
|
||||||
|
void *nxt_port_rpc_register_handler_ex(nxt_task_t *task, nxt_port_t *port,
|
||||||
|
nxt_port_rpc_handler_t ready_handler, nxt_port_rpc_handler_t error_handler,
|
||||||
|
size_t ex_size);
|
||||||
|
|
||||||
|
uint32_t nxt_port_rpc_ex_stream(void *ex);
|
||||||
|
void nxt_port_rpc_ex_set_peer(nxt_task_t *task, nxt_port_t *port,
|
||||||
|
void *ex, nxt_pid_t peer);
|
||||||
|
|
||||||
void nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
void nxt_port_rpc_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg);
|
||||||
void nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port,
|
void nxt_port_rpc_remove_peer(nxt_task_t *task, nxt_port_t *port,
|
||||||
nxt_pid_t peer);
|
nxt_pid_t peer);
|
||||||
|
|||||||
418
src/nxt_router.c
418
src/nxt_router.c
@@ -10,8 +10,11 @@
|
|||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
nxt_str_t type;
|
nxt_str_t type;
|
||||||
uint32_t workers;
|
uint32_t workers;
|
||||||
|
nxt_msec_t timeout;
|
||||||
|
uint32_t requests;
|
||||||
|
nxt_conf_value_t *limits_value;
|
||||||
} nxt_router_app_conf_t;
|
} nxt_router_app_conf_t;
|
||||||
|
|
||||||
|
|
||||||
@@ -31,9 +34,20 @@ struct nxt_start_worker_s {
|
|||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
struct nxt_req_app_link_s {
|
typedef struct {
|
||||||
nxt_req_id_t req_id;
|
uint32_t stream;
|
||||||
|
nxt_conn_t *conn;
|
||||||
nxt_port_t *app_port;
|
nxt_port_t *app_port;
|
||||||
|
nxt_req_app_link_t *ra;
|
||||||
|
|
||||||
|
nxt_queue_link_t link; /* for nxt_conn_t.requests */
|
||||||
|
} nxt_req_conn_link_t;
|
||||||
|
|
||||||
|
|
||||||
|
struct nxt_req_app_link_s {
|
||||||
|
uint32_t stream;
|
||||||
|
nxt_port_t *app_port;
|
||||||
|
nxt_pid_t app_pid;
|
||||||
nxt_port_t *reply_port;
|
nxt_port_t *reply_port;
|
||||||
nxt_app_parse_ctx_t *ap;
|
nxt_app_parse_ctx_t *ap;
|
||||||
nxt_req_conn_link_t *rc;
|
nxt_req_conn_link_t *rc;
|
||||||
@@ -51,6 +65,18 @@ typedef struct {
|
|||||||
} nxt_socket_rpc_t;
|
} nxt_socket_rpc_t;
|
||||||
|
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
nxt_mp_t *mem_pool;
|
||||||
|
nxt_port_recv_msg_t msg;
|
||||||
|
nxt_work_t work;
|
||||||
|
} nxt_remove_pid_msg_t;
|
||||||
|
|
||||||
|
|
||||||
|
static void nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj,
|
||||||
|
void *data);
|
||||||
|
static void nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj,
|
||||||
|
void *data);
|
||||||
|
|
||||||
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
|
static nxt_router_temp_conf_t *nxt_router_temp_conf(nxt_task_t *task);
|
||||||
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
|
static void nxt_router_conf_apply(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_router_conf_ready(nxt_task_t *task,
|
static void nxt_router_conf_ready(nxt_task_t *task,
|
||||||
@@ -105,8 +131,6 @@ static void nxt_router_engines_post(nxt_router_t *router,
|
|||||||
nxt_router_temp_conf_t *tmcf);
|
nxt_router_temp_conf_t *tmcf);
|
||||||
static void nxt_router_engine_post(nxt_event_engine_t *engine,
|
static void nxt_router_engine_post(nxt_event_engine_t *engine,
|
||||||
nxt_work_t *jobs);
|
nxt_work_t *jobs);
|
||||||
static void nxt_router_app_data_handler(nxt_task_t *task,
|
|
||||||
nxt_port_recv_msg_t *msg);
|
|
||||||
|
|
||||||
static void nxt_router_thread_start(void *data);
|
static void nxt_router_thread_start(void *data);
|
||||||
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
|
static void nxt_router_listen_socket_create(nxt_task_t *task, void *obj,
|
||||||
@@ -129,7 +153,7 @@ static void nxt_router_conf_release(nxt_task_t *task,
|
|||||||
static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
|
static void nxt_router_send_sw_request(nxt_task_t *task, void *obj,
|
||||||
void *data);
|
void *data);
|
||||||
static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
|
static nxt_bool_t nxt_router_app_free(nxt_task_t *task, nxt_app_t *app);
|
||||||
static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id);
|
static nxt_port_t * nxt_router_app_get_port(nxt_app_t *app, uint32_t stream);
|
||||||
static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
|
static void nxt_router_app_release_port(nxt_task_t *task, void *obj,
|
||||||
void *data);
|
void *data);
|
||||||
|
|
||||||
@@ -153,6 +177,7 @@ static void nxt_router_conn_close(nxt_task_t *task, void *obj, void *data);
|
|||||||
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
|
static void nxt_router_conn_free(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
|
static void nxt_router_conn_error(nxt_task_t *task, void *obj, void *data);
|
||||||
static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
|
static void nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data);
|
||||||
|
static void nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data);
|
||||||
static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
|
static nxt_msec_t nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data);
|
||||||
|
|
||||||
static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
|
static void nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
|
||||||
@@ -213,8 +238,8 @@ nxt_router_sw_create(nxt_task_t *task, nxt_app_t *app, nxt_req_app_link_t *ra)
|
|||||||
sw->app = app;
|
sw->app = app;
|
||||||
sw->ra = ra;
|
sw->ra = ra;
|
||||||
|
|
||||||
nxt_debug(task, "sw %p create, request #%uxD, app '%V' %p", sw,
|
nxt_debug(task, "sw %p create, stream #%uD, app '%V' %p", sw,
|
||||||
ra->req_id, &app->name, app);
|
ra->stream, &app->name, app);
|
||||||
|
|
||||||
rt = task->thread->runtime;
|
rt = task->thread->runtime;
|
||||||
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
|
main_port = rt->port_by_type[NXT_PROCESS_MAIN];
|
||||||
@@ -248,13 +273,29 @@ nxt_router_sw_release(nxt_task_t *task, nxt_start_worker_t *sw)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
nxt_inline void
|
||||||
|
nxt_router_rc_unlink(nxt_req_conn_link_t *rc)
|
||||||
|
{
|
||||||
|
nxt_queue_remove(&rc->link);
|
||||||
|
|
||||||
|
if (rc->ra != NULL) {
|
||||||
|
rc->ra->rc = NULL;
|
||||||
|
rc->ra = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
rc->conn = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static nxt_req_app_link_t *
|
static nxt_req_app_link_t *
|
||||||
nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
|
nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
|
||||||
{
|
{
|
||||||
nxt_mp_t *mp;
|
nxt_mp_t *mp;
|
||||||
|
nxt_event_engine_t *engine;
|
||||||
nxt_req_app_link_t *ra;
|
nxt_req_app_link_t *ra;
|
||||||
|
|
||||||
mp = rc->conn->mem_pool;
|
mp = rc->conn->mem_pool;
|
||||||
|
engine = task->thread->engine;
|
||||||
|
|
||||||
ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
|
ra = nxt_mp_retain(mp, sizeof(nxt_req_app_link_t));
|
||||||
|
|
||||||
@@ -262,20 +303,22 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_debug(task, "ra #%uxD create", rc->req_id);
|
nxt_debug(task, "ra stream #%uD create", rc->stream);
|
||||||
|
|
||||||
nxt_memzero(ra, sizeof(nxt_req_app_link_t));
|
nxt_memzero(ra, sizeof(nxt_req_app_link_t));
|
||||||
|
|
||||||
ra->req_id = rc->req_id;
|
ra->stream = rc->stream;
|
||||||
ra->app_port = NULL;
|
ra->app_pid = -1;
|
||||||
ra->rc = rc;
|
ra->rc = rc;
|
||||||
|
rc->ra = ra;
|
||||||
|
ra->reply_port = engine->port;
|
||||||
|
|
||||||
ra->mem_pool = mp;
|
ra->mem_pool = mp;
|
||||||
|
|
||||||
ra->work.handler = NULL;
|
ra->work.handler = NULL;
|
||||||
ra->work.task = &task->thread->engine->task;
|
ra->work.task = &engine->task;
|
||||||
ra->work.obj = ra;
|
ra->work.obj = ra;
|
||||||
ra->work.data = task->thread->engine;
|
ra->work.data = engine;
|
||||||
|
|
||||||
return ra;
|
return ra;
|
||||||
}
|
}
|
||||||
@@ -284,6 +327,62 @@ nxt_router_ra_create(nxt_task_t *task, nxt_req_conn_link_t *rc)
|
|||||||
static void
|
static void
|
||||||
nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
|
nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
|
||||||
{
|
{
|
||||||
|
nxt_port_t *app_port;
|
||||||
|
nxt_req_app_link_t *ra;
|
||||||
|
nxt_event_engine_t *engine;
|
||||||
|
|
||||||
|
ra = obj;
|
||||||
|
engine = data;
|
||||||
|
|
||||||
|
if (ra->app_port != NULL) {
|
||||||
|
|
||||||
|
app_port = ra->app_port;
|
||||||
|
ra->app_port = NULL;
|
||||||
|
|
||||||
|
if (task->thread->engine != engine) {
|
||||||
|
ra->app_pid = app_port->pid;
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_router_app_release_port(task, app_port, app_port->app);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
/* Uncomment to hold app port until complete response received. */
|
||||||
|
if (ra->rc != NULL) {
|
||||||
|
ra->rc->app_port = ra->app_port;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
if (task->thread->engine != engine) {
|
||||||
|
ra->work.handler = nxt_router_ra_release;
|
||||||
|
ra->work.task = &engine->task;
|
||||||
|
ra->work.next = NULL;
|
||||||
|
|
||||||
|
nxt_debug(task, "ra stream #%uD post release to %p",
|
||||||
|
ra->stream, engine);
|
||||||
|
|
||||||
|
nxt_event_engine_post(engine, &ra->work);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ra->rc != NULL && ra->app_pid != -1) {
|
||||||
|
nxt_port_rpc_ex_set_peer(task, engine->port, ra->rc, ra->app_pid);
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_debug(task, "ra stream #%uD release", ra->stream);
|
||||||
|
|
||||||
|
nxt_mp_release(ra->mem_pool, ra);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_router_ra_abort(nxt_task_t *task, void *obj, void *data)
|
||||||
|
{
|
||||||
|
nxt_conn_t *c;
|
||||||
nxt_req_app_link_t *ra;
|
nxt_req_app_link_t *ra;
|
||||||
nxt_event_engine_t *engine;
|
nxt_event_engine_t *engine;
|
||||||
|
|
||||||
@@ -291,32 +390,24 @@ nxt_router_ra_release(nxt_task_t *task, void *obj, void *data)
|
|||||||
engine = data;
|
engine = data;
|
||||||
|
|
||||||
if (task->thread->engine != engine) {
|
if (task->thread->engine != engine) {
|
||||||
ra->work.handler = nxt_router_ra_release;
|
ra->work.handler = nxt_router_ra_abort;
|
||||||
ra->work.task = &engine->task;
|
ra->work.task = &engine->task;
|
||||||
ra->work.next = NULL;
|
ra->work.next = NULL;
|
||||||
|
|
||||||
nxt_debug(task, "ra #%uxD post release to %p", ra->req_id, engine);
|
nxt_debug(task, "ra stream #%uD post abort to %p", ra->stream, engine);
|
||||||
|
|
||||||
nxt_event_engine_post(engine, &ra->work);
|
nxt_event_engine_post(engine, &ra->work);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_debug(task, "ra #%uxD release", ra->req_id);
|
nxt_debug(task, "ra stream #%uD abort", ra->stream);
|
||||||
|
|
||||||
if (ra->app_port != NULL) {
|
if (ra->rc != NULL) {
|
||||||
|
c = ra->rc->conn;
|
||||||
|
|
||||||
nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
|
nxt_router_gen_error(task, c, 500,
|
||||||
|
"Failed to start application worker");
|
||||||
#if 0
|
|
||||||
/* Uncomment to hold app port until complete response received. */
|
|
||||||
if (ra->rc->conn != NULL) {
|
|
||||||
ra->rc->app_port = ra->app_port;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
nxt_router_app_release_port(task, ra->app_port, ra->app_port->app);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_mp_release(ra->mem_pool, ra);
|
nxt_mp_release(ra->mem_pool, ra);
|
||||||
@@ -384,18 +475,83 @@ nxt_router_conf_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
void
|
void
|
||||||
nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
nxt_router_remove_pid_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
||||||
{
|
{
|
||||||
|
nxt_mp_t *mp;
|
||||||
|
nxt_buf_t *buf;
|
||||||
|
nxt_event_engine_t *engine;
|
||||||
|
nxt_remove_pid_msg_t *rp;
|
||||||
|
|
||||||
nxt_port_remove_pid_handler(task, msg);
|
nxt_port_remove_pid_handler(task, msg);
|
||||||
|
|
||||||
if (msg->port_msg.stream == 0) {
|
if (msg->port_msg.stream == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mp = nxt_mp_create(1024, 128, 256, 32);
|
||||||
|
|
||||||
|
buf = nxt_buf_mem_alloc(mp, nxt_buf_used_size(msg->buf), 0);
|
||||||
|
buf->mem.free = nxt_cpymem(buf->mem.free, msg->buf->mem.pos,
|
||||||
|
nxt_buf_used_size(msg->buf));
|
||||||
|
|
||||||
|
nxt_queue_each(engine, &nxt_router->engines, nxt_event_engine_t, link0)
|
||||||
|
{
|
||||||
|
rp = nxt_mp_retain(mp, sizeof(nxt_remove_pid_msg_t));
|
||||||
|
|
||||||
|
rp->mem_pool = mp;
|
||||||
|
|
||||||
|
rp->msg.fd = msg->fd;
|
||||||
|
rp->msg.buf = buf;
|
||||||
|
rp->msg.port = engine->port;
|
||||||
|
rp->msg.port_msg = msg->port_msg;
|
||||||
|
rp->msg.size = msg->size;
|
||||||
|
rp->msg.new_port = NULL;
|
||||||
|
|
||||||
|
rp->work.handler = nxt_router_worker_remove_pid_handler;
|
||||||
|
rp->work.task = &engine->task;
|
||||||
|
rp->work.obj = rp;
|
||||||
|
rp->work.data = task->thread->engine;
|
||||||
|
rp->work.next = NULL;
|
||||||
|
|
||||||
|
nxt_event_engine_post(engine, &rp->work);
|
||||||
|
}
|
||||||
|
nxt_queue_loop;
|
||||||
|
|
||||||
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
|
msg->port_msg.type = _NXT_PORT_MSG_RPC_ERROR;
|
||||||
|
|
||||||
nxt_port_rpc_handler(task, msg);
|
nxt_port_rpc_handler(task, msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_router_worker_remove_pid_handler(nxt_task_t *task, void *obj, void *data)
|
||||||
|
{
|
||||||
|
nxt_event_engine_t *engine;
|
||||||
|
nxt_remove_pid_msg_t *rp;
|
||||||
|
|
||||||
|
rp = obj;
|
||||||
|
|
||||||
|
nxt_port_remove_pid_handler(task, &rp->msg);
|
||||||
|
|
||||||
|
engine = rp->work.data;
|
||||||
|
|
||||||
|
rp->work.handler = nxt_router_worker_remove_pid_done;
|
||||||
|
rp->work.task = &engine->task;
|
||||||
|
rp->work.next = NULL;
|
||||||
|
|
||||||
|
nxt_event_engine_post(engine, &rp->work);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_router_worker_remove_pid_done(nxt_task_t *task, void *obj, void *data)
|
||||||
|
{
|
||||||
|
nxt_remove_pid_msg_t *rp;
|
||||||
|
|
||||||
|
rp = obj;
|
||||||
|
|
||||||
|
nxt_mp_release(rp->mem_pool, rp);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static nxt_router_temp_conf_t *
|
static nxt_router_temp_conf_t *
|
||||||
nxt_router_temp_conf(nxt_task_t *task)
|
nxt_router_temp_conf(nxt_task_t *task)
|
||||||
{
|
{
|
||||||
@@ -607,6 +763,27 @@ static nxt_conf_map_t nxt_router_app_conf[] = {
|
|||||||
NXT_CONF_MAP_INT32,
|
NXT_CONF_MAP_INT32,
|
||||||
offsetof(nxt_router_app_conf_t, workers),
|
offsetof(nxt_router_app_conf_t, workers),
|
||||||
},
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
nxt_string("limits"),
|
||||||
|
NXT_CONF_MAP_PTR,
|
||||||
|
offsetof(nxt_router_app_conf_t, limits_value),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static nxt_conf_map_t nxt_router_app_limits_conf[] = {
|
||||||
|
{
|
||||||
|
nxt_string("timeout"),
|
||||||
|
NXT_CONF_MAP_MSEC,
|
||||||
|
offsetof(nxt_router_app_conf_t, timeout),
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
nxt_string("requests"),
|
||||||
|
NXT_CONF_MAP_INT32,
|
||||||
|
offsetof(nxt_router_app_conf_t, requests),
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@@ -754,6 +931,9 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
|||||||
}
|
}
|
||||||
|
|
||||||
apcf.workers = 1;
|
apcf.workers = 1;
|
||||||
|
apcf.timeout = 0;
|
||||||
|
apcf.requests = 0;
|
||||||
|
apcf.limits_value = NULL;
|
||||||
|
|
||||||
ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
|
ret = nxt_conf_map_object(mp, application, nxt_router_app_conf,
|
||||||
nxt_nitems(nxt_router_app_conf), &apcf);
|
nxt_nitems(nxt_router_app_conf), &apcf);
|
||||||
@@ -762,8 +942,27 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
|||||||
goto app_fail;
|
goto app_fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (apcf.limits_value != NULL) {
|
||||||
|
|
||||||
|
if (nxt_conf_type(apcf.limits_value) != NXT_CONF_OBJECT) {
|
||||||
|
nxt_log(task, NXT_LOG_CRIT, "application limits is not object");
|
||||||
|
goto app_fail;
|
||||||
|
}
|
||||||
|
|
||||||
|
ret = nxt_conf_map_object(mp, apcf.limits_value,
|
||||||
|
nxt_router_app_limits_conf,
|
||||||
|
nxt_nitems(nxt_router_app_limits_conf),
|
||||||
|
&apcf);
|
||||||
|
if (ret != NXT_OK) {
|
||||||
|
nxt_log(task, NXT_LOG_CRIT, "application limits map error");
|
||||||
|
goto app_fail;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
nxt_debug(task, "application type: %V", &apcf.type);
|
nxt_debug(task, "application type: %V", &apcf.type);
|
||||||
nxt_debug(task, "application workers: %D", apcf.workers);
|
nxt_debug(task, "application workers: %D", apcf.workers);
|
||||||
|
nxt_debug(task, "application timeout: %D", apcf.timeout);
|
||||||
|
nxt_debug(task, "application requests: %D", apcf.requests);
|
||||||
|
|
||||||
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
|
lang = nxt_app_lang_module(task->thread->runtime, &apcf.type);
|
||||||
|
|
||||||
@@ -802,6 +1001,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
|
|||||||
|
|
||||||
app->type = type;
|
app->type = type;
|
||||||
app->max_workers = apcf.workers;
|
app->max_workers = apcf.workers;
|
||||||
|
app->timeout = apcf.timeout;
|
||||||
app->live = 1;
|
app->live = 1;
|
||||||
app->prepare_msg = nxt_app_prepare_msg[type];
|
app->prepare_msg = nxt_app_prepare_msg[type];
|
||||||
|
|
||||||
@@ -1589,7 +1789,7 @@ static nxt_port_handler_t nxt_router_app_port_handlers[] = {
|
|||||||
NULL, /* NXT_PORT_MSG_CHANGE_FILE */
|
NULL, /* NXT_PORT_MSG_CHANGE_FILE */
|
||||||
/* TODO: remove mmap_handler from app ports */
|
/* TODO: remove mmap_handler from app ports */
|
||||||
nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */
|
nxt_port_mmap_handler, /* NXT_PORT_MSG_MMAP */
|
||||||
nxt_router_app_data_handler,
|
nxt_port_rpc_handler, /* NXT_PORT_MSG_DATA */
|
||||||
NULL, /* NXT_PORT_MSG_REMOVE_PID */
|
NULL, /* NXT_PORT_MSG_REMOVE_PID */
|
||||||
NULL, /* NXT_PORT_MSG_READY */
|
NULL, /* NXT_PORT_MSG_READY */
|
||||||
NULL, /* NXT_PORT_MSG_START_WORKER */
|
NULL, /* NXT_PORT_MSG_START_WORKER */
|
||||||
@@ -2008,23 +2208,16 @@ static const nxt_conn_state_t nxt_router_conn_write_state
|
|||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||||
|
void *data)
|
||||||
{
|
{
|
||||||
size_t dump_size;
|
size_t dump_size;
|
||||||
nxt_buf_t *b, *last;
|
nxt_buf_t *b, *last;
|
||||||
nxt_conn_t *c;
|
nxt_conn_t *c;
|
||||||
nxt_req_conn_link_t *rc;
|
nxt_req_conn_link_t *rc;
|
||||||
nxt_event_engine_t *engine;
|
|
||||||
|
|
||||||
b = msg->buf;
|
b = msg->buf;
|
||||||
engine = task->thread->engine;
|
rc = data;
|
||||||
|
|
||||||
rc = nxt_event_engine_request_find(engine, msg->port_msg.stream);
|
|
||||||
if (nxt_slow_path(rc == NULL)) {
|
|
||||||
nxt_debug(task, "request id %08uxD not found", msg->port_msg.stream);
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
c = rc->conn;
|
c = rc->conn;
|
||||||
|
|
||||||
@@ -2058,7 +2251,7 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
rc->app_port = NULL;
|
rc->app_port = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
rc->conn = NULL;
|
nxt_router_rc_unlink(rc);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (b == NULL) {
|
if (b == NULL) {
|
||||||
@@ -2084,6 +2277,21 @@ nxt_router_app_data_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_router_response_error_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
|
||||||
|
void *data)
|
||||||
|
{
|
||||||
|
nxt_req_conn_link_t *rc;
|
||||||
|
|
||||||
|
rc = data;
|
||||||
|
|
||||||
|
nxt_router_gen_error(task, rc->conn, 500,
|
||||||
|
"Application terminated unexpectedly");
|
||||||
|
|
||||||
|
nxt_router_rc_unlink(rc);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
nxt_inline const char *
|
nxt_inline const char *
|
||||||
nxt_router_text_by_code(int code)
|
nxt_router_text_by_code(int code)
|
||||||
{
|
{
|
||||||
@@ -2147,20 +2355,21 @@ nxt_router_gen_error(nxt_task_t *task, nxt_conn_t *c, int code,
|
|||||||
const char* fmt, ...)
|
const char* fmt, ...)
|
||||||
{
|
{
|
||||||
va_list args;
|
va_list args;
|
||||||
|
nxt_mp_t *mp;
|
||||||
nxt_buf_t *b;
|
nxt_buf_t *b;
|
||||||
|
|
||||||
|
/* TODO: fix when called from main thread */
|
||||||
|
/* TODO: fix when called in the middle of response */
|
||||||
|
|
||||||
|
mp = nxt_mp_create(1024, 128, 256, 32);
|
||||||
|
|
||||||
va_start(args, fmt);
|
va_start(args, fmt);
|
||||||
b = nxt_router_get_error_buf(task, c->mem_pool, code, fmt, args);
|
b = nxt_router_get_error_buf(task, mp, code, fmt, args);
|
||||||
va_end(args);
|
va_end(args);
|
||||||
|
|
||||||
if (c->socket.data != NULL) {
|
|
||||||
nxt_mp_free(c->mem_pool, c->socket.data);
|
|
||||||
c->socket.data = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (c->socket.fd == -1) {
|
if (c->socket.fd == -1) {
|
||||||
nxt_mp_release(c->mem_pool, b->next);
|
nxt_mp_release(mp, b->next);
|
||||||
nxt_mp_release(c->mem_pool, b);
|
nxt_mp_release(mp, b);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2204,17 +2413,35 @@ nxt_router_sw_ready(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
|
|||||||
static void
|
static void
|
||||||
nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
|
nxt_router_sw_error(nxt_task_t *task, nxt_port_recv_msg_t *msg, void *data)
|
||||||
{
|
{
|
||||||
|
nxt_app_t *app;
|
||||||
|
nxt_queue_link_t *lnk;
|
||||||
|
nxt_req_app_link_t *ra;
|
||||||
nxt_start_worker_t *sw;
|
nxt_start_worker_t *sw;
|
||||||
|
|
||||||
sw = data;
|
sw = data;
|
||||||
|
|
||||||
nxt_assert(sw != NULL);
|
nxt_assert(sw != NULL);
|
||||||
|
nxt_assert(sw->app != NULL);
|
||||||
nxt_assert(sw->app->pending_workers != 0);
|
nxt_assert(sw->app->pending_workers != 0);
|
||||||
|
|
||||||
|
app = sw->app;
|
||||||
|
|
||||||
sw->app->pending_workers--;
|
sw->app->pending_workers--;
|
||||||
|
|
||||||
nxt_debug(task, "sw %p error, failed to start app '%V'",
|
nxt_debug(task, "sw %p error, failed to start app '%V'",
|
||||||
sw, &sw->app->name);
|
sw, &app->name);
|
||||||
|
|
||||||
|
if (!nxt_queue_is_empty(&app->requests)) {
|
||||||
|
lnk = nxt_queue_last(&app->requests);
|
||||||
|
nxt_queue_remove(lnk);
|
||||||
|
|
||||||
|
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
|
||||||
|
|
||||||
|
nxt_debug(task, "app '%V' %p abort next stream #%uD",
|
||||||
|
&app->name, app, ra->stream);
|
||||||
|
|
||||||
|
nxt_router_ra_abort(task, ra, ra->work.data);
|
||||||
|
}
|
||||||
|
|
||||||
nxt_router_sw_release(task, sw);
|
nxt_router_sw_release(task, sw);
|
||||||
}
|
}
|
||||||
@@ -2237,11 +2464,11 @@ nxt_router_send_sw_request(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
if (nxt_queue_is_empty(&app->requests)) {
|
if (nxt_queue_is_empty(&app->requests)) {
|
||||||
ra = sw->ra;
|
ra = sw->ra;
|
||||||
app_port = nxt_router_app_get_port(app, ra->req_id);
|
app_port = nxt_router_app_get_port(app, ra->stream);
|
||||||
|
|
||||||
if (app_port != NULL) {
|
if (app_port != NULL) {
|
||||||
nxt_debug(task, "app '%V' %p process request #%uxD",
|
nxt_debug(task, "app '%V' %p process stream #%uD",
|
||||||
&app->name, app, ra->req_id);
|
&app->name, app, ra->stream);
|
||||||
|
|
||||||
ra->app_port = app_port;
|
ra->app_port = app_port;
|
||||||
|
|
||||||
@@ -2330,7 +2557,7 @@ nxt_router_app_free(nxt_task_t *task, nxt_app_t *app)
|
|||||||
|
|
||||||
|
|
||||||
static nxt_port_t *
|
static nxt_port_t *
|
||||||
nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id)
|
nxt_router_app_get_port(nxt_app_t *app, uint32_t stream)
|
||||||
{
|
{
|
||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_queue_link_t *lnk;
|
nxt_queue_link_t *lnk;
|
||||||
@@ -2347,7 +2574,7 @@ nxt_router_app_get_port(nxt_app_t *app, uint32_t req_id)
|
|||||||
|
|
||||||
port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
|
port = nxt_queue_link_data(lnk, nxt_port_t, app_link);
|
||||||
|
|
||||||
port->app_req_id = req_id;
|
port->app_stream = stream;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_thread_mutex_unlock(&app->mutex);
|
nxt_thread_mutex_unlock(&app->mutex);
|
||||||
@@ -2395,11 +2622,11 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
|
ra = nxt_queue_link_data(lnk, nxt_req_app_link_t, link);
|
||||||
|
|
||||||
nxt_debug(task, "app '%V' %p process next request #%uxD",
|
nxt_debug(task, "app '%V' %p process next stream #%uD",
|
||||||
&app->name, app, ra->req_id);
|
&app->name, app, ra->stream);
|
||||||
|
|
||||||
ra->app_port = port;
|
ra->app_port = port;
|
||||||
port->app_req_id = ra->req_id;
|
port->app_stream = ra->stream;
|
||||||
|
|
||||||
nxt_router_process_http_request_mp(task, ra, port);
|
nxt_router_process_http_request_mp(task, ra, port);
|
||||||
|
|
||||||
@@ -2408,7 +2635,7 @@ nxt_router_app_release_port(nxt_task_t *task, void *obj, void *data)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
port->app_req_id = 0;
|
port->app_stream = 0;
|
||||||
|
|
||||||
if (port->pair[1] == -1) {
|
if (port->pair[1] == -1) {
|
||||||
nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
|
nxt_debug(task, "app '%V' %p port already closed (pid %PI dead?)",
|
||||||
@@ -2452,7 +2679,7 @@ nxt_router_app_remove_port(nxt_port_t *port)
|
|||||||
nxt_bool_t busy;
|
nxt_bool_t busy;
|
||||||
|
|
||||||
app = port->app;
|
app = port->app;
|
||||||
busy = port->app_req_id != 0;
|
busy = port->app_stream != 0;
|
||||||
|
|
||||||
if (app == NULL) {
|
if (app == NULL) {
|
||||||
nxt_thread_log_debug("port %p app remove, no app", port);
|
nxt_thread_log_debug("port %p app remove, no app", port);
|
||||||
@@ -2483,8 +2710,9 @@ nxt_router_app_remove_port(nxt_port_t *port)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, req #%uxD",
|
nxt_thread_log_debug("port %p app remove, busy, app '%V' %p, "
|
||||||
port, &app->name, app, port->app_req_id);
|
"app stream #%uD", port, &app->name, app,
|
||||||
|
port->app_stream);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -2496,6 +2724,7 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
|
|||||||
nxt_app_t *app;
|
nxt_app_t *app;
|
||||||
nxt_conn_t *c;
|
nxt_conn_t *c;
|
||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
|
nxt_event_engine_t *engine;
|
||||||
nxt_start_worker_t *sw;
|
nxt_start_worker_t *sw;
|
||||||
nxt_socket_conf_joint_t *joint;
|
nxt_socket_conf_joint_t *joint;
|
||||||
|
|
||||||
@@ -2511,8 +2740,16 @@ nxt_router_app_port(nxt_task_t *task, nxt_req_app_link_t *ra)
|
|||||||
return NXT_ERROR;
|
return NXT_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
engine = task->thread->engine;
|
||||||
|
|
||||||
port = nxt_router_app_get_port(app, ra->req_id);
|
nxt_timer_disable(engine, &c->read_timer);
|
||||||
|
|
||||||
|
if (app->timeout != 0) {
|
||||||
|
c->read_timer.handler = nxt_router_app_timeout;
|
||||||
|
nxt_timer_add(engine, &c->read_timer, app->timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
port = nxt_router_app_get_port(app, ra->stream);
|
||||||
|
|
||||||
if (port != NULL) {
|
if (port != NULL) {
|
||||||
nxt_debug(task, "already have port for app '%V'", &app->name);
|
nxt_debug(task, "already have port for app '%V'", &app->name);
|
||||||
@@ -2740,18 +2977,16 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
|
|||||||
nxt_mp_t *port_mp;
|
nxt_mp_t *port_mp;
|
||||||
nxt_int_t res;
|
nxt_int_t res;
|
||||||
nxt_port_t *port;
|
nxt_port_t *port;
|
||||||
nxt_req_id_t req_id;
|
|
||||||
nxt_event_engine_t *engine;
|
nxt_event_engine_t *engine;
|
||||||
nxt_req_app_link_t *ra;
|
nxt_req_app_link_t *ra;
|
||||||
nxt_req_conn_link_t *rc;
|
nxt_req_conn_link_t *rc;
|
||||||
|
|
||||||
engine = task->thread->engine;
|
engine = task->thread->engine;
|
||||||
|
|
||||||
do {
|
rc = nxt_port_rpc_register_handler_ex(task, engine->port,
|
||||||
req_id = nxt_random(&task->thread->random);
|
nxt_router_response_ready_handler,
|
||||||
} while (nxt_event_engine_request_find(engine, req_id) != NULL);
|
nxt_router_response_error_handler,
|
||||||
|
sizeof(nxt_req_conn_link_t));
|
||||||
rc = nxt_conn_request_add(c, req_id);
|
|
||||||
|
|
||||||
if (nxt_slow_path(rc == NULL)) {
|
if (nxt_slow_path(rc == NULL)) {
|
||||||
nxt_router_gen_error(task, c, 500, "Failed to allocate "
|
nxt_router_gen_error(task, c, 500, "Failed to allocate "
|
||||||
@@ -2760,17 +2995,19 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
nxt_event_engine_request_add(engine, rc);
|
rc->stream = nxt_port_rpc_ex_stream(rc);
|
||||||
|
rc->conn = c;
|
||||||
|
|
||||||
nxt_debug(task, "req_id %uxD linked to conn %p at engine %p",
|
nxt_queue_insert_tail(&c->requests, &rc->link);
|
||||||
req_id, c, engine);
|
|
||||||
|
nxt_debug(task, "stream #%uD linked to conn %p at engine %p",
|
||||||
|
rc->stream, c, engine);
|
||||||
|
|
||||||
c->socket.data = NULL;
|
c->socket.data = NULL;
|
||||||
|
|
||||||
ra = nxt_router_ra_create(task, rc);
|
ra = nxt_router_ra_create(task, rc);
|
||||||
|
|
||||||
ra->ap = ap;
|
ra->ap = ap;
|
||||||
ra->reply_port = engine->port;
|
|
||||||
|
|
||||||
res = nxt_router_app_port(task, ra);
|
res = nxt_router_app_port(task, ra);
|
||||||
|
|
||||||
@@ -2781,10 +3018,12 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
|
|||||||
port = ra->app_port;
|
port = ra->app_port;
|
||||||
|
|
||||||
if (nxt_slow_path(port == NULL)) {
|
if (nxt_slow_path(port == NULL)) {
|
||||||
nxt_router_gen_error(task, rc->conn, 500, "Application port not found");
|
nxt_router_gen_error(task, c, 500, "Application port not found");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
nxt_port_rpc_ex_set_peer(task, engine->port, rc, port->pid);
|
||||||
|
|
||||||
port_mp = port->mem_pool;
|
port_mp = port->mem_pool;
|
||||||
port->mem_pool = c->mem_pool;
|
port->mem_pool = c->mem_pool;
|
||||||
|
|
||||||
@@ -2792,7 +3031,6 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_conn_t *c,
|
|||||||
|
|
||||||
port->mem_pool = port_mp;
|
port->mem_pool = port_mp;
|
||||||
|
|
||||||
|
|
||||||
nxt_router_ra_release(task, ra, ra->work.data);
|
nxt_router_ra_release(task, ra, ra->work.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2807,6 +3045,10 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
|
|||||||
nxt_app_wmsg_t wmsg;
|
nxt_app_wmsg_t wmsg;
|
||||||
nxt_app_parse_ctx_t *ap;
|
nxt_app_parse_ctx_t *ap;
|
||||||
|
|
||||||
|
/* TODO: it is unsafe to use ra->rc and ra->rc->conn in main thread */
|
||||||
|
|
||||||
|
nxt_assert(ra->rc != NULL);
|
||||||
|
|
||||||
reply_port = ra->reply_port;
|
reply_port = ra->reply_port;
|
||||||
ap = ra->ap;
|
ap = ra->ap;
|
||||||
c = ra->rc->conn;
|
c = ra->rc->conn;
|
||||||
@@ -2828,7 +3070,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
|
|||||||
wmsg.port = port;
|
wmsg.port = port;
|
||||||
wmsg.write = NULL;
|
wmsg.write = NULL;
|
||||||
wmsg.buf = &wmsg.write;
|
wmsg.buf = &wmsg.write;
|
||||||
wmsg.stream = ra->req_id;
|
wmsg.stream = ra->stream;
|
||||||
|
|
||||||
res = port->app->prepare_msg(task, &ap->r, &wmsg);
|
res = port->app->prepare_msg(task, &ap->r, &wmsg);
|
||||||
|
|
||||||
@@ -2843,7 +3085,7 @@ nxt_router_process_http_request_mp(nxt_task_t *task, nxt_req_app_link_t *ra,
|
|||||||
wmsg.port->socket.fd);
|
wmsg.port->socket.fd);
|
||||||
|
|
||||||
res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
|
res = nxt_port_socket_write(task, wmsg.port, NXT_PORT_MSG_DATA,
|
||||||
-1, ra->req_id, reply_port->id, wmsg.write);
|
-1, ra->stream, reply_port->id, wmsg.write);
|
||||||
|
|
||||||
if (nxt_slow_path(res != NXT_OK)) {
|
if (nxt_slow_path(res != NXT_OK)) {
|
||||||
nxt_router_gen_error(task, c, 500,
|
nxt_router_gen_error(task, c, 500,
|
||||||
@@ -3217,7 +3459,7 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
|
|||||||
|
|
||||||
nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
|
nxt_queue_each(rc, &c->requests, nxt_req_conn_link_t, link) {
|
||||||
|
|
||||||
nxt_debug(task, "conn %p close, req %uxD", c, rc->req_id);
|
nxt_debug(task, "conn %p close, stream #%uD", c, rc->stream);
|
||||||
|
|
||||||
if (rc->app_port != NULL) {
|
if (rc->app_port != NULL) {
|
||||||
nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
|
nxt_router_app_release_port(task, rc->app_port, rc->app_port->app);
|
||||||
@@ -3225,9 +3467,9 @@ nxt_router_conn_free(nxt_task_t *task, void *obj, void *data)
|
|||||||
rc->app_port = NULL;
|
rc->app_port = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
rc->conn = NULL;
|
nxt_router_rc_unlink(rc);
|
||||||
|
|
||||||
nxt_event_engine_request_remove(task->thread->engine, rc);
|
nxt_port_rpc_cancel(task, task->thread->engine->port, rc->stream);
|
||||||
|
|
||||||
} nxt_queue_loop;
|
} nxt_queue_loop;
|
||||||
|
|
||||||
@@ -3281,6 +3523,22 @@ nxt_router_conn_timeout(nxt_task_t *task, void *obj, void *data)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_router_app_timeout(nxt_task_t *task, void *obj, void *data)
|
||||||
|
{
|
||||||
|
nxt_conn_t *c;
|
||||||
|
nxt_timer_t *timer;
|
||||||
|
|
||||||
|
timer = obj;
|
||||||
|
|
||||||
|
nxt_debug(task, "router app timeout");
|
||||||
|
|
||||||
|
c = nxt_read_timer_conn(timer);
|
||||||
|
|
||||||
|
nxt_router_gen_error(task, c, 408, "Application timeout");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static nxt_msec_t
|
static nxt_msec_t
|
||||||
nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
|
nxt_router_conn_timeout_value(nxt_conn_t *c, uintptr_t data)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -88,6 +88,8 @@ struct nxt_app_s {
|
|||||||
uint32_t workers;
|
uint32_t workers;
|
||||||
uint32_t max_workers;
|
uint32_t max_workers;
|
||||||
|
|
||||||
|
nxt_msec_t timeout;
|
||||||
|
|
||||||
nxt_app_type_t type:8;
|
nxt_app_type_t type:8;
|
||||||
uint8_t live; /* 1 bit */
|
uint8_t live; /* 1 bit */
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user