Introducing websocket support in router and libunit.

This commit is contained in:
Max Romanov
2019-08-20 16:31:53 +03:00
parent 9bbf54e23e
commit e501c74ddc
29 changed files with 3545 additions and 451 deletions

View File

@@ -14,7 +14,7 @@
#include <nxt_port_memory_int.h>
#include <nxt_unit_request.h>
#include <nxt_unit_response.h>
#include <nxt_router_request.h>
typedef struct {
nxt_str_t type;
@@ -48,64 +48,6 @@ typedef struct {
#endif
typedef struct nxt_msg_info_s {
nxt_buf_t *buf;
nxt_port_mmap_tracking_t tracking;
nxt_work_handler_t completion_handler;
} nxt_msg_info_t;
typedef struct nxt_request_app_link_s nxt_request_app_link_t;
typedef enum {
NXT_APR_NEW_PORT,
NXT_APR_REQUEST_FAILED,
NXT_APR_GOT_RESPONSE,
NXT_APR_CLOSE,
} nxt_apr_action_t;
typedef struct {
uint32_t stream;
nxt_app_t *app;
nxt_port_t *app_port;
nxt_apr_action_t apr_action;
nxt_http_request_t *request;
nxt_msg_info_t msg_info;
nxt_request_app_link_t *req_app_link;
} nxt_request_rpc_data_t;
struct nxt_request_app_link_s {
uint32_t stream;
nxt_atomic_t use_count;
nxt_port_t *app_port;
nxt_apr_action_t apr_action;
nxt_port_t *reply_port;
nxt_http_request_t *request;
nxt_msg_info_t msg_info;
nxt_request_rpc_data_t *req_rpc_data;
nxt_nsec_t res_time;
nxt_queue_link_t link_app_requests; /* for nxt_app_t.requests */
/* for nxt_port_t.pending_requests */
nxt_queue_link_t link_port_pending;
nxt_queue_link_t link_app_pending; /* for nxt_app_t.pending */
nxt_mp_t *mem_pool;
nxt_work_t work;
int err_code;
const char *err_str;
};
typedef struct {
nxt_socket_conf_t *socket_conf;
nxt_router_temp_conf_t *temp_conf;
@@ -305,6 +247,8 @@ static nxt_int_t nxt_router_http_request_done(nxt_task_t *task,
static void nxt_router_http_request_release(nxt_task_t *task, void *obj,
void *data);
const nxt_http_request_state_t nxt_http_websocket;
static nxt_router_t *nxt_router;
static const nxt_str_t http_prefix = nxt_string("HTTP_");
@@ -663,6 +607,7 @@ nxt_request_app_link_release(nxt_task_t *task,
nxt_request_app_link_t *req_app_link)
{
nxt_mp_t *mp;
nxt_http_request_t *r;
nxt_request_rpc_data_t *req_rpc_data;
nxt_assert(task->thread->engine == req_app_link->work.data);
@@ -683,10 +628,11 @@ nxt_request_app_link_release(nxt_task_t *task,
req_rpc_data->msg_info = req_app_link->msg_info;
if (req_rpc_data->app->timeout != 0) {
req_rpc_data->request->timer.handler = nxt_router_app_timeout;
req_rpc_data->request->timer_data = req_rpc_data;
nxt_timer_add(task->thread->engine,
&req_rpc_data->request->timer,
r = req_rpc_data->request;
r->timer.handler = nxt_router_app_timeout;
r->timer_data = req_rpc_data;
nxt_timer_add(task->thread->engine, &r->timer,
req_rpc_data->app->timeout);
}
@@ -833,14 +779,16 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
if (req_app_link->link_app_requests.next == NULL
&& req_app_link->link_port_pending.next == NULL
&& req_app_link->link_app_pending.next == NULL)
&& req_app_link->link_app_pending.next == NULL
&& req_app_link->link_port_websockets.next == NULL)
{
req_app_link = NULL;
} else {
ra_use_delta -=
nxt_queue_chk_remove(&req_app_link->link_app_requests)
+ nxt_queue_chk_remove(&req_app_link->link_port_pending);
+ nxt_queue_chk_remove(&req_app_link->link_port_pending)
+ nxt_queue_chk_remove(&req_app_link->link_port_websockets);
nxt_queue_chk_remove(&req_app_link->link_app_pending);
}
@@ -863,6 +811,7 @@ nxt_request_rpc_data_unlink(nxt_task_t *task,
nxt_router_http_request_done(task, req_rpc_data->request);
req_rpc_data->request->req_rpc_data = NULL;
req_rpc_data->request = NULL;
}
}
@@ -1412,6 +1361,28 @@ static nxt_conf_map_t nxt_router_http_conf[] = {
};
static nxt_conf_map_t nxt_router_websocket_conf[] = {
{
nxt_string("max_frame_size"),
NXT_CONF_MAP_SIZE,
offsetof(nxt_websocket_conf_t, max_frame_size),
},
{
nxt_string("read_timeout"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_websocket_conf_t, read_timeout),
},
{
nxt_string("keepalive_interval"),
NXT_CONF_MAP_MSEC,
offsetof(nxt_websocket_conf_t, keepalive_interval),
},
};
static nxt_int_t
nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
u_char *start, u_char *end)
@@ -1425,7 +1396,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
nxt_app_t *app, *prev;
nxt_router_t *router;
nxt_app_joint_t *app_joint;
nxt_conf_value_t *conf, *http, *value;
nxt_conf_value_t *conf, *http, *value, *websocket;
nxt_conf_value_t *applications, *application;
nxt_conf_value_t *listeners, *listener;
nxt_conf_value_t *routes_conf;
@@ -1448,6 +1419,7 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
#if (NXT_TLS)
static nxt_str_t certificate_path = nxt_string("/tls/certificate");
#endif
static nxt_str_t websocket_path = nxt_string("/settings/http/websocket");
conf = nxt_conf_json_parse(tmcf->mem_pool, start, end, NULL);
if (conf == NULL) {
@@ -1658,6 +1630,8 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
#endif
websocket = nxt_conf_get_path(conf, &websocket_path);
listeners = nxt_conf_get_path(conf, &listeners_path);
if (listeners != NULL) {
@@ -1697,6 +1671,10 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
skcf->body_read_timeout = 30 * 1000;
skcf->send_timeout = 30 * 1000;
skcf->websocket_conf.max_frame_size = 1024 * 1024;
skcf->websocket_conf.read_timeout = 60 * 1000;
skcf->websocket_conf.keepalive_interval = 30 * 1000;
if (http != NULL) {
ret = nxt_conf_map_object(mp, http, nxt_router_http_conf,
nxt_nitems(nxt_router_http_conf),
@@ -1707,6 +1685,17 @@ nxt_router_conf_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
}
}
if (websocket != NULL) {
ret = nxt_conf_map_object(mp, websocket,
nxt_router_websocket_conf,
nxt_nitems(nxt_router_websocket_conf),
&skcf->websocket_conf);
if (ret != NXT_OK) {
nxt_alert(task, "websocket map error");
goto fail;
}
}
#if (NXT_TLS)
value = nxt_conf_get_path(listener, &certificate_path);
@@ -3418,10 +3407,12 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
{
nxt_int_t ret;
nxt_buf_t *b;
nxt_port_t *app_port;
nxt_unit_field_t *f;
nxt_http_field_t *field;
nxt_http_request_t *r;
nxt_unit_response_t *resp;
nxt_request_app_link_t *req_app_link;
nxt_request_rpc_data_t *req_rpc_data;
b = msg->buf;
@@ -3542,7 +3533,48 @@ nxt_router_response_ready_handler(nxt_task_t *task, nxt_port_recv_msg_t *msg,
nxt_http_request_header_send(task, r);
r->state = &nxt_http_request_send_state;
if (r->websocket_handshake
&& r->status == NXT_HTTP_SWITCHING_PROTOCOLS)
{
req_app_link = nxt_request_app_link_alloc(task,
req_rpc_data->req_app_link,
req_rpc_data);
if (nxt_slow_path(req_app_link == NULL)) {
goto fail;
}
app_port = req_app_link->app_port;
if (app_port == NULL && req_rpc_data->app_port != NULL) {
req_app_link->app_port = req_rpc_data->app_port;
app_port = req_app_link->app_port;
req_app_link->apr_action = req_rpc_data->apr_action;
req_rpc_data->app_port = NULL;
}
if (nxt_slow_path(app_port == NULL)) {
goto fail;
}
nxt_thread_mutex_lock(&req_rpc_data->app->mutex);
nxt_queue_insert_tail(&app_port->active_websockets,
&req_app_link->link_port_websockets);
nxt_thread_mutex_unlock(&req_rpc_data->app->mutex);
nxt_router_app_port_release(task, app_port, NXT_APR_UPGRADE);
req_app_link->apr_action = NXT_APR_CLOSE;
nxt_debug(task, "req_app_link stream #%uD upgrade",
req_app_link->stream);
r->state = &nxt_http_websocket;
} else {
r->state = &nxt_http_request_send_state;
}
if (r->out) {
nxt_work_queue_add(&task->thread->engine->fast_work_queue,
@@ -3924,6 +3956,10 @@ nxt_router_app_port_release(nxt_task_t *task, nxt_port_t *port,
got_response = 1;
inc_use = -1;
break;
case NXT_APR_UPGRADE:
dec_pending = 1;
got_response = 1;
break;
case NXT_APR_CLOSE:
inc_use = -1;
break;
@@ -4046,9 +4082,10 @@ re_ra_cancelled:
adjust_idle_timer = 0;
if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0) {
nxt_assert(port->idle_link.next == NULL);
if (port->pair[1] != -1 && !send_quit && port->app_pending_responses == 0
&& nxt_queue_is_empty(&port->active_websockets)
&& port->idle_link.next == NULL)
{
if (app->idle_processes == app->spare_processes
&& app->adjust_idle_work.data == NULL)
{
@@ -4545,6 +4582,7 @@ nxt_router_process_http_request(nxt_task_t *task, nxt_http_request_t *r,
nxt_router_app_use(task, app, 1);
req_rpc_data->request = r;
r->req_rpc_data = req_rpc_data;
req_app_link = &ra_local;
nxt_request_app_link_init(task, req_app_link, req_rpc_data);
@@ -4635,7 +4673,7 @@ nxt_router_app_prepare_request(nxt_task_t *task,
goto release_port;
}
res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_DATA,
res = nxt_port_socket_twrite(task, port, NXT_PORT_MSG_REQ_HEADERS,
-1, req_app_link->stream, reply_port->id, buf,
&req_app_link->msg_info.tracking);
@@ -4785,6 +4823,7 @@ nxt_router_prepare_msg(nxt_task_t *task, nxt_http_request_t *r,
*p++ = '\0';
req->tls = (r->tls != NULL);
req->websocket_handshake = r->websocket_handshake;
req->server_name_length = r->server_name.length;
nxt_unit_sptr_set(&req->server_name, p);