Node.js: ServerRequest and ServerResponse compliance to Stream API.

ServerRequest now inherit stream Readable object.  ServerResponse
provides 'writable' property.

Thanks to Wu Jian Ping (@wujjpp).

This closes #274, closes #317 issues and closes #502 PR on GitHub.
This commit is contained in:
Max Romanov
2020-12-29 19:00:54 +03:00
parent 3abca42caf
commit d3d6864bdc
5 changed files with 94 additions and 89 deletions

View File

@@ -37,6 +37,13 @@ ability to specify multiple directories in the "path" option of Python apps.
</para>
</change>
<change type="feature">
<para>
ServerRequest and ServerResponse objects of Node.js module are now compliant
to Stream API.
</para>
</change>
<change type="bugfix">
<para>
invalid HTTP responses were generated for some unusual status codes.

View File

@@ -11,6 +11,7 @@ const util = require('util');
const unit_lib = require('./build/Release/unit-http');
const Socket = require('./socket');
const WebSocketFrame = require('./websocket_frame');
const Readable = require('stream').Readable;
function ServerResponse(req) {
@@ -23,6 +24,7 @@ function ServerResponse(req) {
req._response = this;
this.socket = req.socket;
this.connection = req.connection;
this.writable = true;
}
util.inherits(ServerResponse, EventEmitter);
@@ -268,6 +270,7 @@ ServerResponse.prototype._writeBody = function(chunk, encoding, callback) {
res = this._write(chunk, 0, contentLength);
if (res < contentLength) {
this.socket.writable = false;
this.writable = false;
o = new BufferedOutput(this, res, chunk, encoding, callback);
this.server._output.push(o);
@@ -328,6 +331,8 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) {
if (typeof callback === 'function') {
callback();
}
this.emit("finish");
});
this.finished = true;
@@ -337,15 +342,14 @@ ServerResponse.prototype.end = function end(chunk, encoding, callback) {
};
function ServerRequest(server, socket) {
EventEmitter.call(this);
Readable.call(this);
this.server = server;
this.socket = socket;
this.connection = socket;
this._pushed_eofchunk = false;
}
util.inherits(ServerRequest, EventEmitter);
ServerRequest.prototype.unpipe = undefined;
util.inherits(ServerRequest, Readable);
ServerRequest.prototype.setTimeout = function setTimeout(msecs, callback) {
this.timeout = msecs;
@@ -377,35 +381,21 @@ ServerRequest.prototype.STATUS_CODES = function STATUS_CODES() {
return http.STATUS_CODES;
};
ServerRequest.prototype.listeners = function listeners() {
return [];
};
ServerRequest.prototype._request_read = unit_lib.request_read;
ServerRequest.prototype.resume = function resume() {
return [];
};
ServerRequest.prototype._read = function _read(n) {
const b = this._request_read(n);
/*
* The "on" method is overridden to defer reading data until user code is
* ready, that is (ev === "data"). This can occur after req.emit("end") is
* executed, since the user code can be scheduled asynchronously by Promises
* and so on. Passing the data is postponed by process.nextTick() until
* the "on" method caller completes.
*/
ServerRequest.prototype.on = function on(ev, fn) {
Server.prototype.on.call(this, ev, fn);
if (ev === "data") {
process.nextTick(function () {
if (this._data.length !== 0) {
this.emit("data", this._data);
if (b != null) {
this.push(b);
}
}.bind(this));
if (!this._pushed_eofchunk && (b == null || b.length < n)) {
this._pushed_eofchunk = true;
this.push(null);
}
};
ServerRequest.prototype.addListener = ServerRequest.prototype.on;
function Server(requestListener) {
EventEmitter.call(this);
@@ -472,11 +462,6 @@ Server.prototype.emit_request = function (req, res) {
} else {
this.emit("request", req, res);
}
process.nextTick(() => {
req.emit("finish");
req.emit("end");
});
};
Server.prototype.emit_close = function () {
@@ -523,6 +508,7 @@ Server.prototype.emit_drain = function () {
}
resp.socket.writable = true;
resp.writable = true;
process.nextTick(() => {
resp.emit("drain");

View File

@@ -27,6 +27,7 @@ struct port_data_t {
struct req_data_t {
napi_ref sock_ref;
napi_ref req_ref;
napi_ref resp_ref;
napi_ref conn_ref;
};
@@ -65,6 +66,7 @@ Unit::init(napi_env env, napi_value exports)
constructor_ = napi.create_reference(ctor);
napi.set_named_property(exports, "Unit", ctor);
napi.set_named_property(exports, "request_read", request_read);
napi.set_named_property(exports, "response_send_headers",
response_send_headers);
napi.set_named_property(exports, "response_write", response_write);
@@ -206,7 +208,7 @@ Unit::request_handler(nxt_unit_request_info_t *req)
server_obj = get_server_object();
socket = create_socket(server_obj, req);
request = create_request(server_obj, socket);
request = create_request(server_obj, socket, req);
response = create_response(server_obj, request, req);
create_headers(req, request);
@@ -301,6 +303,7 @@ Unit::close_handler(nxt_unit_request_info_t *req)
nxt_napi::create(0));
remove_wrap(req_data->sock_ref);
remove_wrap(req_data->req_ref);
remove_wrap(req_data->resp_ref);
remove_wrap(req_data->conn_ref);
@@ -488,9 +491,8 @@ Unit::get_server_object()
void
Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
{
void *data;
uint32_t i;
napi_value headers, raw_headers, buffer;
napi_value headers, raw_headers;
napi_status status;
nxt_unit_request_t *r;
@@ -515,11 +517,6 @@ Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
set_named_property(request, "url", r->target, r->target_length);
set_named_property(request, "_websocket_handshake", r->websocket_handshake);
buffer = create_buffer((size_t) req->content_length, &data);
nxt_unit_request_read(req, data, req->content_length);
set_named_property(request, "_data", buffer);
}
@@ -577,13 +574,20 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
napi_value
Unit::create_request(napi_value server_obj, napi_value socket)
Unit::create_request(napi_value server_obj, napi_value socket,
nxt_unit_request_info_t *req)
{
napi_value constructor;
napi_value constructor, res;
req_data_t *req_data;
constructor = get_named_property(server_obj, "ServerRequest");
return new_instance(constructor, server_obj, socket);
res = new_instance(constructor, server_obj, socket);
req_data = (req_data_t *) req->data;
req_data->req_ref = wrap(res, req, req_destroy);
return res;
}
@@ -642,6 +646,47 @@ Unit::create_websocket_frame(napi_value server_obj,
}
napi_value
Unit::request_read(napi_env env, napi_callback_info info)
{
void *data;
uint32_t wm;
nxt_napi napi(env);
napi_value this_arg, argv, buffer;
nxt_unit_request_info_t *req;
try {
this_arg = napi.get_cb_info(info, argv);
try {
req = napi.get_request_info(this_arg);
} catch (exception &e) {
return nullptr;
}
if (req->content_length == 0) {
return nullptr;
}
wm = napi.get_value_uint32(argv);
if (wm > req->content_length) {
wm = req->content_length;
}
buffer = napi.create_buffer((size_t) wm, &data);
nxt_unit_request_read(req, data, wm);
} catch (exception &e) {
napi.throw_error(e);
return nullptr;
}
return buffer;
}
napi_value
Unit::response_send_headers(napi_env env, napi_callback_info info)
{
@@ -884,6 +929,7 @@ Unit::response_end(napi_env env, napi_callback_info info)
req_data = (req_data_t *) req->data;
napi.remove_wrap(req_data->sock_ref);
napi.remove_wrap(req_data->req_ref);
napi.remove_wrap(req_data->resp_ref);
napi.remove_wrap(req_data->conn_ref);
@@ -1011,6 +1057,13 @@ Unit::sock_destroy(napi_env env, void *r, void *finalize_hint)
}
void
Unit::req_destroy(napi_env env, void *r, void *finalize_hint)
{
nxt_unit_req_debug(NULL, "req_destroy: %p", r);
}
void
Unit::resp_destroy(napi_env env, void *r, void *finalize_hint)
{

View File

@@ -21,6 +21,7 @@ private:
static void destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void conn_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void sock_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void req_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static void resp_destroy(napi_env env, void *nativeObject, void *finalize_hint);
static napi_value create_server(napi_env env, napi_callback_info info);
@@ -50,7 +51,8 @@ private:
napi_value create_socket(napi_value server_obj,
nxt_unit_request_info_t *req);
napi_value create_request(napi_value server_obj, napi_value socket);
napi_value create_request(napi_value server_obj, napi_value socket,
nxt_unit_request_info_t *req);
napi_value create_response(napi_value server_obj, napi_value request,
nxt_unit_request_info_t *req);
@@ -58,6 +60,8 @@ private:
napi_value create_websocket_frame(napi_value server_obj,
nxt_unit_websocket_frame_t *ws);
static napi_value request_read(napi_env env, napi_callback_info info);
static napi_value response_send_headers(napi_env env,
napi_callback_info info);

View File

@@ -222,22 +222,6 @@ class TestNodeApplication(TestApplicationNode):
assert 'X-Header' not in headers, 'insensitive'
assert 'X-header' not in headers, 'insensitive 2'
def test_node_application_promise_handler(self, temp_dir):
self.load('promise_handler')
assert (
self.post(
headers={
'Host': 'localhost',
'Content-Type': 'text/html',
'Connection': 'close',
},
body='callback',
)['status']
== 200
), 'promise handler request'
assert waitforfiles(temp_dir + '/node/callback'), 'promise handler'
def test_node_application_promise_handler_write_after_end(self):
self.load('promise_handler')
@@ -270,35 +254,6 @@ class TestNodeApplication(TestApplicationNode):
), 'promise end request'
assert waitforfiles(temp_dir + '/node/callback'), 'promise end'
def test_node_application_promise_multiple_calls(self, temp_dir):
self.load('promise_handler')
self.post(
headers={
'Host': 'localhost',
'Content-Type': 'text/html',
'Connection': 'close',
},
body='callback1',
)
assert waitforfiles(
temp_dir + '/node/callback1'
), 'promise first call'
self.post(
headers={
'Host': 'localhost',
'Content-Type': 'text/html',
'Connection': 'close',
},
body='callback2',
)
assert waitforfiles(
temp_dir + '/node/callback2'
), 'promise second call'
@pytest.mark.skip('not yet')
def test_node_application_header_name_valid(self):
self.load('header_name_valid')