Node.js: added async request execution.

This commit is contained in:
Alexander Borisov
2018-10-31 15:51:51 +03:00
parent 3b0afb1681
commit c838c3bd15
5 changed files with 181 additions and 98 deletions

View File

@@ -232,7 +232,6 @@ ServerResponse.prototype.write = function write(chunk, encoding, callback) {
ServerResponse.prototype.end = function end(chunk, encoding, callback) { ServerResponse.prototype.end = function end(chunk, encoding, callback) {
this._writeBody(chunk, encoding, callback); this._writeBody(chunk, encoding, callback);
unit_lib.unit_response_end(this)
this.finished = true; this.finished = true;
@@ -290,10 +289,10 @@ function Server(requestListener) {
EventEmitter.call(this); EventEmitter.call(this);
this.unit = new unit_lib.Unit(); this.unit = new unit_lib.Unit();
this.unit.createServer();
this.unit.server = this; this.unit.server = this;
this.unit.createServer();
this.socket = Socket; this.socket = Socket;
this.request = ServerRequest; this.request = ServerRequest;
this.response = ServerResponse; this.response = ServerResponse;
@@ -318,9 +317,31 @@ Server.prototype.listen = function () {
this.unit.listen(); this.unit.listen();
}; };
function connectionListener(socket) { Server.prototype.run_events = function (server, req, res) {
/* Important!!! setImmediate starts the next iteration in Node.js loop. */
setImmediate(function () {
server.emit("request", req, res);
Promise.resolve().then(() => {
let buf = server.unit._read(req.socket.req_pointer);
if (buf.length != 0) {
req.emit("data", buf);
} }
req.emit("end");
});
Promise.resolve().then(() => {
if (res.finished) {
unit_lib.unit_response_end(res);
}
});
});
};
function connectionListener(socket) {
}
module.exports = { module.exports = {
STATUS_CODES: http.STATUS_CODES, STATUS_CODES: http.STATUS_CODES,

View File

@@ -5,6 +5,11 @@
#include "unit.h" #include "unit.h"
#include <unistd.h>
#include <fcntl.h>
#include <uv.h>
napi_ref Unit::constructor_; napi_ref Unit::constructor_;
@@ -31,11 +36,12 @@ Unit::init(napi_env env, napi_value exports)
napi_property_descriptor properties[] = { napi_property_descriptor properties[] = {
{ "createServer", 0, create_server, 0, 0, 0, napi_default, 0 }, { "createServer", 0, create_server, 0, 0, 0, napi_default, 0 },
{ "listen", 0, listen, 0, 0, 0, napi_default, 0 } { "listen", 0, listen, 0, 0, 0, napi_default, 0 },
{ "_read", 0, _read, 0, 0, 0, napi_default, 0 }
}; };
status = napi_define_class(env, "Unit", NAPI_AUTO_LENGTH, create, nullptr, status = napi_define_class(env, "Unit", NAPI_AUTO_LENGTH, create, nullptr,
2, properties, &cons); 3, properties, &cons);
if (status != napi_ok) { if (status != napi_ok) {
goto failed; goto failed;
} }
@@ -158,14 +164,13 @@ Unit::create_server(napi_env env, napi_callback_info info)
{ {
Unit *obj; Unit *obj;
size_t argc; size_t argc;
napi_value jsthis; napi_value jsthis, argv;
napi_status status; napi_status status;
napi_value argv[1];
nxt_unit_init_t unit_init; nxt_unit_init_t unit_init;
argc = 1; argc = 1;
status = napi_get_cb_info(env, info, &argc, argv, &jsthis, nullptr); status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr);
if (status != napi_ok) { if (status != napi_ok) {
goto failed; goto failed;
} }
@@ -179,6 +184,8 @@ Unit::create_server(napi_env env, napi_callback_info info)
unit_init.data = obj; unit_init.data = obj;
unit_init.callbacks.request_handler = request_handler; unit_init.callbacks.request_handler = request_handler;
unit_init.callbacks.add_port = add_port;
unit_init.callbacks.remove_port = remove_port;
obj->unit_ctx_ = nxt_unit_init(&unit_init); obj->unit_ctx_ = nxt_unit_init(&unit_init);
if (obj->unit_ctx_ == NULL) { if (obj->unit_ctx_ == NULL) {
@@ -198,41 +205,53 @@ failed:
napi_value napi_value
Unit::listen(napi_env env, napi_callback_info info) Unit::listen(napi_env env, napi_callback_info info)
{ {
int ret; return nullptr;
Unit *obj; }
napi_value jsthis;
napi_status status;
status = napi_get_cb_info(env, info, nullptr, nullptr, &jsthis, nullptr);
napi_value
Unit::_read(napi_env env, napi_callback_info info)
{
Unit *obj;
void *data;
size_t argc;
int64_t req_pointer;
napi_value jsthis, buffer, argv;
napi_status status;
nxt_unit_request_info_t *req;
argc = 1;
status = napi_get_cb_info(env, info, &argc, &argv, &jsthis, nullptr);
if (status != napi_ok) { if (status != napi_ok) {
goto failed; napi_throw_error(env, NULL, "Failed to get arguments from js");
return nullptr;
} }
status = napi_unwrap(env, jsthis, reinterpret_cast<void **>(&obj)); status = napi_unwrap(env, jsthis, reinterpret_cast<void **>(&obj));
if (status != napi_ok) { if (status != napi_ok) {
goto failed; napi_throw_error(env, NULL, "Failed to get Unit object form js");
}
if (obj->unit_ctx_ == NULL) {
napi_throw_error(env, NULL, "Unit context was not created");
return nullptr; return nullptr;
} }
ret = nxt_unit_run(obj->unit_ctx_); status = napi_get_value_int64(env, argv, &req_pointer);
if (ret != NXT_UNIT_OK) { if (status != napi_ok) {
napi_throw_error(env, NULL, "Failed to run Unit"); napi_throw_error(env, NULL, "Failed to get request pointer");
return nullptr; return nullptr;
} }
nxt_unit_done(obj->unit_ctx_); req = (nxt_unit_request_info_t *) (uintptr_t) req_pointer;
status = napi_create_buffer(env, (size_t) req->content_length,
&data, &buffer);
if (status != napi_ok) {
napi_throw_error(env, NULL, "Failed to create request buffer");
return nullptr; return nullptr;
}
failed: nxt_unit_request_read(req, data, req->content_length);
napi_throw_error(env, NULL, "Failed to listen Unit socket"); return buffer;
return nullptr;
} }
@@ -242,8 +261,9 @@ Unit::request_handler(nxt_unit_request_info_t *req)
Unit *obj; Unit *obj;
napi_value socket, request, response; napi_value socket, request, response;
napi_value global, server_obj; napi_value global, server_obj;
napi_value req_argv[3]; napi_value run_events, events_res;
napi_status status; napi_status status;
napi_value events_args[3];
obj = reinterpret_cast<Unit *>(req->unit->data); obj = reinterpret_cast<Unit *>(req->unit->data);
@@ -284,22 +304,109 @@ Unit::request_handler(nxt_unit_request_info_t *req)
return; return;
} }
req_argv[1] = request;
req_argv[2] = response;
status = obj->create_headers(req, request); status = obj->create_headers(req, request);
if (status != napi_ok) { if (status != napi_ok) {
napi_throw_error(obj->env_, NULL, "Failed to create headers"); napi_throw_error(obj->env_, NULL, "Failed to create headers");
return; return;
} }
obj->emit(server_obj, "request", sizeof("request") - 1, 3, req_argv); status = napi_get_named_property(obj->env_, server_obj, "run_events",
obj->emit_post_data(request, req); &run_events);
if (status != napi_ok) {
napi_throw_error(obj->env_, NULL, "Failed to get"
" 'run_events' function");
return;
}
events_args[0] = server_obj;
events_args[1] = request;
events_args[2] = response;
status = napi_call_function(obj->env_, server_obj, run_events, 3,
events_args, &events_res);
if (status != napi_ok) {
napi_throw_error(obj->env_, NULL, "Failed to call"
" 'run_events' function");
return;
}
napi_close_handle_scope(obj->env_, scope); napi_close_handle_scope(obj->env_, scope);
} }
void
nxt_uv_read_callback(uv_poll_t *handle, int status, int events)
{
nxt_unit_run_once((nxt_unit_ctx_t *) handle->data);
}
int
Unit::add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port)
{
int err;
Unit *obj;
uv_loop_t *loop;
uv_poll_t *uv_handle;
napi_status status;
if (port->in_fd != -1) {
obj = reinterpret_cast<Unit *>(ctx->unit->data);
if (fcntl(port->in_fd, F_SETFL, O_NONBLOCK) == -1) {
napi_throw_error(obj->env_, NULL, "Failed to upgrade read"
" file descriptor to O_NONBLOCK");
return -1;
}
status = napi_get_uv_event_loop(obj->env_, &loop);
if (status != napi_ok) {
napi_throw_error(obj->env_, NULL, "Failed to get uv.loop");
return NXT_UNIT_ERROR;
}
uv_handle = new uv_poll_t;
err = uv_poll_init(loop, uv_handle, port->in_fd);
if (err < 0) {
napi_throw_error(obj->env_, NULL, "Failed to init uv.poll");
return NXT_UNIT_ERROR;
}
err = uv_poll_start(uv_handle, UV_READABLE, nxt_uv_read_callback);
if (err < 0) {
napi_throw_error(obj->env_, NULL, "Failed to start uv.poll");
return NXT_UNIT_ERROR;
}
port->data = uv_handle;
uv_handle->data = ctx;
}
return nxt_unit_add_port(ctx, port);
}
void
Unit::remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id)
{
nxt_unit_port_t *port;
port = nxt_unit_find_port(ctx, port_id);
if (port == NULL) {
return;
}
if (port->in_fd != -1 && port->data != NULL) {
uv_poll_stop((uv_poll_t *) port->data);
delete (uv_poll_t *) port->data;
}
nxt_unit_remove_port(ctx, port_id);
}
napi_value napi_value
Unit::get_server_object() Unit::get_server_object()
{ {
@@ -320,40 +427,6 @@ Unit::get_server_object()
} }
napi_value
Unit::emit(napi_value obj, const char *name, size_t name_len, size_t argc,
napi_value *argv)
{
napi_value emitter, return_val, str;
napi_status status;
status = napi_get_named_property(env_, obj, "emit", &emitter);
if (status != napi_ok) {
return nullptr;
}
status = napi_create_string_latin1(env_, name, name_len, &str);
if (status != napi_ok) {
return nullptr;
}
if (argc != 0) {
argv[0] = str;
} else {
argc = 1;
argv = &str;
}
status = napi_call_function(env_, obj, emitter, argc, argv, &return_val);
if (status != napi_ok) {
return nullptr;
}
return return_val;
}
napi_status napi_status
Unit::create_headers(nxt_unit_request_info_t *req, napi_value request) Unit::create_headers(nxt_unit_request_info_t *req, napi_value request)
{ {
@@ -480,7 +553,7 @@ Unit::append_header(nxt_unit_field_t *f, napi_value headers,
napi_value napi_value
Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req) Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
{ {
napi_value constructor, return_val; napi_value constructor, return_val, req_pointer;
napi_status status; napi_status status;
status = napi_get_named_property(env_, server_obj, "socket", status = napi_get_named_property(env_, server_obj, "socket",
@@ -494,6 +567,17 @@ Unit::create_socket(napi_value server_obj, nxt_unit_request_info_t *req)
return nullptr; return nullptr;
} }
status = napi_create_int64(env_, (uintptr_t) req, &req_pointer);
if (status != napi_ok) {
return nullptr;
}
status = napi_set_named_property(env_, return_val, "req_pointer",
req_pointer);
if (status != napi_ok) {
return nullptr;
}
return return_val; return return_val;
} }
@@ -563,27 +647,6 @@ Unit::create_response(napi_value server_obj, napi_value socket,
} }
void
Unit::emit_post_data(napi_value request, nxt_unit_request_info_t *req)
{
void *data;
napi_value req_argv[2];
napi_status status;
status = napi_create_buffer(env_, (size_t) req->content_length,
&data, &req_argv[1]);
if (status != napi_ok) {
napi_throw_error(env_, NULL, "Failed to create request buffer");
return;
}
nxt_unit_request_read(req, data, req->content_length);
emit(request, "data", sizeof("data") - 1, 2, req_argv);
emit(request, "end", sizeof("end") - 1, 0, nullptr);
}
napi_value napi_value
Unit::response_send_headers(napi_env env, napi_callback_info info) Unit::response_send_headers(napi_env env, napi_callback_info info)
{ {

View File

@@ -36,13 +36,13 @@ private:
static napi_value create_server(napi_env env, napi_callback_info info); static napi_value create_server(napi_env env, napi_callback_info info);
static napi_value listen(napi_env env, napi_callback_info info); static napi_value listen(napi_env env, napi_callback_info info);
static napi_value _read(napi_env env, napi_callback_info info);
static void request_handler(nxt_unit_request_info_t *req); static void request_handler(nxt_unit_request_info_t *req);
static int add_port(nxt_unit_ctx_t *ctx, nxt_unit_port_t *port);
static void remove_port(nxt_unit_ctx_t *ctx, nxt_unit_port_id_t *port_id);
napi_value get_server_object(); napi_value get_server_object();
napi_value emit(napi_value obj, const char *name, size_t name_len,
size_t argc, napi_value *argv);
napi_value create_socket(napi_value server_obj, napi_value create_socket(napi_value server_obj,
nxt_unit_request_info_t *req); nxt_unit_request_info_t *req);
@@ -52,8 +52,6 @@ private:
napi_value request, napi_value request,
nxt_unit_request_info_t *req, Unit *obj); nxt_unit_request_info_t *req, Unit *obj);
void emit_post_data(napi_value request, nxt_unit_request_info_t *req);
static napi_value response_send_headers(napi_env env, static napi_value response_send_headers(napi_env env,
napi_callback_info info); napi_callback_info info);

View File

@@ -74,7 +74,6 @@ static nxt_unit_process_t *nxt_unit_process_get(nxt_unit_ctx_t *ctx,
static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx, static nxt_unit_process_t *nxt_unit_process_find(nxt_unit_ctx_t *ctx,
pid_t pid, int remove); pid_t pid, int remove);
static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib); static nxt_unit_process_t *nxt_unit_process_pop_first(nxt_unit_impl_t *lib);
static int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
static int nxt_unit_create_port(nxt_unit_ctx_t *ctx, static int nxt_unit_create_port(nxt_unit_ctx_t *ctx,
nxt_unit_port_id_t *port_id, int *fd); nxt_unit_port_id_t *port_id, int *fd);
@@ -2697,7 +2696,7 @@ nxt_unit_run(nxt_unit_ctx_t *ctx)
} }
static int int
nxt_unit_run_once(nxt_unit_ctx_t *ctx) nxt_unit_run_once(nxt_unit_ctx_t *ctx)
{ {
int rc; int rc;

View File

@@ -196,6 +196,8 @@ int nxt_unit_process_msg(nxt_unit_ctx_t *, nxt_unit_port_id_t *port_id,
*/ */
int nxt_unit_run(nxt_unit_ctx_t *); int nxt_unit_run(nxt_unit_ctx_t *);
int nxt_unit_run_once(nxt_unit_ctx_t *ctx);
/* Destroy application library object. */ /* Destroy application library object. */
void nxt_unit_done(nxt_unit_ctx_t *); void nxt_unit_done(nxt_unit_ctx_t *);