Node.js: introducing websocket support.

This commit is contained in:
Max Romanov
2019-08-20 16:32:05 +03:00
parent e501c74ddc
commit e291841b33
15 changed files with 2397 additions and 199 deletions

0
src/nodejs/unit-http/http.js Executable file → Normal file
View File

76
src/nodejs/unit-http/http_server.js Executable file → Normal file
View File

@@ -8,16 +8,21 @@
const EventEmitter = require('events'); const EventEmitter = require('events');
const http = require('http'); const http = require('http');
const util = require('util'); const util = require('util');
const unit_lib = require('unit-http/build/Release/unit-http.node'); const unit_lib = require('./build/Release/unit-http');
const unit_socket = require('unit-http/socket'); const Socket = require('./socket');
const WebSocketFrame = require('./websocket_frame');
const { Socket } = unit_socket;
function ServerResponse(req) { function ServerResponse(req) {
EventEmitter.call(this); EventEmitter.call(this);
this.headers = {}; this.headers = {};
this.server = req.server;
this._request = req;
req._response = this;
this.socket = req.socket;
this.connection = req.connection;
} }
util.inherits(ServerResponse, EventEmitter); util.inherits(ServerResponse, EventEmitter);
@@ -207,15 +212,23 @@ ServerResponse.prototype._implicitHeader = function _implicitHeader() {
this.writeHead(this.statusCode); this.writeHead(this.statusCode);
}; };
ServerResponse.prototype._writeBody = function(chunk, encoding, callback) { ServerResponse.prototype._send_headers = unit_lib.response_send_headers;
var contentLength = 0;
ServerResponse.prototype._sendHeaders = function _sendHeaders() {
if (!this.headersSent) { if (!this.headersSent) {
unit_lib.unit_response_headers(this, this.statusCode, this.headers, this._send_headers(this.statusCode, this.headers, this.headers_count,
this.headers_count, this.headers_len); this.headers_len);
this.headersSent = true; this.headersSent = true;
} }
};
ServerResponse.prototype._write = unit_lib.response_write;
ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
var contentLength = 0;
this._sendHeaders();
if (typeof chunk === 'function') { if (typeof chunk === 'function') {
callback = chunk; callback = chunk;
@@ -238,7 +251,7 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
contentLength = chunk.length; contentLength = chunk.length;
} }
unit_lib.unit_response_write(this, chunk, contentLength); this._write(chunk, contentLength);
} }
if (typeof callback === 'function') { if (typeof callback === 'function') {
@@ -268,11 +281,13 @@ ServerResponse.prototype.write = function write(chunk, encoding, callback) {
return true; return true;
}; };
ServerResponse.prototype._end = unit_lib.response_end;
ServerResponse.prototype.end = function end(chunk, encoding, callback) { ServerResponse.prototype.end = function end(chunk, encoding, callback) {
if (!this.finished) { if (!this.finished) {
this._writeBody(chunk, encoding, callback); this._writeBody(chunk, encoding, callback);
unit_lib.unit_response_end(this); this._end();
this.finished = true; this.finished = true;
} }
@@ -280,10 +295,12 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) {
return this; return this;
}; };
function ServerRequest(server) { function ServerRequest(server, socket) {
EventEmitter.call(this); EventEmitter.call(this);
this.server = server; this.server = server;
this.socket = socket;
this.connection = socket;
} }
util.inherits(ServerRequest, EventEmitter); util.inherits(ServerRequest, EventEmitter);
@@ -339,8 +356,8 @@ ServerRequest.prototype.on = function on(ev, fn) {
if (ev === "data") { if (ev === "data") {
process.nextTick(function () { process.nextTick(function () {
if (this.server.buffer.length !== 0) { if (this._data.length !== 0) {
this.emit("data", this.server.buffer); this.emit("data", this._data);
} }
}.bind(this)); }.bind(this));
@@ -357,14 +374,27 @@ function Server(requestListener) {
this.unit.createServer(); this.unit.createServer();
this.socket = Socket; this.Socket = Socket;
this.request = ServerRequest; this.ServerRequest = ServerRequest;
this.response = ServerResponse; this.ServerResponse = ServerResponse;
this.WebSocketFrame = WebSocketFrame;
if (requestListener) { if (requestListener) {
this.on('request', requestListener); this.on('request', requestListener);
} }
this._upgradeListenerCount = 0;
this.on('newListener', function(ev) {
if (ev === 'upgrade'){
this._upgradeListenerCount++;
}
}).on('removeListener', function(ev) {
if (ev === 'upgrade') {
this._upgradeListenerCount--;
}
});
} }
util.inherits(Server, EventEmitter); util.inherits(Server, EventEmitter);
Server.prototype.setTimeout = function setTimeout(msecs, callback) { Server.prototype.setTimeout = function setTimeout(msecs, callback) {
@@ -381,15 +411,13 @@ Server.prototype.listen = function () {
this.unit.listen(); this.unit.listen();
}; };
Server.prototype.emit_events = function (server, req, res) { Server.prototype.emit_request = function (req, res) {
req.server = server; if (req._websocket_handshake && this._upgradeListenerCount > 0) {
res.server = server; this.emit('upgrade', req, req.socket);
req.res = res;
res.req = req;
server.buffer = server.unit._read(req.socket.req_pointer); } else {
this.emit("request", req, res);
server.emit("request", req, res); }
process.nextTick(() => { process.nextTick(() => {
req.emit("finish"); req.emit("finish");

View File

@@ -188,6 +188,21 @@ struct nxt_napi {
} }
inline void *
get_buffer_info(napi_value val, size_t &size)
{
void *res;
napi_status status;
status = napi_get_buffer_info(env_, val, &res, &size);
if (status != napi_ok) {
throw exception("Failed to get buffer info");
}
return res;
}
inline napi_value inline napi_value
get_cb_info(napi_callback_info info, size_t &argc, napi_value *argv) get_cb_info(napi_callback_info info, size_t &argc, napi_value *argv)
{ {
@@ -218,6 +233,23 @@ struct nxt_napi {
} }
inline napi_value
get_cb_info(napi_callback_info info, napi_value &arg)
{
size_t argc;
napi_value res;
argc = 1;
res = get_cb_info(info, argc, &arg);
if (argc != 1) {
throw exception("Wrong args count. Expected 1");
}
return res;
}
inline napi_value inline napi_value
get_element(napi_value obj, uint32_t i) get_element(napi_value obj, uint32_t i)
{ {
@@ -311,15 +343,22 @@ struct nxt_napi {
inline nxt_unit_request_info_t * inline nxt_unit_request_info_t *
get_request_info(napi_value obj) get_request_info(napi_value obj)
{ {
int64_t n; return (nxt_unit_request_info_t *) unwrap(obj);
napi_status status;
status = napi_get_value_int64(env_, obj, &n);
if (status != napi_ok) {
throw exception("Failed to get request pointer");
} }
return (nxt_unit_request_info_t *) (intptr_t) n;
inline uint32_t
get_value_bool(napi_value obj)
{
bool res;
napi_status status;
status = napi_get_value_bool(env_, obj, &res);
if (status != napi_ok) {
throw exception("Failed to get bool");
}
return res;
} }
@@ -353,6 +392,21 @@ struct nxt_napi {
} }
inline size_t
get_value_string_utf8(napi_value val, char *buf, size_t bufsize)
{
size_t res;
napi_status status;
status = napi_get_value_string_utf8(env_, val, buf, bufsize, &res);
if (status != napi_ok) {
throw exception("Failed to get string utf8");
}
return res;
}
inline bool inline bool
is_array(napi_value val) is_array(napi_value val)
{ {
@@ -368,6 +422,21 @@ struct nxt_napi {
} }
inline bool
is_buffer(napi_value val)
{
bool res;
napi_status status;
status = napi_is_buffer(env_, val, &res);
if (status != napi_ok) {
throw exception("Failed to confirm value is buffer");
}
return res;
}
inline napi_value inline napi_value
make_callback(napi_async_context ctx, napi_value val, napi_value func, make_callback(napi_async_context ctx, napi_value val, napi_value func,
int argc, const napi_value *argv) int argc, const napi_value *argv)
@@ -397,6 +466,41 @@ struct nxt_napi {
} }
inline napi_value
make_callback(napi_async_context ctx, napi_value val, napi_value func)
{
return make_callback(ctx, val, func, 0, NULL);
}
inline napi_value
make_callback(napi_async_context ctx, napi_value val, napi_value func,
napi_value arg1)
{
return make_callback(ctx, val, func, 1, &arg1);
}
inline napi_value
make_callback(napi_async_context ctx, napi_value val, napi_value func,
napi_value arg1, napi_value arg2)
{
napi_value args[2] = { arg1, arg2 };
return make_callback(ctx, val, func, 2, args);
}
inline napi_value
make_callback(napi_async_context ctx, napi_value val, napi_value func,
napi_value arg1, napi_value arg2, napi_value arg3)
{
napi_value args[3] = { arg1, arg2, arg3 };
return make_callback(ctx, val, func, 3, args);
}
inline napi_value inline napi_value
new_instance(napi_value ctor) new_instance(napi_value ctor)
{ {
@@ -427,6 +531,22 @@ struct nxt_napi {
} }
inline napi_value
new_instance(napi_value ctor, napi_value param1, napi_value param2)
{
napi_value res;
napi_status status;
napi_value param[2] = { param1, param2 };
status = napi_new_instance(env_, ctor, 2, param, &res);
if (status != napi_ok) {
throw exception("Failed to create instance");
}
return res;
}
inline void inline void
set_element(napi_value obj, uint32_t i, napi_value val) set_element(napi_value obj, uint32_t i, napi_value val)
{ {
@@ -472,8 +592,46 @@ struct nxt_napi {
} }
template<typename T>
inline void inline void
set_named_property(napi_value obj, const char *name, intptr_t val) set_named_property(napi_value obj, const char *name, T val)
{
set_named_property(obj, name, create(val));
}
inline napi_value
create(int32_t val)
{
napi_value ptr;
napi_status status;
status = napi_create_int32(env_, val, &ptr);
if (status != napi_ok) {
throw exception("Failed to create int32");
}
return ptr;
}
inline napi_value
create(uint32_t val)
{
napi_value ptr;
napi_status status;
status = napi_create_uint32(env_, val, &ptr);
if (status != napi_ok) {
throw exception("Failed to create uint32");
}
return ptr;
}
inline napi_value
create(int64_t val)
{ {
napi_value ptr; napi_value ptr;
napi_status status; napi_status status;
@@ -483,7 +641,32 @@ struct nxt_napi {
throw exception("Failed to create int64"); throw exception("Failed to create int64");
} }
set_named_property(obj, name, ptr); return ptr;
}
inline void
remove_wrap(napi_ref& ref)
{
if (ref != nullptr) {
remove_wrap(get_reference_value(ref));
ref = nullptr;
}
}
inline void *
remove_wrap(napi_value val)
{
void *res;
napi_status status;
status = napi_remove_wrap(env_, val, &res);
if (status != napi_ok) {
throw exception("Failed to remove_wrap");
}
return res;
} }

View File

@@ -14,7 +14,14 @@
"package.json", "package.json",
"socket.js", "socket.js",
"binding.gyp", "binding.gyp",
"README.md" "README.md",
"websocket.js",
"websocket_connection.js",
"websocket_frame.js",
"websocket_request.js",
"websocket_router.js",
"websocket_router_request.js",
"websocket_server.js"
], ],
"scripts": { "scripts": {
"clean": "node-gyp clean", "clean": "node-gyp clean",

8
src/nodejs/unit-http/socket.js Executable file → Normal file
View File

@@ -7,7 +7,7 @@
const EventEmitter = require('events'); const EventEmitter = require('events');
const util = require('util'); const util = require('util');
const unit_lib = require('unit-http/build/Release/unit-http.node'); const unit_lib = require('./build/Release/unit-http');
function Socket(options) { function Socket(options) {
EventEmitter.call(this); EventEmitter.call(this);
@@ -89,7 +89,7 @@ Socket.prototype.setTimeout = function setTimeout(timeout, callback) {
this.timeout = timeout; this.timeout = timeout;
this.on('timeout', callback); // this.on('timeout', callback);
return this; return this;
}; };
@@ -101,6 +101,4 @@ Socket.prototype.write = function write(data, encoding, callback) {
}; };
module.exports = { module.exports = Socket;
Socket
};

View File

@@ -10,6 +10,8 @@
#include <uv.h> #include <uv.h>
#include <nxt_unit_websocket.h>
napi_ref Unit::constructor_; napi_ref Unit::constructor_;
@@ -20,17 +22,27 @@ struct nxt_nodejs_ctx_t {
}; };
struct req_data_t {
napi_ref sock_ref;
napi_ref resp_ref;
napi_ref conn_ref;
};
Unit::Unit(napi_env env, napi_value jsthis): Unit::Unit(napi_env env, napi_value jsthis):
nxt_napi(env), nxt_napi(env),
wrapper_(wrap(jsthis, this, destroy)), wrapper_(wrap(jsthis, this, destroy)),
unit_ctx_(nullptr) unit_ctx_(nullptr)
{ {
nxt_unit_debug(NULL, "Unit::Unit()");
} }
Unit::~Unit() Unit::~Unit()
{ {
delete_reference(wrapper_); delete_reference(wrapper_);
nxt_unit_debug(NULL, "Unit::~Unit()");
} }
@@ -38,23 +50,26 @@ napi_value
Unit::init(napi_env env, napi_value exports) Unit::init(napi_env env, napi_value exports)
{ {
nxt_napi napi(env); nxt_napi napi(env);
napi_value cons; napi_value ctor;
napi_property_descriptor properties[] = { napi_property_descriptor unit_props[] = {
{ "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
{ "listen", 0, listen, 0, 0, 0, napi_default, 0 }, { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
{ "_read", 0, _read, 0, 0, 0, napi_default, 0 }
}; };
try { try {
cons = napi.define_class("Unit", create, 3, properties); ctor = napi.define_class("Unit", create, 2, unit_props);
constructor_ = napi.create_reference(cons); constructor_ = napi.create_reference(ctor);
napi.set_named_property(exports, "Unit", cons); napi.set_named_property(exports, "Unit", ctor);
napi.set_named_property(exports, "unit_response_headers", napi.set_named_property(exports, "response_send_headers",
response_send_headers); response_send_headers);
napi.set_named_property(exports, "unit_response_write", response_write); napi.set_named_property(exports, "response_write", response_write);
napi.set_named_property(exports, "unit_response_end", response_end); napi.set_named_property(exports, "response_end", response_end);
napi.set_named_property(exports, "websocket_send_frame",
websocket_send_frame);
napi.set_named_property(exports, "websocket_set_sock",
websocket_set_sock);
} catch (exception &e) { } catch (exception &e) {
napi.throw_error(e); napi.throw_error(e);
@@ -78,7 +93,7 @@ napi_value
Unit::create(napi_env env, napi_callback_info info) Unit::create(napi_env env, napi_callback_info info)
{ {
nxt_napi napi(env); nxt_napi napi(env);
napi_value target, cons, instance, jsthis; napi_value target, ctor, instance, jsthis;
try { try {
target = napi.get_new_target(info); target = napi.get_new_target(info);
@@ -94,8 +109,8 @@ Unit::create(napi_env env, napi_callback_info info)
} }
/* Invoked as plain function `Unit(...)`, turn into construct call. */ /* Invoked as plain function `Unit(...)`, turn into construct call. */
cons = napi.get_reference_value(constructor_); ctor = napi.get_reference_value(constructor_);
instance = napi.new_instance(cons); instance = napi.new_instance(ctor);
napi.create_reference(instance); napi.create_reference(instance);
} catch (exception &e) { } catch (exception &e) {
@@ -130,10 +145,14 @@ Unit::create_server(napi_env env, napi_callback_info info)
memset(&unit_init, 0, sizeof(nxt_unit_init_t)); memset(&unit_init, 0, sizeof(nxt_unit_init_t));
unit_init.data = obj; unit_init.data = obj;
unit_init.callbacks.request_handler = request_handler; unit_init.callbacks.request_handler = request_handler_cb;
unit_init.callbacks.websocket_handler = websocket_handler_cb;
unit_init.callbacks.close_handler = close_handler_cb;
unit_init.callbacks.add_port = add_port; unit_init.callbacks.add_port = add_port;
unit_init.callbacks.remove_port = remove_port; unit_init.callbacks.remove_port = remove_port;
unit_init.callbacks.quit = quit; unit_init.callbacks.quit = quit_cb;
unit_init.request_data_size = sizeof(req_data_t);
obj->unit_ctx_ = nxt_unit_init(&unit_init); obj->unit_ctx_ = nxt_unit_init(&unit_init);
if (obj->unit_ctx_ == NULL) { if (obj->unit_ctx_ == NULL) {
@@ -157,74 +176,139 @@ Unit::listen(napi_env env, napi_callback_info info)
} }
napi_value void
Unit::_read(napi_env env, napi_callback_info info) Unit::request_handler_cb(nxt_unit_request_info_t *req)
{ {
void *data; Unit *obj;
size_t argc;
nxt_napi napi(env);
napi_value buffer, argv;
nxt_unit_request_info_t *req;
argc = 1; obj = reinterpret_cast<Unit *>(req->unit->data);
try { obj->request_handler(req);
napi.get_cb_info(info, argc, &argv);
req = napi.get_request_info(argv);
buffer = napi.create_buffer((size_t) req->content_length, &data);
} catch (exception &e) {
napi.throw_error(e);
return nullptr;
}
nxt_unit_request_read(req, data, req->content_length);
return buffer;
} }
void void
Unit::request_handler(nxt_unit_request_info_t *req) Unit::request_handler(nxt_unit_request_info_t *req)
{ {
Unit *obj; napi_value socket, request, response, server_obj, emit_request;
napi_value socket, request, response, server_obj;
napi_value emit_events;
napi_value events_args[3];
obj = reinterpret_cast<Unit *>(req->unit->data); memset(req->data, 0, sizeof(req_data_t));
try { try {
nxt_handle_scope scope(obj->env()); nxt_handle_scope scope(env());
server_obj = obj->get_server_object(); server_obj = get_server_object();
socket = obj->create_socket(server_obj, req); socket = create_socket(server_obj, req);
request = obj->create_request(server_obj, socket); request = create_request(server_obj, socket);
response = obj->create_response(server_obj, socket, request, req); response = create_response(server_obj, request, req);
obj->create_headers(req, request); create_headers(req, request);
emit_events = obj->get_named_property(server_obj, "emit_events"); emit_request = get_named_property(server_obj, "emit_request");
events_args[0] = server_obj; nxt_async_context async_context(env(), "request_handler");
events_args[1] = request;
events_args[2] = response;
nxt_async_context async_context(obj->env(), "unit_request_handler");
nxt_callback_scope async_scope(async_context); nxt_callback_scope async_scope(async_context);
obj->make_callback(async_context, server_obj, emit_events, make_callback(async_context, server_obj, emit_request, request,
3, events_args); response);
} catch (exception &e) { } catch (exception &e) {
obj->throw_error(e); nxt_unit_req_warn(req, "request_handler: %s", e.str);
} }
} }
void void
Unit::websocket_handler_cb(nxt_unit_websocket_frame_t *ws)
{
Unit *obj;
obj = reinterpret_cast<Unit *>(ws->req->unit->data);
obj->websocket_handler(ws);
}
void
Unit::websocket_handler(nxt_unit_websocket_frame_t *ws)
{
napi_value frame, server_obj, process_frame, conn;
req_data_t *req_data;
req_data = (req_data_t *) ws->req->data;
try {
nxt_handle_scope scope(env());
server_obj = get_server_object();
frame = create_websocket_frame(server_obj, ws);
conn = get_reference_value(req_data->conn_ref);
process_frame = get_named_property(conn, "processFrame");
nxt_async_context async_context(env(), "websocket_handler");
nxt_callback_scope async_scope(async_context);
make_callback(async_context, conn, process_frame, frame);
} catch (exception &e) {
nxt_unit_req_warn(ws->req, "websocket_handler: %s", e.str);
}
nxt_unit_websocket_done(ws);
}
void
Unit::close_handler_cb(nxt_unit_request_info_t *req)
{
Unit *obj;
obj = reinterpret_cast<Unit *>(req->unit->data);
obj->close_handler(req);
}
void
Unit::close_handler(nxt_unit_request_info_t *req)
{
napi_value conn_handle_close, conn;
req_data_t *req_data;
req_data = (req_data_t *) req->data;
try {
nxt_handle_scope scope(env());
conn = get_reference_value(req_data->conn_ref);
conn_handle_close = get_named_property(conn, "handleSocketClose");
nxt_async_context async_context(env(), "close_handler");
nxt_callback_scope async_scope(async_context);
make_callback(async_context, conn, conn_handle_close,
nxt_napi::create(0));
remove_wrap(req_data->sock_ref);
remove_wrap(req_data->resp_ref);
remove_wrap(req_data->conn_ref);
} catch (exception &e) {
nxt_unit_req_warn(req, "close_handler: %s", e.str);
return;
}
nxt_unit_request_done(req, NXT_UNIT_OK);
}
static void
nxt_uv_read_callback(uv_poll_t *handle, int status, int events) nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
{ {
nxt_unit_run_once((nxt_unit_ctx_t *) handle->data); nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
@@ -244,14 +328,14 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
obj = reinterpret_cast<Unit *>(ctx->unit->data); obj = reinterpret_cast<Unit *>(ctx->unit->data);
if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) { if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
obj->throw_error("Failed to upgrade read" nxt_unit_warn(ctx, "fcntl(%d, O_NONBLOCK) failed: %s (%d)",
" file descriptor to O_NONBLOCK"); port->in_fd, strerror(errno), errno);
return -1; return -1;
} }
status = napi_get_uv_event_loop(obj->env(), &loop); status = napi_get_uv_event_loop(obj->env(), &loop);
if (status != napi_ok) { if (status != napi_ok) {
obj->throw_error("Failed to get uv.loop"); nxt_unit_warn(ctx, "Failed to get uv.loop");
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
@@ -259,13 +343,13 @@ Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
err = uv_poll_init(loop, &node_ctx->poll, port->in_fd); err = uv_poll_init(loop, &node_ctx->poll, port->in_fd);
if (err < 0) { if (err < 0) {
obj->throw_error("Failed to init uv.poll"); nxt_unit_warn(ctx, "Failed to init uv.poll");
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback); err = uv_poll_start(&node_ctx->poll, UV_READABLE, nxt_uv_read_callback);
if (err < 0) { if (err < 0) {
obj->throw_error("Failed to start uv.poll"); nxt_unit_warn(ctx, "Failed to start uv.poll");
return NXT_UNIT_ERROR; return NXT_UNIT_ERROR;
} }
@@ -308,27 +392,35 @@ Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
void void
Unit::quit(nxt_unit_ctx_t *ctx) Unit::quit_cb(nxt_unit_ctx_t *ctx)
{ {
Unit *obj; Unit *obj;
napi_value server_obj, emit_close;
obj = reinterpret_cast<Unit *>(ctx->unit->data); obj = reinterpret_cast<Unit *>(ctx->unit->data);
obj->quit(ctx);
}
void
Unit::quit(nxt_unit_ctx_t *ctx)
{
napi_value server_obj, emit_close;
try { try {
nxt_handle_scope scope(obj->env()); nxt_handle_scope scope(env());
server_obj = obj->get_server_object(); server_obj = get_server_object();
emit_close = obj->get_named_property(server_obj, "emit_close"); emit_close = get_named_property(server_obj, "emit_close");
nxt_async_context async_context(obj->env(), "unit_quit"); nxt_async_context async_context(env(), "unit_quit");
nxt_callback_scope async_scope(async_context); nxt_callback_scope async_scope(async_context);
obj->make_callback(async_context, server_obj, emit_close, 0, NULL); make_callback(async_context, server_obj, emit_close);
} catch (exception &e) { } catch (exception &e) {
obj->throw_error(e); nxt_unit_debug(ctx, "quit: %s", e.str);
} }
nxt_unit_done(ctx); nxt_unit_done(ctx);
@@ -349,8 +441,9 @@ Unit::get_server_object()
void void
Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
{ {
void *data;
uint32_t i; uint32_t i;
napi_value headers, raw_headers; napi_value headers, raw_headers, buffer;
napi_status status; napi_status status;
nxt_unit_request_t *r; nxt_unit_request_t *r;
@@ -373,6 +466,13 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
set_named_property(request, "httpVersion", r->version, r->version_length); set_named_property(request, "httpVersion", r->version, r->version_length);
set_named_property(request, "method", r->method, r->method_length); set_named_property(request, "method", r->method, r->method_length);
set_named_property(request, "url", r->target, r->target_length); set_named_property(request, "url", r->target, r->target_length);
set_named_property(request, "_websocket_handshake", r->websocket_handshake);
buffer = create_buffer((size_t) req->content_length, &data);
nxt_unit_request_read(req, data, req->content_length);
set_named_property(request, "_data", buffer);
} }
@@ -410,15 +510,18 @@ napi_value
Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
{ {
napi_value constructor, res; napi_value constructor, res;
req_data_t *req_data;
nxt_unit_request_t *r; nxt_unit_request_t *r;
r = req->request; r = req->request;
constructor = get_named_property(server_obj, "socket"); constructor = get_named_property(server_obj, "Socket");
res = new_instance(constructor); res = new_instance(constructor);
set_named_property(res, "req_pointer", (intptr_t) req); req_data = (req_data_t *) req->data;
req_data->sock_ref = wrap(res, req, sock_destroy);
set_named_property(res, "remoteAddress", r->remote, r->remote_length); set_named_property(res, "remoteAddress", r->remote, r->remote_length);
set_named_property(res, "localAddress", r->local, r->local_length); set_named_property(res, "localAddress", r->local, r->local_length);
@@ -429,34 +532,66 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
napi_value napi_value
Unit::create_request(napi_value server_obj, napi_value socket) Unit::create_request(napi_value server_obj, napi_value socket)
{ {
napi_value constructor, return_val; napi_value constructor;
constructor = get_named_property(server_obj, "request"); constructor = get_named_property(server_obj, "ServerRequest");
return_val = new_instance(constructor, server_obj); return new_instance(constructor, server_obj, socket);
set_named_property(return_val, "socket", socket);
set_named_property(return_val, "connection", socket);
return return_val;
} }
napi_value napi_value
Unit::create_response(napi_value server_obj, napi_value socket, Unit::create_response(napi_value server_obj, napi_value request,
napi_value request, nxt_unit_request_info_t *req) nxt_unit_request_info_t *req)
{ {
napi_value constructor, return_val; napi_value constructor, res;
req_data_t *req_data;
constructor = get_named_property(server_obj, "response"); constructor = get_named_property(server_obj, "ServerResponse");
return_val = new_instance(constructor, request); res = new_instance(constructor, request);
set_named_property(return_val, "socket", socket); req_data = (req_data_t *) req->data;
set_named_property(return_val, "connection", socket); req_data->resp_ref = wrap(res, req, resp_destroy);
set_named_property(return_val, "_req_point", (intptr_t) req);
return return_val; return res;
}
napi_value
Unit::create_websocket_frame(napi_value server_obj,
nxt_unit_websocket_frame_t *ws)
{
void *data;
napi_value constructor, res, buffer;
uint8_t sc[2];
constructor = get_named_property(server_obj, "WebSocketFrame");
res = new_instance(constructor);
set_named_property(res, "fin", (bool) ws->header->fin);
set_named_property(res, "opcode", ws->header->opcode);
set_named_property(res, "length", (int64_t) ws->payload_len);
if (ws->header->opcode == NXT_WEBSOCKET_OP_CLOSE) {
if (ws->payload_len >= 2) {
nxt_unit_websocket_read(ws, sc, 2);
set_named_property(res, "closeStatus",
(((uint16_t) sc[0]) << 8) | sc[1]);
} else {
set_named_property(res, "closeStatus", -1);
}
}
buffer = create_buffer((size_t) ws->content_length, &data);
nxt_unit_websocket_read(ws, data, ws->content_length);
set_named_property(res, "binaryPayload", buffer);
return res;
} }
@@ -472,35 +607,32 @@ Unit::response_send_headers(napi_env env, napi_callback_info info)
uint16_t hash; uint16_t hash;
nxt_napi napi(env); nxt_napi napi(env);
napi_value this_arg, headers, keys, name, value, array_val; napi_value this_arg, headers, keys, name, value, array_val;
napi_value req_num, array_entry; napi_value array_entry;
napi_valuetype val_type; napi_valuetype val_type;
nxt_unit_field_t *f; nxt_unit_field_t *f;
nxt_unit_request_info_t *req; nxt_unit_request_info_t *req;
napi_value argv[5]; napi_value argv[4];
argc = 5; argc = 4;
try { try {
this_arg = napi.get_cb_info(info, argc, argv); this_arg = napi.get_cb_info(info, argc, argv);
if (argc != 5) { if (argc != 4) {
napi.throw_error("Wrong args count. Expected: " napi.throw_error("Wrong args count. Expected: "
"statusCode, headers, headers count, " "statusCode, headers, headers count, "
"headers length"); "headers length");
return nullptr; return nullptr;
} }
req_num = napi.get_named_property(argv[0], "_req_point"); req = napi.get_request_info(this_arg);
status_code = napi.get_value_uint32(argv[0]);
req = napi.get_request_info(req_num); keys_count = napi.get_value_uint32(argv[2]);
header_len = napi.get_value_uint32(argv[3]);
status_code = napi.get_value_uint32(argv[1]);
keys_count = napi.get_value_uint32(argv[3]);
header_len = napi.get_value_uint32(argv[4]);
/* Need to reserve extra byte for C-string 0-termination. */ /* Need to reserve extra byte for C-string 0-termination. */
header_len++; header_len++;
headers = argv[2]; headers = argv[1];
ret = nxt_unit_response_init(req, status_code, keys_count, header_len); ret = nxt_unit_response_init(req, status_code, keys_count, header_len);
if (ret != NXT_UNIT_OK) { if (ret != NXT_UNIT_OK) {
@@ -611,94 +743,79 @@ napi_value
Unit::response_write(napi_env env, napi_callback_info info) Unit::response_write(napi_env env, napi_callback_info info)
{ {
int ret; int ret;
char *ptr; void *ptr;
size_t argc, have_buf_len; size_t argc, have_buf_len;
uint32_t buf_len; uint32_t buf_len;
nxt_napi napi(env); nxt_napi napi(env);
napi_value this_arg, req_num; napi_value this_arg;
napi_status status;
nxt_unit_buf_t *buf; nxt_unit_buf_t *buf;
napi_valuetype buf_type; napi_valuetype buf_type;
nxt_unit_request_info_t *req; nxt_unit_request_info_t *req;
napi_value argv[3]; napi_value argv[2];
argc = 3; argc = 2;
try { try {
this_arg = napi.get_cb_info(info, argc, argv); this_arg = napi.get_cb_info(info, argc, argv);
if (argc != 3) { if (argc != 2) {
throw exception("Wrong args count. Expected: " throw exception("Wrong args count. Expected: "
"chunk, chunk length"); "chunk, chunk length");
} }
req_num = napi.get_named_property(argv[0], "_req_point"); req = napi.get_request_info(this_arg);
req = napi.get_request_info(req_num); buf_type = napi.type_of(argv[0]);
buf_len = napi.get_value_uint32(argv[1]) + 1;
buf_len = napi.get_value_uint32(argv[2]);
buf_type = napi.type_of(argv[1]);
} catch (exception &e) {
napi.throw_error(e);
return nullptr;
}
buf_len++;
buf = nxt_unit_response_buf_alloc(req, buf_len); buf = nxt_unit_response_buf_alloc(req, buf_len);
if (buf == NULL) { if (buf == NULL) {
goto failed; throw exception("Failed to allocate response buffer");
} }
if (buf_type == napi_string) { if (buf_type == napi_string) {
/* TODO: will work only for utf8 content-type */ /* TODO: will work only for utf8 content-type */
status = napi_get_value_string_utf8(env, argv[1], buf->free, have_buf_len = napi.get_value_string_utf8(argv[0], buf->free,
buf_len, &have_buf_len); buf_len);
} else { } else {
status = napi_get_buffer_info(env, argv[1], (void **) &ptr, ptr = napi.get_buffer_info(argv[0], have_buf_len);
&have_buf_len);
memcpy(buf->free, ptr, have_buf_len); memcpy(buf->free, ptr, have_buf_len);
} }
if (status != napi_ok) {
goto failed;
}
buf->free += have_buf_len; buf->free += have_buf_len;
ret = nxt_unit_buf_send(buf); ret = nxt_unit_buf_send(buf);
if (ret != NXT_UNIT_OK) { if (ret != NXT_UNIT_OK) {
goto failed; throw exception("Failed to send body buf");
}
} catch (exception &e) {
napi.throw_error(e);
return nullptr;
} }
return this_arg; return this_arg;
failed:
napi.throw_error("Failed to write body");
return nullptr;
} }
napi_value napi_value
Unit::response_end(napi_env env, napi_callback_info info) Unit::response_end(napi_env env, napi_callback_info info)
{ {
size_t argc;
nxt_napi napi(env); nxt_napi napi(env);
napi_value resp, this_arg, req_num; napi_value this_arg;
req_data_t *req_data;
nxt_unit_request_info_t *req; nxt_unit_request_info_t *req;
argc = 1;
try { try {
this_arg = napi.get_cb_info(info, argc, &resp); this_arg = napi.get_cb_info(info);
req_num = napi.get_named_property(resp, "_req_point"); req = napi.get_request_info(this_arg);
req = napi.get_request_info(req_num);
req_data = (req_data_t *) req->data;
napi.remove_wrap(req_data->sock_ref);
napi.remove_wrap(req_data->resp_ref);
napi.remove_wrap(req_data->conn_ref);
} catch (exception &e) { } catch (exception &e) {
napi.throw_error(e); napi.throw_error(e);
@@ -709,3 +826,135 @@ Unit::response_end(napi_env env, napi_callback_info info)
return this_arg; return this_arg;
} }
napi_value
Unit::websocket_send_frame(napi_env env, napi_callback_info info)
{
int ret, iovec_len;
bool fin;
size_t buf_len;
uint32_t opcode, sc;
nxt_napi napi(env);
napi_value this_arg, frame, payload;
nxt_unit_request_info_t *req;
char status_code[2];
struct iovec iov[2];
iovec_len = 0;
try {
this_arg = napi.get_cb_info(info, frame);
req = napi.get_request_info(this_arg);
opcode = napi.get_value_uint32(napi.get_named_property(frame,
"opcode"));
if (opcode == NXT_WEBSOCKET_OP_CLOSE) {
sc = napi.get_value_uint32(napi.get_named_property(frame,
"closeStatus"));
status_code[0] = (sc >> 8) & 0xFF;
status_code[1] = sc & 0xFF;
iov[iovec_len].iov_base = status_code;
iov[iovec_len].iov_len = 2;
iovec_len++;
}
try {
fin = napi.get_value_bool(napi.get_named_property(frame, "fin"));
} catch (exception &e) {
fin = true;
}
payload = napi.get_named_property(frame, "binaryPayload");
if (napi.is_buffer(payload)) {
iov[iovec_len].iov_base = napi.get_buffer_info(payload, buf_len);
} else {
buf_len = 0;
}
} catch (exception &e) {
napi.throw_error(e);
return nullptr;
}
if (buf_len > 0) {
iov[iovec_len].iov_len = buf_len;
iovec_len++;
}
ret = nxt_unit_websocket_sendv(req, opcode, fin ? 1 : 0, iov, iovec_len);
if (ret != NXT_UNIT_OK) {
goto failed;
}
return this_arg;
failed:
napi.throw_error("Failed to send frame");
return nullptr;
}
napi_value
Unit::websocket_set_sock(napi_env env, napi_callback_info info)
{
nxt_napi napi(env);
napi_value this_arg, sock;
req_data_t *req_data;
nxt_unit_request_info_t *req;
try {
this_arg = napi.get_cb_info(info, sock);
req = napi.get_request_info(sock);
req_data = (req_data_t *) req->data;
req_data->conn_ref = napi.wrap(this_arg, req, conn_destroy);
} catch (exception &e) {
napi.throw_error(e);
return nullptr;
}
return this_arg;
}
void
Unit::conn_destroy(napi_env env, void *nativeObject, void *finalize_hint)
{
nxt_unit_request_info_t *req;
req = (nxt_unit_request_info_t *) nativeObject;
nxt_unit_warn(NULL, "conn_destroy: %p", req);
}
void
Unit::sock_destroy(napi_env env, void *nativeObject, void *finalize_hint)
{
nxt_unit_request_info_t *req;
req = (nxt_unit_request_info_t *) nativeObject;
nxt_unit_warn(NULL, "sock_destroy: %p", req);
}
void
Unit::resp_destroy(napi_env env, void *nativeObject, void *finalize_hint)
{
nxt_unit_request_info_t *req;
req = (nxt_unit_request_info_t *) nativeObject;
nxt_unit_warn(NULL, "resp_destroy: %p", req);
}

View File

@@ -19,14 +19,28 @@ private:
static napi_value create(napi_env env, napi_callback_info info); static napi_value create(napi_env env, napi_callback_info info);
static void destroy(napi_env env, void *nativeObject, void *finalize_hint); static void destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void conn_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void sock_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void resp_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static napi_value create_server(napi_env env, napi_callback_info info); static napi_value create_server(napi_env env, napi_callback_info info);
static napi_value listen(napi_env env, napi_callback_info info); static napi_value listen(napi_env env, napi_callback_info info);
static napi_value _read(napi_env env, napi_callback_info info); static napi_value _read(napi_env env, napi_callback_info info);
static void request_handler(nxt_unit_request_info_t *req);
static void request_handler_cb(nxt_unit_request_info_t *req);
void request_handler(nxt_unit_request_info_t *req);
static void websocket_handler_cb(nxt_unit_websocket_frame_t *ws);
void websocket_handler(nxt_unit_websocket_frame_t *ws);
static void close_handler_cb(nxt_unit_request_info_t *req);
void close_handler(nxt_unit_request_info_t *req);
static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port); static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id); static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
static void quit(nxt_unit_ctx_t *ctx);
static void quit_cb(nxt_unit_ctx_t *ctx);
void quit(nxt_unit_ctx_t *ctx);
napi_value get_server_object(); napi_value get_server_object();
@@ -35,15 +49,20 @@ private:
napi_value create_request(napi_value server_obj, napi_value socket); napi_value create_request(napi_value server_obj, napi_value socket);
napi_value create_response(napi_value server_obj, napi_value socket, napi_value create_response(napi_value server_obj, napi_value request,
napi_value request,
nxt_unit_request_info_t *req); nxt_unit_request_info_t *req);
napi_value create_websocket_frame(napi_value server_obj,
nxt_unit_websocket_frame_t *ws);
static napi_value response_send_headers(napi_env env, static napi_value response_send_headers(napi_env env,
napi_callback_info info); napi_callback_info info);
static napi_value response_write(napi_env env, napi_callback_info info); static napi_value response_write(napi_env env, napi_callback_info info);
static napi_value response_end(napi_env env, napi_callback_info info); static napi_value response_end(napi_env env, napi_callback_info info);
static napi_value websocket_send_frame(napi_env env,
napi_callback_info info);
static napi_value websocket_set_sock(napi_env env, napi_callback_info info);
void create_headers(nxt_unit_request_info_t *req, napi_value request); void create_headers(nxt_unit_request_info_t *req, napi_value request);

View File

@@ -0,0 +1,73 @@
var noop = exports.noop = function(){};
exports.extend = function extend(dest, source) {
for (var prop in source) {
dest[prop] = source[prop];
}
};
exports.eventEmitterListenerCount =
require('events').EventEmitter.listenerCount ||
function(emitter, type) { return emitter.listeners(type).length; };
exports.bufferAllocUnsafe = Buffer.allocUnsafe ?
Buffer.allocUnsafe :
function oldBufferAllocUnsafe(size) { return new Buffer(size); };
exports.bufferFromString = Buffer.from ?
Buffer.from :
function oldBufferFromString(string, encoding) {
return new Buffer(string, encoding);
};
exports.BufferingLogger = function createBufferingLogger(identifier, uniqueID) {
try {
var logFunction = require('debug')(identifier);
}
catch(e) {
logFunction = noop;
logFunction.enabled = false;
}
if (logFunction.enabled) {
var logger = new BufferingLogger(identifier, uniqueID, logFunction);
var debug = logger.log.bind(logger);
debug.printOutput = logger.printOutput.bind(logger);
debug.enabled = logFunction.enabled;
return debug;
}
logFunction.printOutput = noop;
return logFunction;
};
function BufferingLogger(identifier, uniqueID, logFunction) {
this.logFunction = logFunction;
this.identifier = identifier;
this.uniqueID = uniqueID;
this.buffer = [];
}
BufferingLogger.prototype.log = function() {
this.buffer.push([ new Date(), Array.prototype.slice.call(arguments) ]);
return this;
};
BufferingLogger.prototype.clear = function() {
this.buffer = [];
return this;
};
BufferingLogger.prototype.printOutput = function(logFunction) {
if (!logFunction) { logFunction = this.logFunction; }
var uniqueID = this.uniqueID;
this.buffer.forEach(function(entry) {
var date = entry[0].toLocaleString();
var args = entry[1].slice();
var formatString = args[0];
if (formatString !== (void 0) && formatString !== null) {
formatString = '%s - %s - ' + formatString.toString();
args.splice(0, 1, formatString, date, uniqueID);
logFunction.apply(global, args);
}
});
};

View File

@@ -0,0 +1,14 @@
/*
* Copyright (C) NGINX, Inc.
*/
'use strict';
module.exports = {
'server' : require('./websocket_server'),
'router' : require('./websocket_router'),
'frame' : require('./websocket_frame'),
'request' : require('./websocket_request'),
'connection' : require('./websocket_connection'),
};

View File

@@ -0,0 +1,683 @@
/************************************************************************
* Copyright 2010-2015 Brian McKelvey.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***********************************************************************/
var util = require('util');
var utils = require('./utils');
var unit_lib = require('./build/Release/unit-http');
var EventEmitter = require('events').EventEmitter;
var WebSocketFrame = require('./websocket_frame');
var bufferAllocUnsafe = utils.bufferAllocUnsafe;
var bufferFromString = utils.bufferFromString;
// Connected, fully-open, ready to send and receive frames
const STATE_OPEN = 'open';
// Received a close frame from the remote peer
const STATE_PEER_REQUESTED_CLOSE = 'peer_requested_close';
// Sent close frame to remote peer. No further data can be sent.
const STATE_ENDING = 'ending';
// Connection is fully closed. No further data can be sent or received.
const STATE_CLOSED = 'closed';
var idCounter = 0;
function WebSocketConnection(socket, extensions, protocol, maskOutgoingPackets, config) {
this._debug = utils.BufferingLogger('websocket:connection', ++idCounter);
this._debug('constructor');
if (this._debug.enabled) {
instrumentSocketForDebugging(this, socket);
}
// Superclass Constructor
EventEmitter.call(this);
this._pingListenerCount = 0;
this.on('newListener', function(ev) {
if (ev === 'ping'){
this._pingListenerCount++;
}
}).on('removeListener', function(ev) {
if (ev === 'ping') {
this._pingListenerCount--;
}
});
this.config = config;
this.socket = socket;
this.protocol = protocol;
this.extensions = extensions;
this.remoteAddress = socket.remoteAddress;
this.closeReasonCode = -1;
this.closeDescription = null;
this.closeEventEmitted = false;
// We have to mask outgoing packets if we're acting as a WebSocket client.
this.maskOutgoingPackets = maskOutgoingPackets;
this.fragmentationSize = 0; // data received so far...
this.frameQueue = [];
// Various bits of connection state
this.connected = true;
this.state = STATE_OPEN;
this.waitingForCloseResponse = false;
// Received TCP FIN, socket's readable stream is finished.
this.receivedEnd = false;
this.closeTimeout = this.config.closeTimeout;
this.assembleFragments = this.config.assembleFragments;
this.maxReceivedMessageSize = this.config.maxReceivedMessageSize;
this.outputBufferFull = false;
this.inputPaused = false;
this._closeTimerHandler = this.handleCloseTimer.bind(this);
// Disable nagle algorithm?
this.socket.setNoDelay(this.config.disableNagleAlgorithm);
// Make sure there is no socket inactivity timeout
this.socket.setTimeout(0);
// The HTTP Client seems to subscribe to socket error events
// and re-dispatch them in such a way that doesn't make sense
// for users of our client, so we want to make sure nobody
// else is listening for error events on the socket besides us.
this.socket.removeAllListeners('error');
this._set_sock(this.socket);
}
WebSocketConnection.prototype._set_sock = unit_lib.websocket_set_sock;
WebSocketConnection.prototype._end = unit_lib.response_end;
WebSocketConnection.CLOSE_REASON_NORMAL = 1000;
WebSocketConnection.CLOSE_REASON_GOING_AWAY = 1001;
WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR = 1002;
WebSocketConnection.CLOSE_REASON_UNPROCESSABLE_INPUT = 1003;
WebSocketConnection.CLOSE_REASON_RESERVED = 1004; // Reserved value. Undefined meaning.
WebSocketConnection.CLOSE_REASON_NOT_PROVIDED = 1005; // Not to be used on the wire
WebSocketConnection.CLOSE_REASON_ABNORMAL = 1006; // Not to be used on the wire
WebSocketConnection.CLOSE_REASON_INVALID_DATA = 1007;
WebSocketConnection.CLOSE_REASON_POLICY_VIOLATION = 1008;
WebSocketConnection.CLOSE_REASON_MESSAGE_TOO_BIG = 1009;
WebSocketConnection.CLOSE_REASON_EXTENSION_REQUIRED = 1010;
WebSocketConnection.CLOSE_REASON_INTERNAL_SERVER_ERROR = 1011;
WebSocketConnection.CLOSE_REASON_TLS_HANDSHAKE_FAILED = 1015; // Not to be used on the wire
WebSocketConnection.CLOSE_DESCRIPTIONS = {
1000: 'Normal connection closure',
1001: 'Remote peer is going away',
1002: 'Protocol error',
1003: 'Unprocessable input',
1004: 'Reserved',
1005: 'Reason not provided',
1006: 'Abnormal closure, no further detail available',
1007: 'Invalid data received',
1008: 'Policy violation',
1009: 'Message too big',
1010: 'Extension requested by client is required',
1011: 'Internal Server Error',
1015: 'TLS Handshake Failed'
};
function validateCloseReason(code) {
if (code < 1000) {
// Status codes in the range 0-999 are not used
return false;
}
if (code >= 1000 && code <= 2999) {
// Codes from 1000 - 2999 are reserved for use by the protocol. Only
// a few codes are defined, all others are currently illegal.
return [1000, 1001, 1002, 1003, 1007, 1008, 1009, 1010, 1011, 1012, 1013, 1014].indexOf(code) !== -1;
}
if (code >= 3000 && code <= 3999) {
// Reserved for use by libraries, frameworks, and applications.
// Should be registered with IANA. Interpretation of these codes is
// undefined by the WebSocket protocol.
return true;
}
if (code >= 4000 && code <= 4999) {
// Reserved for private use. Interpretation of these codes is
// undefined by the WebSocket protocol.
return true;
}
if (code >= 5000) {
return false;
}
}
util.inherits(WebSocketConnection, EventEmitter);
WebSocketConnection.prototype._addSocketEventListeners = function() {
this.socket.on('error', this.handleSocketError.bind(this));
this.socket.on('end', this.handleSocketEnd.bind(this));
this.socket.on('close', this.handleSocketClose.bind(this));
};
WebSocketConnection.prototype.handleSocketError = function(error) {
this._debug('handleSocketError: %j', error);
if (this.state === STATE_CLOSED) {
// See https://github.com/theturtle32/WebSocket-Node/issues/288
this._debug(' --- Socket \'error\' after \'close\'');
return;
}
this.closeReasonCode = WebSocketConnection.CLOSE_REASON_ABNORMAL;
this.closeDescription = 'Socket Error: ' + error.syscall + ' ' + error.code;
this.connected = false;
this.state = STATE_CLOSED;
this.fragmentationSize = 0;
if (utils.eventEmitterListenerCount(this, 'error') > 0) {
this.emit('error', error);
}
this.socket.destroy(error);
this._debug.printOutput();
this._end();
};
WebSocketConnection.prototype.handleSocketEnd = function() {
this._debug('handleSocketEnd: received socket end. state = %s', this.state);
this.receivedEnd = true;
if (this.state === STATE_CLOSED) {
// When using the TLS module, sometimes the socket will emit 'end'
// after it emits 'close'. I don't think that's correct behavior,
// but we should deal with it gracefully by ignoring it.
this._debug(' --- Socket \'end\' after \'close\'');
return;
}
if (this.state !== STATE_PEER_REQUESTED_CLOSE &&
this.state !== STATE_ENDING) {
this._debug(' --- UNEXPECTED socket end.');
this.socket.end();
this._end();
}
};
WebSocketConnection.prototype.handleSocketClose = function(hadError) {
this._debug('handleSocketClose: received socket close');
this.socketHadError = hadError;
this.connected = false;
this.state = STATE_CLOSED;
// If closeReasonCode is still set to -1 at this point then we must
// not have received a close frame!!
if (this.closeReasonCode === -1) {
this.closeReasonCode = WebSocketConnection.CLOSE_REASON_ABNORMAL;
this.closeDescription = 'Connection dropped by remote peer.';
}
this.clearCloseTimer();
if (!this.closeEventEmitted) {
this.closeEventEmitted = true;
this._debug('-- Emitting WebSocketConnection close event');
this.emit('close', this.closeReasonCode, this.closeDescription);
}
};
WebSocketConnection.prototype.close = function(reasonCode, description) {
if (this.connected) {
this._debug('close: Initating clean WebSocket close sequence.');
if ('number' !== typeof reasonCode) {
reasonCode = WebSocketConnection.CLOSE_REASON_NORMAL;
}
if (!validateCloseReason(reasonCode)) {
throw new Error('Close code ' + reasonCode + ' is not valid.');
}
if ('string' !== typeof description) {
description = WebSocketConnection.CLOSE_DESCRIPTIONS[reasonCode];
}
this.closeReasonCode = reasonCode;
this.closeDescription = description;
this.setCloseTimer();
this.sendCloseFrame(this.closeReasonCode, this.closeDescription);
this.state = STATE_ENDING;
this.connected = false;
}
};
WebSocketConnection.prototype.drop = function(reasonCode, description, skipCloseFrame) {
this._debug('drop');
if (typeof(reasonCode) !== 'number') {
reasonCode = WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR;
}
if (typeof(description) !== 'string') {
// If no description is provided, try to look one up based on the
// specified reasonCode.
description = WebSocketConnection.CLOSE_DESCRIPTIONS[reasonCode];
}
this._debug('Forcefully dropping connection. skipCloseFrame: %s, code: %d, description: %s',
skipCloseFrame, reasonCode, description
);
this.closeReasonCode = reasonCode;
this.closeDescription = description;
this.frameQueue = [];
this.fragmentationSize = 0;
if (!skipCloseFrame) {
this.sendCloseFrame(reasonCode, description);
}
this.connected = false;
this.state = STATE_CLOSED;
this.clearCloseTimer();
if (!this.closeEventEmitted) {
this.closeEventEmitted = true;
this._debug('Emitting WebSocketConnection close event');
this.emit('close', this.closeReasonCode, this.closeDescription);
}
this._debug('Drop: destroying socket');
this.socket.destroy();
this._end();
};
WebSocketConnection.prototype.setCloseTimer = function() {
this._debug('setCloseTimer');
this.clearCloseTimer();
this._debug('Setting close timer');
this.waitingForCloseResponse = true;
this.closeTimer = setTimeout(this._closeTimerHandler, this.closeTimeout);
};
WebSocketConnection.prototype.clearCloseTimer = function() {
this._debug('clearCloseTimer');
if (this.closeTimer) {
this._debug('Clearing close timer');
clearTimeout(this.closeTimer);
this.waitingForCloseResponse = false;
this.closeTimer = null;
}
};
WebSocketConnection.prototype.handleCloseTimer = function() {
this._debug('handleCloseTimer');
this.closeTimer = null;
if (this.waitingForCloseResponse) {
this._debug('Close response not received from client. Forcing socket end.');
this.waitingForCloseResponse = false;
this.state = STATE_CLOSED;
this.socket.end();
this._end();
}
};
WebSocketConnection.prototype.processFrame = function(frame) {
if (!this.connected) {
return;
}
this._debug('processFrame');
this._debug(' -- frame: %s', frame);
// Any non-control opcode besides 0x00 (continuation) received in the
// middle of a fragmented message is illegal.
if (this.frameQueue.length !== 0 && (frame.opcode > 0x00 && frame.opcode < 0x08)) {
this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR,
'Illegal frame opcode 0x' + frame.opcode.toString(16) + ' ' +
'received in middle of fragmented message.');
return;
}
switch(frame.opcode) {
case 0x02: // WebSocketFrame.BINARY_FRAME
this._debug('-- Binary Frame');
if (this.assembleFragments) {
if (frame.fin) {
// Complete single-frame message received
this._debug('---- Emitting \'message\' event');
this.emit('message', {
type: 'binary',
binaryData: frame.binaryPayload
});
}
else {
// beginning of a fragmented message
this.frameQueue.push(frame);
this.fragmentationSize = frame.length;
}
}
break;
case 0x01: // WebSocketFrame.TEXT_FRAME
this._debug('-- Text Frame');
if (this.assembleFragments) {
if (frame.fin) {
// Complete single-frame message received
this._debug('---- Emitting \'message\' event');
this.emit('message', {
type: 'utf8',
utf8Data: frame.binaryPayload.toString('utf8')
});
}
else {
// beginning of a fragmented message
this.frameQueue.push(frame);
this.fragmentationSize = frame.length;
}
}
break;
case 0x00: // WebSocketFrame.CONTINUATION
this._debug('-- Continuation Frame');
if (this.assembleFragments) {
if (this.frameQueue.length === 0) {
this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR,
'Unexpected Continuation Frame');
return;
}
this.fragmentationSize += frame.length;
if (this.fragmentationSize > this.maxReceivedMessageSize) {
this.drop(WebSocketConnection.CLOSE_REASON_MESSAGE_TOO_BIG,
'Maximum message size exceeded.');
return;
}
this.frameQueue.push(frame);
if (frame.fin) {
// end of fragmented message, so we process the whole
// message now. We also have to decode the utf-8 data
// for text frames after combining all the fragments.
var bytesCopied = 0;
var binaryPayload = bufferAllocUnsafe(this.fragmentationSize);
var opcode = this.frameQueue[0].opcode;
this.frameQueue.forEach(function (currentFrame) {
currentFrame.binaryPayload.copy(binaryPayload, bytesCopied);
bytesCopied += currentFrame.binaryPayload.length;
});
this.frameQueue = [];
this.fragmentationSize = 0;
switch (opcode) {
case 0x02: // WebSocketOpcode.BINARY_FRAME
this.emit('message', {
type: 'binary',
binaryData: binaryPayload
});
break;
case 0x01: // WebSocketOpcode.TEXT_FRAME
this.emit('message', {
type: 'utf8',
utf8Data: binaryPayload.toString('utf8')
});
break;
default:
this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR,
'Unexpected first opcode in fragmentation sequence: 0x' + opcode.toString(16));
return;
}
}
}
break;
case 0x09: // WebSocketFrame.PING
this._debug('-- Ping Frame');
if (this._pingListenerCount > 0) {
// logic to emit the ping frame: this is only done when a listener is known to exist
// Expose a function allowing the user to override the default ping() behavior
var cancelled = false;
var cancel = function() {
cancelled = true;
};
this.emit('ping', cancel, frame.binaryPayload);
// Only send a pong if the client did not indicate that he would like to cancel
if (!cancelled) {
this.pong(frame.binaryPayload);
}
}
else {
this.pong(frame.binaryPayload);
}
break;
case 0x0A: // WebSocketFrame.PONG
this._debug('-- Pong Frame');
this.emit('pong', frame.binaryPayload);
break;
case 0x08: // WebSocketFrame.CONNECTION_CLOSE
this._debug('-- Close Frame');
if (this.waitingForCloseResponse) {
// Got response to our request to close the connection.
// Close is complete, so we just hang up.
this._debug('---- Got close response from peer. Completing closing handshake.');
this.clearCloseTimer();
this.waitingForCloseResponse = false;
this.state = STATE_CLOSED;
this.socket.end();
this._end();
return;
}
this._debug('---- Closing handshake initiated by peer.');
// Got request from other party to close connection.
// Send back acknowledgement and then hang up.
this.state = STATE_PEER_REQUESTED_CLOSE;
var respondCloseReasonCode;
// Make sure the close reason provided is legal according to
// the protocol spec. Providing no close status is legal.
// WebSocketFrame sets closeStatus to -1 by default, so if it
// is still -1, then no status was provided.
if (frame.invalidCloseFrameLength) {
this.closeReasonCode = 1005; // 1005 = No reason provided.
respondCloseReasonCode = WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR;
}
else if (frame.closeStatus === -1 || validateCloseReason(frame.closeStatus)) {
this.closeReasonCode = frame.closeStatus;
respondCloseReasonCode = WebSocketConnection.CLOSE_REASON_NORMAL;
}
else {
this.closeReasonCode = frame.closeStatus;
respondCloseReasonCode = WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR;
}
// If there is a textual description in the close frame, extract it.
if (frame.binaryPayload.length > 1) {
this.closeDescription = frame.binaryPayload.toString('utf8');
}
else {
this.closeDescription = WebSocketConnection.CLOSE_DESCRIPTIONS[this.closeReasonCode];
}
this._debug(
'------ Remote peer %s - code: %d - %s - close frame payload length: %d',
this.remoteAddress, this.closeReasonCode,
this.closeDescription, frame.length
);
this._debug('------ responding to remote peer\'s close request.');
this.drop(respondCloseReasonCode, null);
this.connected = false;
break;
default:
this._debug('-- Unrecognized Opcode %d', frame.opcode);
this.drop(WebSocketConnection.CLOSE_REASON_PROTOCOL_ERROR,
'Unrecognized Opcode: 0x' + frame.opcode.toString(16));
break;
}
};
WebSocketConnection.prototype.send = function(data, cb) {
this._debug('send');
if (Buffer.isBuffer(data)) {
this.sendBytes(data, cb);
}
else if (typeof(data['toString']) === 'function') {
this.sendUTF(data, cb);
}
else {
throw new Error('Data provided must either be a Node Buffer or implement toString()');
}
};
WebSocketConnection.prototype.sendUTF = function(data, cb) {
data = bufferFromString(data.toString(), 'utf8');
this._debug('sendUTF: %d bytes', data.length);
var frame = new WebSocketFrame();
frame.opcode = 0x01; // WebSocketOpcode.TEXT_FRAME
frame.binaryPayload = data;
this.fragmentAndSend(frame, cb);
};
WebSocketConnection.prototype.sendBytes = function(data, cb) {
this._debug('sendBytes');
if (!Buffer.isBuffer(data)) {
throw new Error('You must pass a Node Buffer object to WebSocketConnection.prototype.sendBytes()');
}
var frame = new WebSocketFrame();
frame.opcode = 0x02; // WebSocketOpcode.BINARY_FRAME
frame.binaryPayload = data;
this.fragmentAndSend(frame, cb);
};
WebSocketConnection.prototype.ping = function(data) {
this._debug('ping');
var frame = new WebSocketFrame();
frame.opcode = 0x09; // WebSocketOpcode.PING
frame.fin = true;
if (data) {
if (!Buffer.isBuffer(data)) {
data = bufferFromString(data.toString(), 'utf8');
}
if (data.length > 125) {
this._debug('WebSocket: Data for ping is longer than 125 bytes. Truncating.');
data = data.slice(0,124);
}
frame.binaryPayload = data;
}
this.sendFrame(frame);
};
// Pong frames have to echo back the contents of the data portion of the
// ping frame exactly, byte for byte.
WebSocketConnection.prototype.pong = function(binaryPayload) {
this._debug('pong');
var frame = new WebSocketFrame();
frame.opcode = 0x0A; // WebSocketOpcode.PONG
if (Buffer.isBuffer(binaryPayload) && binaryPayload.length > 125) {
this._debug('WebSocket: Data for pong is longer than 125 bytes. Truncating.');
binaryPayload = binaryPayload.slice(0,124);
}
frame.binaryPayload = binaryPayload;
frame.fin = true;
this.sendFrame(frame);
};
WebSocketConnection.prototype.fragmentAndSend = function(frame, cb) {
this._debug('fragmentAndSend');
if (frame.opcode > 0x07) {
throw new Error('You cannot fragment control frames.');
}
var threshold = this.config.fragmentationThreshold;
var length = frame.binaryPayload.length;
// Send immediately if fragmentation is disabled or the message is not
// larger than the fragmentation threshold.
if (!this.config.fragmentOutgoingMessages || (frame.binaryPayload && length <= threshold)) {
frame.fin = true;
this.sendFrame(frame, cb);
return;
}
var numFragments = Math.ceil(length / threshold);
var sentFragments = 0;
var sentCallback = function fragmentSentCallback(err) {
if (err) {
if (typeof cb === 'function') {
// pass only the first error
cb(err);
cb = null;
}
return;
}
++sentFragments;
if ((sentFragments === numFragments) && (typeof cb === 'function')) {
cb();
}
};
for (var i=1; i <= numFragments; i++) {
var currentFrame = new WebSocketFrame();
// continuation opcode except for first frame.
currentFrame.opcode = (i === 1) ? frame.opcode : 0x00;
// fin set on last frame only
currentFrame.fin = (i === numFragments);
// length is likely to be shorter on the last fragment
var currentLength = (i === numFragments) ? length - (threshold * (i-1)) : threshold;
var sliceStart = threshold * (i-1);
// Slice the right portion of the original payload
currentFrame.binaryPayload = frame.binaryPayload.slice(sliceStart, sliceStart + currentLength);
this.sendFrame(currentFrame, sentCallback);
}
};
WebSocketConnection.prototype.sendCloseFrame = function(reasonCode, description, cb) {
if (typeof(reasonCode) !== 'number') {
reasonCode = WebSocketConnection.CLOSE_REASON_NORMAL;
}
this._debug('sendCloseFrame state: %s, reasonCode: %d, description: %s', this.state, reasonCode, description);
if (this.state !== STATE_OPEN && this.state !== STATE_PEER_REQUESTED_CLOSE) { return; }
var frame = new WebSocketFrame();
frame.fin = true;
frame.opcode = 0x08; // WebSocketOpcode.CONNECTION_CLOSE
frame.closeStatus = reasonCode;
if (typeof(description) === 'string') {
frame.binaryPayload = bufferFromString(description, 'utf8');
}
this.sendFrame(frame, cb);
this.socket.end();
};
WebSocketConnection.prototype._send_frame = unit_lib.websocket_send_frame;
WebSocketConnection.prototype.sendFrame = function(frame, cb) {
this._debug('sendFrame');
frame.mask = this.maskOutgoingPackets;
this._send_frame(frame);
if (typeof cb === 'function') {
cb();
}
var flushed = 0; // this.socket.write(frame.toBuffer(), cb);
this.outputBufferFull = !flushed;
return flushed;
};
module.exports = WebSocketConnection;

View File

@@ -0,0 +1,11 @@
/*
* Copyright (C) NGINX, Inc.
*/
'use strict';
function WebSocketFrame() {
}
module.exports = WebSocketFrame;

View File

@@ -0,0 +1,509 @@
/************************************************************************
* Copyright 2010-2015 Brian McKelvey.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***********************************************************************/
var util = require('util');
var url = require('url');
var EventEmitter = require('events').EventEmitter;
var WebSocketConnection = require('./websocket_connection');
var headerValueSplitRegExp = /,\s*/;
var headerParamSplitRegExp = /;\s*/;
var headerSanitizeRegExp = /[\r\n]/g;
var xForwardedForSeparatorRegExp = /,\s*/;
var separators = [
'(', ')', '<', '>', '@',
',', ';', ':', '\\', '\"',
'/', '[', ']', '?', '=',
'{', '}', ' ', String.fromCharCode(9)
];
var controlChars = [String.fromCharCode(127) /* DEL */];
for (var i=0; i < 31; i ++) {
/* US-ASCII Control Characters */
controlChars.push(String.fromCharCode(i));
}
var cookieNameValidateRegEx = /([\x00-\x20\x22\x28\x29\x2c\x2f\x3a-\x3f\x40\x5b-\x5e\x7b\x7d\x7f])/;
var cookieValueValidateRegEx = /[^\x21\x23-\x2b\x2d-\x3a\x3c-\x5b\x5d-\x7e]/;
var cookieValueDQuoteValidateRegEx = /^"[^"]*"$/;
var controlCharsAndSemicolonRegEx = /[\x00-\x20\x3b]/g;
var cookieSeparatorRegEx = /[;,] */;
var httpStatusDescriptions = {
100: 'Continue',
101: 'Switching Protocols',
200: 'OK',
201: 'Created',
203: 'Non-Authoritative Information',
204: 'No Content',
205: 'Reset Content',
206: 'Partial Content',
300: 'Multiple Choices',
301: 'Moved Permanently',
302: 'Found',
303: 'See Other',
304: 'Not Modified',
305: 'Use Proxy',
307: 'Temporary Redirect',
400: 'Bad Request',
401: 'Unauthorized',
402: 'Payment Required',
403: 'Forbidden',
404: 'Not Found',
406: 'Not Acceptable',
407: 'Proxy Authorization Required',
408: 'Request Timeout',
409: 'Conflict',
410: 'Gone',
411: 'Length Required',
412: 'Precondition Failed',
413: 'Request Entity Too Long',
414: 'Request-URI Too Long',
415: 'Unsupported Media Type',
416: 'Requested Range Not Satisfiable',
417: 'Expectation Failed',
426: 'Upgrade Required',
500: 'Internal Server Error',
501: 'Not Implemented',
502: 'Bad Gateway',
503: 'Service Unavailable',
504: 'Gateway Timeout',
505: 'HTTP Version Not Supported'
};
function WebSocketRequest(socket, httpRequest, serverConfig) {
// Superclass Constructor
EventEmitter.call(this);
this.socket = socket;
this.httpRequest = httpRequest;
this.resource = httpRequest.url;
this.remoteAddress = socket.remoteAddress;
this.remoteAddresses = [this.remoteAddress];
this.serverConfig = serverConfig;
// Watch for the underlying TCP socket closing before we call accept
this._socketIsClosing = false;
this._socketCloseHandler = this._handleSocketCloseBeforeAccept.bind(this);
this.socket.on('end', this._socketCloseHandler);
this.socket.on('close', this._socketCloseHandler);
this._resolved = false;
}
util.inherits(WebSocketRequest, EventEmitter);
WebSocketRequest.prototype.readHandshake = function() {
var self = this;
var request = this.httpRequest;
// Decode URL
this.resourceURL = url.parse(this.resource, true);
this.host = request.headers['host'];
if (!this.host) {
throw new Error('Client must provide a Host header.');
}
this.key = request.headers['sec-websocket-key'];
if (!this.key) {
throw new Error('Client must provide a value for Sec-WebSocket-Key.');
}
this.webSocketVersion = parseInt(request.headers['sec-websocket-version'], 10);
if (!this.webSocketVersion || isNaN(this.webSocketVersion)) {
throw new Error('Client must provide a value for Sec-WebSocket-Version.');
}
switch (this.webSocketVersion) {
case 8:
case 13:
break;
default:
var e = new Error('Unsupported websocket client version: ' + this.webSocketVersion +
'Only versions 8 and 13 are supported.');
e.httpCode = 426;
e.headers = {
'Sec-WebSocket-Version': '13'
};
throw e;
}
if (this.webSocketVersion === 13) {
this.origin = request.headers['origin'];
}
else if (this.webSocketVersion === 8) {
this.origin = request.headers['sec-websocket-origin'];
}
// Protocol is optional.
var protocolString = request.headers['sec-websocket-protocol'];
this.protocolFullCaseMap = {};
this.requestedProtocols = [];
if (protocolString) {
var requestedProtocolsFullCase = protocolString.split(headerValueSplitRegExp);
requestedProtocolsFullCase.forEach(function(protocol) {
var lcProtocol = protocol.toLocaleLowerCase();
self.requestedProtocols.push(lcProtocol);
self.protocolFullCaseMap[lcProtocol] = protocol;
});
}
if (!this.serverConfig.ignoreXForwardedFor &&
request.headers['x-forwarded-for']) {
var immediatePeerIP = this.remoteAddress;
this.remoteAddresses = request.headers['x-forwarded-for']
.split(xForwardedForSeparatorRegExp);
this.remoteAddresses.push(immediatePeerIP);
this.remoteAddress = this.remoteAddresses[0];
}
// Extensions are optional.
var extensionsString = request.headers['sec-websocket-extensions'];
this.requestedExtensions = this.parseExtensions(extensionsString);
// Cookies are optional
var cookieString = request.headers['cookie'];
this.cookies = this.parseCookies(cookieString);
};
WebSocketRequest.prototype.parseExtensions = function(extensionsString) {
if (!extensionsString || extensionsString.length === 0) {
return [];
}
var extensions = extensionsString.toLocaleLowerCase().split(headerValueSplitRegExp);
extensions.forEach(function(extension, index, array) {
var params = extension.split(headerParamSplitRegExp);
var extensionName = params[0];
var extensionParams = params.slice(1);
extensionParams.forEach(function(rawParam, index, array) {
var arr = rawParam.split('=');
var obj = {
name: arr[0],
value: arr[1]
};
array.splice(index, 1, obj);
});
var obj = {
name: extensionName,
params: extensionParams
};
array.splice(index, 1, obj);
});
return extensions;
};
// This function adapted from node-cookie
// https://github.com/shtylman/node-cookie
WebSocketRequest.prototype.parseCookies = function(str) {
// Sanity Check
if (!str || typeof(str) !== 'string') {
return [];
}
var cookies = [];
var pairs = str.split(cookieSeparatorRegEx);
pairs.forEach(function(pair) {
var eq_idx = pair.indexOf('=');
if (eq_idx === -1) {
cookies.push({
name: pair,
value: null
});
return;
}
var key = pair.substr(0, eq_idx).trim();
var val = pair.substr(++eq_idx, pair.length).trim();
// quoted values
if ('"' === val[0]) {
val = val.slice(1, -1);
}
cookies.push({
name: key,
value: decodeURIComponent(val)
});
});
return cookies;
};
WebSocketRequest.prototype.accept = function(acceptedProtocol, allowedOrigin, cookies) {
this._verifyResolution();
// TODO: Handle extensions
var protocolFullCase;
if (acceptedProtocol) {
protocolFullCase = this.protocolFullCaseMap[acceptedProtocol.toLocaleLowerCase()];
if (typeof(protocolFullCase) === 'undefined') {
protocolFullCase = acceptedProtocol;
}
}
else {
protocolFullCase = acceptedProtocol;
}
this.protocolFullCaseMap = null;
var response = this.httpRequest._response;
response.statusCode = 101;
if (protocolFullCase) {
// validate protocol
for (var i=0; i < protocolFullCase.length; i++) {
var charCode = protocolFullCase.charCodeAt(i);
var character = protocolFullCase.charAt(i);
if (charCode < 0x21 || charCode > 0x7E || separators.indexOf(character) !== -1) {
this.reject(500);
throw new Error('Illegal character "' + String.fromCharCode(character) + '" in subprotocol.');
}
}
if (this.requestedProtocols.indexOf(acceptedProtocol) === -1) {
this.reject(500);
throw new Error('Specified protocol was not requested by the client.');
}
protocolFullCase = protocolFullCase.replace(headerSanitizeRegExp, '');
response += 'Sec-WebSocket-Protocol: ' + protocolFullCase + '\r\n';
}
this.requestedProtocols = null;
if (allowedOrigin) {
allowedOrigin = allowedOrigin.replace(headerSanitizeRegExp, '');
if (this.webSocketVersion === 13) {
response.setHeader('Origin', allowedOrigin);
}
else if (this.webSocketVersion === 8) {
response.setHeader('Sec-WebSocket-Origin', allowedOrigin);
}
}
if (cookies) {
if (!Array.isArray(cookies)) {
this.reject(500);
throw new Error('Value supplied for "cookies" argument must be an array.');
}
var seenCookies = {};
cookies.forEach(function(cookie) {
if (!cookie.name || !cookie.value) {
this.reject(500);
throw new Error('Each cookie to set must at least provide a "name" and "value"');
}
// Make sure there are no \r\n sequences inserted
cookie.name = cookie.name.replace(controlCharsAndSemicolonRegEx, '');
cookie.value = cookie.value.replace(controlCharsAndSemicolonRegEx, '');
if (seenCookies[cookie.name]) {
this.reject(500);
throw new Error('You may not specify the same cookie name twice.');
}
seenCookies[cookie.name] = true;
// token (RFC 2616, Section 2.2)
var invalidChar = cookie.name.match(cookieNameValidateRegEx);
if (invalidChar) {
this.reject(500);
throw new Error('Illegal character ' + invalidChar[0] + ' in cookie name');
}
// RFC 6265, Section 4.1.1
// *cookie-octet / ( DQUOTE *cookie-octet DQUOTE ) | %x21 / %x23-2B / %x2D-3A / %x3C-5B / %x5D-7E
if (cookie.value.match(cookieValueDQuoteValidateRegEx)) {
invalidChar = cookie.value.slice(1, -1).match(cookieValueValidateRegEx);
} else {
invalidChar = cookie.value.match(cookieValueValidateRegEx);
}
if (invalidChar) {
this.reject(500);
throw new Error('Illegal character ' + invalidChar[0] + ' in cookie value');
}
var cookieParts = [cookie.name + '=' + cookie.value];
// RFC 6265, Section 4.1.1
// 'Path=' path-value | <any CHAR except CTLs or ';'>
if(cookie.path){
invalidChar = cookie.path.match(controlCharsAndSemicolonRegEx);
if (invalidChar) {
this.reject(500);
throw new Error('Illegal character ' + invalidChar[0] + ' in cookie path');
}
cookieParts.push('Path=' + cookie.path);
}
// RFC 6265, Section 4.1.2.3
// 'Domain=' subdomain
if (cookie.domain) {
if (typeof(cookie.domain) !== 'string') {
this.reject(500);
throw new Error('Domain must be specified and must be a string.');
}
invalidChar = cookie.domain.match(controlCharsAndSemicolonRegEx);
if (invalidChar) {
this.reject(500);
throw new Error('Illegal character ' + invalidChar[0] + ' in cookie domain');
}
cookieParts.push('Domain=' + cookie.domain.toLowerCase());
}
// RFC 6265, Section 4.1.1
//'Expires=' sane-cookie-date | Force Date object requirement by using only epoch
if (cookie.expires) {
if (!(cookie.expires instanceof Date)){
this.reject(500);
throw new Error('Value supplied for cookie "expires" must be a vaild date object');
}
cookieParts.push('Expires=' + cookie.expires.toGMTString());
}
// RFC 6265, Section 4.1.1
//'Max-Age=' non-zero-digit *DIGIT
if (cookie.maxage) {
var maxage = cookie.maxage;
if (typeof(maxage) === 'string') {
maxage = parseInt(maxage, 10);
}
if (isNaN(maxage) || maxage <= 0 ) {
this.reject(500);
throw new Error('Value supplied for cookie "maxage" must be a non-zero number');
}
maxage = Math.round(maxage);
cookieParts.push('Max-Age=' + maxage.toString(10));
}
// RFC 6265, Section 4.1.1
//'Secure;'
if (cookie.secure) {
if (typeof(cookie.secure) !== 'boolean') {
this.reject(500);
throw new Error('Value supplied for cookie "secure" must be of type boolean');
}
cookieParts.push('Secure');
}
// RFC 6265, Section 4.1.1
//'HttpOnly;'
if (cookie.httponly) {
if (typeof(cookie.httponly) !== 'boolean') {
this.reject(500);
throw new Error('Value supplied for cookie "httponly" must be of type boolean');
}
cookieParts.push('HttpOnly');
}
response.addHeader('Set-Cookie', cookieParts.join(';'));
}.bind(this));
}
// TODO: handle negotiated extensions
// if (negotiatedExtensions) {
// response += 'Sec-WebSocket-Extensions: ' + negotiatedExtensions.join(', ') + '\r\n';
// }
// Mark the request resolved now so that the user can't call accept or
// reject a second time.
this._resolved = true;
this.emit('requestResolved', this);
var connection = new WebSocketConnection(this.socket, [], acceptedProtocol, false, this.serverConfig);
connection.webSocketVersion = this.webSocketVersion;
connection.remoteAddress = this.remoteAddress;
connection.remoteAddresses = this.remoteAddresses;
var self = this;
if (this._socketIsClosing) {
// Handle case when the client hangs up before we get a chance to
// accept the connection and send our side of the opening handshake.
cleanupFailedConnection(connection);
} else {
response._sendHeaders();
connection._addSocketEventListeners();
}
this.emit('requestAccepted', connection);
return connection;
};
WebSocketRequest.prototype.reject = function(status, reason, extraHeaders) {
this._verifyResolution();
// Mark the request resolved now so that the user can't call accept or
// reject a second time.
this._resolved = true;
this.emit('requestResolved', this);
if (typeof(status) !== 'number') {
status = 403;
}
var response = this.httpRequest._response;
response.statusCode = status;
if (reason) {
reason = reason.replace(headerSanitizeRegExp, '');
response.addHeader('X-WebSocket-Reject-Reason', reason);
}
if (extraHeaders) {
for (var key in extraHeaders) {
var sanitizedValue = extraHeaders[key].toString().replace(headerSanitizeRegExp, '');
var sanitizedKey = key.replace(headerSanitizeRegExp, '');
response += (sanitizedKey + ': ' + sanitizedValue + '\r\n');
}
}
response.end();
this.emit('requestRejected', this);
};
WebSocketRequest.prototype._handleSocketCloseBeforeAccept = function() {
this._socketIsClosing = true;
this._removeSocketCloseListeners();
};
WebSocketRequest.prototype._removeSocketCloseListeners = function() {
this.socket.removeListener('end', this._socketCloseHandler);
this.socket.removeListener('close', this._socketCloseHandler);
};
WebSocketRequest.prototype._verifyResolution = function() {
if (this._resolved) {
throw new Error('WebSocketRequest may only be accepted or rejected one time.');
}
};
function cleanupFailedConnection(connection) {
// Since we have to return a connection object even if the socket is
// already dead in order not to break the API, we schedule a 'close'
// event on the connection object to occur immediately.
process.nextTick(function() {
// WebSocketConnection.CLOSE_REASON_ABNORMAL = 1006
// Third param: Skip sending the close frame to a dead socket
connection.drop(1006, 'TCP connection lost before handshake completed.', true);
});
}
module.exports = WebSocketRequest;

View File

@@ -0,0 +1,157 @@
/************************************************************************
* Copyright 2010-2015 Brian McKelvey.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***********************************************************************/
var extend = require('./utils').extend;
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var WebSocketRouterRequest = require('./websocket_router_request');
function WebSocketRouter(config) {
// Superclass Constructor
EventEmitter.call(this);
this.config = {
// The WebSocketServer instance to attach to.
server: null
};
if (config) {
extend(this.config, config);
}
this.handlers = [];
this._requestHandler = this.handleRequest.bind(this);
if (this.config.server) {
this.attachServer(this.config.server);
}
}
util.inherits(WebSocketRouter, EventEmitter);
WebSocketRouter.prototype.attachServer = function(server) {
if (server) {
this.server = server;
this.server.on('request', this._requestHandler);
}
else {
throw new Error('You must specify a WebSocketServer instance to attach to.');
}
};
WebSocketRouter.prototype.detachServer = function() {
if (this.server) {
this.server.removeListener('request', this._requestHandler);
this.server = null;
}
else {
throw new Error('Cannot detach from server: not attached.');
}
};
WebSocketRouter.prototype.mount = function(path, protocol, callback) {
if (!path) {
throw new Error('You must specify a path for this handler.');
}
if (!protocol) {
protocol = '____no_protocol____';
}
if (!callback) {
throw new Error('You must specify a callback for this handler.');
}
path = this.pathToRegExp(path);
if (!(path instanceof RegExp)) {
throw new Error('Path must be specified as either a string or a RegExp.');
}
var pathString = path.toString();
// normalize protocol to lower-case
protocol = protocol.toLocaleLowerCase();
if (this.findHandlerIndex(pathString, protocol) !== -1) {
throw new Error('You may only mount one handler per path/protocol combination.');
}
this.handlers.push({
'path': path,
'pathString': pathString,
'protocol': protocol,
'callback': callback
});
};
WebSocketRouter.prototype.unmount = function(path, protocol) {
var index = this.findHandlerIndex(this.pathToRegExp(path).toString(), protocol);
if (index !== -1) {
this.handlers.splice(index, 1);
}
else {
throw new Error('Unable to find a route matching the specified path and protocol.');
}
};
WebSocketRouter.prototype.findHandlerIndex = function(pathString, protocol) {
protocol = protocol.toLocaleLowerCase();
for (var i=0, len=this.handlers.length; i < len; i++) {
var handler = this.handlers[i];
if (handler.pathString === pathString && handler.protocol === protocol) {
return i;
}
}
return -1;
};
WebSocketRouter.prototype.pathToRegExp = function(path) {
if (typeof(path) === 'string') {
if (path === '*') {
path = /^.*$/;
}
else {
path = path.replace(/[-[\]{}()*+?.,\\^$|#\s]/g, '\\$&');
path = new RegExp('^' + path + '$');
}
}
return path;
};
WebSocketRouter.prototype.handleRequest = function(request) {
var requestedProtocols = request.requestedProtocols;
if (requestedProtocols.length === 0) {
requestedProtocols = ['____no_protocol____'];
}
// Find a handler with the first requested protocol first
for (var i=0; i < requestedProtocols.length; i++) {
var requestedProtocol = requestedProtocols[i].toLocaleLowerCase();
// find the first handler that can process this request
for (var j=0, len=this.handlers.length; j < len; j++) {
var handler = this.handlers[j];
if (handler.path.test(request.resourceURL.pathname)) {
if (requestedProtocol === handler.protocol ||
handler.protocol === '*')
{
var routerRequest = new WebSocketRouterRequest(request, requestedProtocol);
handler.callback(routerRequest);
return;
}
}
}
}
// If we get here we were unable to find a suitable handler.
request.reject(404, 'No handler is available for the given request.');
};
module.exports = WebSocketRouter;

View File

@@ -0,0 +1,54 @@
/************************************************************************
* Copyright 2010-2015 Brian McKelvey.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***********************************************************************/
var util = require('util');
var EventEmitter = require('events').EventEmitter;
function WebSocketRouterRequest(webSocketRequest, resolvedProtocol) {
// Superclass Constructor
EventEmitter.call(this);
this.webSocketRequest = webSocketRequest;
if (resolvedProtocol === '____no_protocol____') {
this.protocol = null;
}
else {
this.protocol = resolvedProtocol;
}
this.origin = webSocketRequest.origin;
this.resource = webSocketRequest.resource;
this.resourceURL = webSocketRequest.resourceURL;
this.httpRequest = webSocketRequest.httpRequest;
this.remoteAddress = webSocketRequest.remoteAddress;
this.webSocketVersion = webSocketRequest.webSocketVersion;
this.requestedExtensions = webSocketRequest.requestedExtensions;
this.cookies = webSocketRequest.cookies;
}
util.inherits(WebSocketRouterRequest, EventEmitter);
WebSocketRouterRequest.prototype.accept = function(origin, cookies) {
var connection = this.webSocketRequest.accept(this.protocol, origin, cookies);
this.emit('requestAccepted', connection);
return connection;
};
WebSocketRouterRequest.prototype.reject = function(status, reason, extraHeaders) {
this.webSocketRequest.reject(status, reason, extraHeaders);
this.emit('requestRejected', this);
};
module.exports = WebSocketRouterRequest;

View File

@@ -0,0 +1,213 @@
/************************************************************************
* Copyright 2010-2015 Brian McKelvey.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
***********************************************************************/
var extend = require('./utils').extend;
var utils = require('./utils');
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var WebSocketRequest = require('./websocket_request');
var WebSocketServer = function WebSocketServer(config) {
// Superclass Constructor
EventEmitter.call(this);
this._handlers = {
upgrade: this.handleUpgrade.bind(this),
requestAccepted: this.handleRequestAccepted.bind(this),
requestResolved: this.handleRequestResolved.bind(this)
};
this.connections = [];
this.pendingRequests = [];
if (config) {
this.mount(config);
}
};
util.inherits(WebSocketServer, EventEmitter);
WebSocketServer.prototype.mount = function(config) {
this.config = {
// The http server instance to attach to. Required.
httpServer: null,
// 64KiB max frame size.
maxReceivedFrameSize: 0x10000,
// 1MiB max message size, only applicable if
// assembleFragments is true
maxReceivedMessageSize: 0x100000,
// Outgoing messages larger than fragmentationThreshold will be
// split into multiple fragments.
fragmentOutgoingMessages: true,
// Outgoing frames are fragmented if they exceed this threshold.
// Default is 16KiB
fragmentationThreshold: 0x4000,
// If true, fragmented messages will be automatically assembled
// and the full message will be emitted via a 'message' event.
// If false, each frame will be emitted via a 'frame' event and
// the application will be responsible for aggregating multiple
// fragmented frames. Single-frame messages will emit a 'message'
// event in addition to the 'frame' event.
// Most users will want to leave this set to 'true'
assembleFragments: true,
// If this is true, websocket connections will be accepted
// regardless of the path and protocol specified by the client.
// The protocol accepted will be the first that was requested
// by the client. Clients from any origin will be accepted.
// This should only be used in the simplest of cases. You should
// probably leave this set to 'false' and inspect the request
// object to make sure it's acceptable before accepting it.
autoAcceptConnections: false,
// Whether or not the X-Forwarded-For header should be respected.
// It's important to set this to 'true' when accepting connections
// from untrusted clients, as a malicious client could spoof its
// IP address by simply setting this header. It's meant to be added
// by a trusted proxy or other intermediary within your own
// infrastructure.
// See: http://en.wikipedia.org/wiki/X-Forwarded-For
ignoreXForwardedFor: false,
// The Nagle Algorithm makes more efficient use of network resources
// by introducing a small delay before sending small packets so that
// multiple messages can be batched together before going onto the
// wire. This however comes at the cost of latency, so the default
// is to disable it. If you don't need low latency and are streaming
// lots of small messages, you can change this to 'false'
disableNagleAlgorithm: true,
// The number of milliseconds to wait after sending a close frame
// for an acknowledgement to come back before giving up and just
// closing the socket.
closeTimeout: 5000
};
extend(this.config, config);
if (this.config.httpServer) {
if (!Array.isArray(this.config.httpServer)) {
this.config.httpServer = [this.config.httpServer];
}
var upgradeHandler = this._handlers.upgrade;
this.config.httpServer.forEach(function(httpServer) {
httpServer.on('upgrade', upgradeHandler);
});
}
else {
throw new Error('You must specify an httpServer on which to mount the WebSocket server.');
}
};
WebSocketServer.prototype.unmount = function() {
var upgradeHandler = this._handlers.upgrade;
this.config.httpServer.forEach(function(httpServer) {
httpServer.removeListener('upgrade', upgradeHandler);
});
};
WebSocketServer.prototype.closeAllConnections = function() {
this.connections.forEach(function(connection) {
connection.close();
});
this.pendingRequests.forEach(function(request) {
process.nextTick(function() {
request.reject(503); // HTTP 503 Service Unavailable
});
});
};
WebSocketServer.prototype.broadcast = function(data) {
if (Buffer.isBuffer(data)) {
this.broadcastBytes(data);
}
else if (typeof(data.toString) === 'function') {
this.broadcastUTF(data);
}
};
WebSocketServer.prototype.broadcastUTF = function(utfData) {
this.connections.forEach(function(connection) {
connection.sendUTF(utfData);
});
};
WebSocketServer.prototype.broadcastBytes = function(binaryData) {
this.connections.forEach(function(connection) {
connection.sendBytes(binaryData);
});
};
WebSocketServer.prototype.shutDown = function() {
this.unmount();
this.closeAllConnections();
};
WebSocketServer.prototype.handleUpgrade = function(request, socket) {
var wsRequest = new WebSocketRequest(socket, request, this.config);
try {
wsRequest.readHandshake();
}
catch(e) {
wsRequest.reject(
e.httpCode ? e.httpCode : 400,
e.message,
e.headers
);
return;
}
this.pendingRequests.push(wsRequest);
wsRequest.once('requestAccepted', this._handlers.requestAccepted);
wsRequest.once('requestResolved', this._handlers.requestResolved);
if (!this.config.autoAcceptConnections && utils.eventEmitterListenerCount(this, 'request') > 0) {
this.emit('request', wsRequest);
}
else if (this.config.autoAcceptConnections) {
wsRequest.accept(wsRequest.requestedProtocols[0], wsRequest.origin);
}
else {
wsRequest.reject(404, 'No handler is configured to accept the connection.');
}
};
WebSocketServer.prototype.handleRequestAccepted = function(connection) {
var self = this;
connection.once('close', function(closeReason, description) {
self.handleConnectionClose(connection, closeReason, description);
});
this.connections.push(connection);
this.emit('connect', connection);
};
WebSocketServer.prototype.handleConnectionClose = function(connection, closeReason, description) {
var index = this.connections.indexOf(connection);
if (index !== -1) {
this.connections.splice(index, 1);
}
this.emit('close', connection, closeReason, description);
};
WebSocketServer.prototype.handleRequestResolved = function(request) {
var index = this.pendingRequests.indexOf(request);
if (index !== -1) { this.pendingRequests.splice(index, 1); }
};
module.exports = WebSocketServer;