Ruby: request processing in multiple threads.

This closes #482 issue on GitHub.
This commit is contained in:
Max Romanov
2020-11-05 12:45:10 +03:00
parent 9f8b746e77
commit b6475df79c
6 changed files with 515 additions and 211 deletions

View File

@@ -69,6 +69,7 @@ typedef struct {
typedef struct { typedef struct {
nxt_str_t script; nxt_str_t script;
uint32_t threads;
} nxt_ruby_app_conf_t; } nxt_ruby_app_conf_t;

View File

@@ -615,6 +615,10 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_ruby_members[] = {
.name = nxt_string("script"), .name = nxt_string("script"),
.type = NXT_CONF_VLDT_STRING, .type = NXT_CONF_VLDT_STRING,
.flags = NXT_CONF_VLDT_REQUIRED, .flags = NXT_CONF_VLDT_REQUIRED,
}, {
.name = nxt_string("threads"),
.type = NXT_CONF_VLDT_INTEGER,
.validator = nxt_conf_vldt_threads,
}, },
NXT_CONF_VLDT_NEXT(nxt_conf_vldt_common_members) NXT_CONF_VLDT_NEXT(nxt_conf_vldt_common_members)

View File

@@ -242,6 +242,11 @@ static nxt_conf_map_t nxt_ruby_app_conf[] = {
NXT_CONF_MAP_STR, NXT_CONF_MAP_STR,
offsetof(nxt_common_app_conf_t, u.ruby.script), offsetof(nxt_common_app_conf_t, u.ruby.script),
}, },
{
nxt_string("threads"),
NXT_CONF_MAP_INT32,
offsetof(nxt_common_app_conf_t, u.ruby.threads),
},
}; };

File diff suppressed because it is too large Load Diff

View File

@@ -21,9 +21,13 @@
typedef struct { typedef struct {
nxt_unit_ctx_t *unit_ctx; VALUE env;
VALUE io_input;
VALUE io_error;
VALUE thread;
nxt_unit_ctx_t *ctx;
nxt_unit_request_info_t *req; nxt_unit_request_info_t *req;
} nxt_ruby_run_ctx_t; } nxt_ruby_ctx_t;
VALUE nxt_ruby_stream_io_input_init(void); VALUE nxt_ruby_stream_io_input_init(void);

View File

@@ -8,7 +8,7 @@
#include <nxt_unit.h> #include <nxt_unit.h>
static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE wrap); static VALUE nxt_ruby_stream_io_new(VALUE class, VALUE arg);
static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self); static VALUE nxt_ruby_stream_io_initialize(int argc, VALUE *argv, VALUE self);
static VALUE nxt_ruby_stream_io_gets(VALUE obj); static VALUE nxt_ruby_stream_io_gets(VALUE obj);
static VALUE nxt_ruby_stream_io_each(VALUE obj); static VALUE nxt_ruby_stream_io_each(VALUE obj);
@@ -16,8 +16,7 @@ static VALUE nxt_ruby_stream_io_read(VALUE obj, VALUE args);
static VALUE nxt_ruby_stream_io_rewind(VALUE obj); static VALUE nxt_ruby_stream_io_rewind(VALUE obj);
static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args); static VALUE nxt_ruby_stream_io_puts(VALUE obj, VALUE args);
static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args); static VALUE nxt_ruby_stream_io_write(VALUE obj, VALUE args);
nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, nxt_inline long nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val);
VALUE val);
static VALUE nxt_ruby_stream_io_flush(VALUE obj); static VALUE nxt_ruby_stream_io_flush(VALUE obj);
@@ -63,13 +62,11 @@ nxt_ruby_stream_io_error_init(void)
static VALUE static VALUE
nxt_ruby_stream_io_new(VALUE class, VALUE wrap) nxt_ruby_stream_io_new(VALUE class, VALUE arg)
{ {
VALUE self; VALUE self;
nxt_ruby_run_ctx_t *run_ctx;
Data_Get_Struct(wrap, nxt_ruby_run_ctx_t, run_ctx); self = Data_Wrap_Struct(class, 0, 0, (void *) (uintptr_t) arg);
self = Data_Wrap_Struct(class, 0, 0, run_ctx);
rb_obj_call_init(self, 0, NULL); rb_obj_call_init(self, 0, NULL);
@@ -89,12 +86,11 @@ nxt_ruby_stream_io_gets(VALUE obj)
{ {
VALUE buf; VALUE buf;
ssize_t res; ssize_t res;
nxt_ruby_run_ctx_t *run_ctx; nxt_ruby_ctx_t *rctx;
nxt_unit_request_info_t *req; nxt_unit_request_info_t *req;
Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
req = rctx->req;
req = run_ctx->req;
if (req->content_length == 0) { if (req->content_length == 0) {
return Qnil; return Qnil;
@@ -145,13 +141,13 @@ nxt_ruby_stream_io_each(VALUE obj)
static VALUE static VALUE
nxt_ruby_stream_io_read(VALUE obj, VALUE args) nxt_ruby_stream_io_read(VALUE obj, VALUE args)
{ {
VALUE buf; VALUE buf;
long copy_size, u_size; long copy_size, u_size;
nxt_ruby_run_ctx_t *run_ctx; nxt_ruby_ctx_t *rctx;
Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
copy_size = run_ctx->req->content_length; copy_size = rctx->req->content_length;
if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) { if (RARRAY_LEN(args) > 0 && TYPE(RARRAY_PTR(args)[0]) == T_FIXNUM) {
u_size = NUM2LONG(RARRAY_PTR(args)[0]); u_size = NUM2LONG(RARRAY_PTR(args)[0]);
@@ -175,8 +171,7 @@ nxt_ruby_stream_io_read(VALUE obj, VALUE args)
return Qnil; return Qnil;
} }
copy_size = nxt_unit_request_read(run_ctx->req, RSTRING_PTR(buf), copy_size = nxt_unit_request_read(rctx->req, RSTRING_PTR(buf), copy_size);
copy_size);
if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) { if (RARRAY_LEN(args) > 1 && TYPE(RARRAY_PTR(args)[1]) == T_STRING) {
@@ -200,15 +195,15 @@ nxt_ruby_stream_io_rewind(VALUE obj)
static VALUE static VALUE
nxt_ruby_stream_io_puts(VALUE obj, VALUE args) nxt_ruby_stream_io_puts(VALUE obj, VALUE args)
{ {
nxt_ruby_run_ctx_t *run_ctx; nxt_ruby_ctx_t *rctx;
if (RARRAY_LEN(args) != 1) { if (RARRAY_LEN(args) != 1) {
return Qnil; return Qnil;
} }
Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]); nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]);
return Qnil; return Qnil;
} }
@@ -217,23 +212,23 @@ nxt_ruby_stream_io_puts(VALUE obj, VALUE args)
static VALUE static VALUE
nxt_ruby_stream_io_write(VALUE obj, VALUE args) nxt_ruby_stream_io_write(VALUE obj, VALUE args)
{ {
long len; long len;
nxt_ruby_run_ctx_t *run_ctx; nxt_ruby_ctx_t *rctx;
if (RARRAY_LEN(args) != 1) { if (RARRAY_LEN(args) != 1) {
return Qnil; return Qnil;
} }
Data_Get_Struct(obj, nxt_ruby_run_ctx_t, run_ctx); Data_Get_Struct(obj, nxt_ruby_ctx_t, rctx);
len = nxt_ruby_stream_io_s_write(run_ctx, RARRAY_PTR(args)[0]); len = nxt_ruby_stream_io_s_write(rctx, RARRAY_PTR(args)[0]);
return LONG2FIX(len); return LONG2FIX(len);
} }
nxt_inline long nxt_inline long
nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val) nxt_ruby_stream_io_s_write(nxt_ruby_ctx_t *rctx, VALUE val)
{ {
if (nxt_slow_path(val == Qnil)) { if (nxt_slow_path(val == Qnil)) {
return 0; return 0;
@@ -247,7 +242,7 @@ nxt_ruby_stream_io_s_write(nxt_ruby_run_ctx_t *run_ctx, VALUE val)
} }
} }
nxt_unit_req_error(run_ctx->req, "Ruby: %s", RSTRING_PTR(val)); nxt_unit_req_error(rctx->req, "Ruby: %s", RSTRING_PTR(val));
return RSTRING_LEN(val); return RSTRING_LEN(val);
} }