Files
nginx-unit/src/ruby/nxt_ruby.c
Alejandro Colomar 9b4b4925b3 Ruby: fixed contents of SCRIPT_NAME.
Having the basename of the script pathname was incorrect.  While
we don't have something more accurate, the best thing to do is to
have it empty (which should be the right thing most of the time).

This closes #715 issue on GitHub.

The bug was introduced in git commit
0032543fa6
'Ruby: added the Rack environment parameter "SCRIPT_NAME".'.
2022-07-27 12:46:42 +02:00

1433 lines
36 KiB
C

/*
* Copyright (C) Alexander Borisov
* Copyright (C) NGINX, Inc.
*/
#include <ruby/nxt_ruby.h>
#include <nxt_unit.h>
#include <nxt_unit_request.h>
#include <ruby/thread.h>
#include NXT_RUBY_MOUNTS_H
#include <locale.h>
/* Version pair pushed into the "rack.version" array of the Rack environment. */
#define NXT_RUBY_RACK_API_VERSION_MAJOR 1
#define NXT_RUBY_RACK_API_VERSION_MINOR 3
/*
 * Bundle of arguments handed to the rb_protect()-ed initialization
 * callbacks as one pointer cast to VALUE.
 */
typedef struct {
/* Task used for nxt_alert() logging during initialization. */
nxt_task_t *task;
/* Path of the rackup script given to Rack::Builder.parse_file. */
nxt_str_t *script;
/* Ruby context of the main thread; set by nxt_ruby_start(). */
nxt_ruby_ctx_t *rctx;
} nxt_ruby_rack_init_t;
/*
 * Forward declarations of the file-local functions.
 */

/* Module entry point and interpreter/Rack initialization. */
static nxt_int_t nxt_ruby_start(nxt_task_t *task,
nxt_process_data_t *data);
static VALUE nxt_ruby_init_basic(VALUE arg);
static VALUE nxt_ruby_hook_procs_load(VALUE path);
static VALUE nxt_ruby_hook_register(VALUE arg);
static VALUE nxt_ruby_hook_call(VALUE name);
static VALUE nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init);
static VALUE nxt_ruby_require_rubygems(VALUE arg);
static VALUE nxt_ruby_bundler_setup(VALUE arg);
static VALUE nxt_ruby_require_rack(VALUE arg);
static VALUE nxt_ruby_rack_parse_script(VALUE ctx);
static VALUE nxt_ruby_rack_env_create(VALUE arg);
static int nxt_ruby_init_io(nxt_ruby_ctx_t *rctx);
/* Request handling and worker threads. */
static void nxt_ruby_request_handler(nxt_unit_request_info_t *req);
static void *nxt_ruby_request_handler_gvl(void *req);
static int nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx);
static void *nxt_ruby_thread_create_gvl(void *rctx);
static VALUE nxt_ruby_thread_func(VALUE arg);
static void *nxt_ruby_unit_run(void *ctx);
static void nxt_ruby_ubf(void *ctx);
static int nxt_ruby_init_threads(nxt_ruby_app_conf_t *c);
static void nxt_ruby_join_threads(nxt_unit_ctx_t *ctx,
nxt_ruby_app_conf_t *c);
/* Running the Rack application and translating its [status, headers, body]. */
static VALUE nxt_ruby_rack_app_run(VALUE arg);
static int nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env);
nxt_inline void nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
nxt_unit_sptr_t *sptr, uint32_t len);
static nxt_int_t nxt_ruby_rack_result_status(nxt_unit_request_info_t *req,
VALUE result);
static int nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req,
VALUE result, nxt_int_t status);
static int nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg);
static int nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg);
static int nxt_ruby_rack_result_body(nxt_unit_request_info_t *req,
VALUE result);
static int nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
VALUE filepath);
static void *nxt_ruby_response_write_cb(void *read_info);
static VALUE nxt_ruby_rack_result_body_each(VALUE body, VALUE arg,
int argc, const VALUE *argv, VALUE blockarg);
static void *nxt_ruby_response_write(void *body);
/* Diagnostics and cleanup. */
static void nxt_ruby_exception_log(nxt_unit_request_info_t *req,
uint32_t level, const char *desc);
static void nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx);
static void nxt_ruby_atexit(void);
/* Build markers exported through nxt_app_module (its first two members). */
static uint32_t compat[] = {
NXT_VERNUM, NXT_DEBUG,
};
/*
 * Hash of hook name => Proc filled by nxt_ruby_hook_register(); stays Qnil
 * when no hooks file is configured.
 */
static VALUE nxt_ruby_hook_procs;
/* Rack application object returned by nxt_ruby_rack_init(). */
static VALUE nxt_ruby_rackup;
/* Result of rb_intern("call"); used to invoke the Rack application. */
static VALUE nxt_ruby_call;
/* Copy of the configured thread count (c->threads). */
static uint32_t nxt_ruby_threads;
/* Array of (threads - 1) contexts allocated by nxt_ruby_init_threads(). */
static nxt_ruby_ctx_t *nxt_ruby_ctxs;
/*
 * Module descriptor exported to the Unit core.  The initializer is
 * positional; NOTE(review): member meanings follow nxt_app_module_t, which
 * is declared outside this file -- confirm against its definition.
 */
NXT_EXPORT nxt_app_module_t nxt_app_module = {
sizeof(compat),
compat,
nxt_string("ruby"),
ruby_version,
nxt_ruby_mounts,
nxt_nitems(nxt_ruby_mounts),
NULL,
nxt_ruby_start,
};
/*
 * Pairs a constant C string with the address of the static VALUE that
 * receives the corresponding Ruby string in nxt_ruby_init_strings().
 */
typedef struct {
nxt_str_t string;
VALUE *v;
} nxt_ruby_string_t;
/*
 * Ruby strings created once at start-up and reused as Rack environment
 * keys/values and as hook names.
 */
static VALUE nxt_rb_80_str;
static VALUE nxt_rb_content_length_str;
static VALUE nxt_rb_content_type_str;
static VALUE nxt_rb_http_str;
static VALUE nxt_rb_https_str;
static VALUE nxt_rb_path_info_str;
static VALUE nxt_rb_query_string_str;
static VALUE nxt_rb_rack_url_scheme_str;
static VALUE nxt_rb_remote_addr_str;
static VALUE nxt_rb_request_method_str;
static VALUE nxt_rb_request_uri_str;
static VALUE nxt_rb_server_addr_str;
static VALUE nxt_rb_server_name_str;
static VALUE nxt_rb_server_port_str;
static VALUE nxt_rb_server_protocol_str;
static VALUE nxt_rb_on_worker_boot;
static VALUE nxt_rb_on_worker_shutdown;
static VALUE nxt_rb_on_thread_boot;
static VALUE nxt_rb_on_thread_shutdown;
/*
 * Table walked by nxt_ruby_init_strings() and nxt_ruby_done_strings();
 * the entry with a NULL string start terminates the list.
 */
static nxt_ruby_string_t nxt_rb_strings[] = {
{ nxt_string("80"), &nxt_rb_80_str },
{ nxt_string("CONTENT_LENGTH"), &nxt_rb_content_length_str },
{ nxt_string("CONTENT_TYPE"), &nxt_rb_content_type_str },
{ nxt_string("http"), &nxt_rb_http_str },
{ nxt_string("https"), &nxt_rb_https_str },
{ nxt_string("PATH_INFO"), &nxt_rb_path_info_str },
{ nxt_string("QUERY_STRING"), &nxt_rb_query_string_str },
{ nxt_string("rack.url_scheme"), &nxt_rb_rack_url_scheme_str },
{ nxt_string("REMOTE_ADDR"), &nxt_rb_remote_addr_str },
{ nxt_string("REQUEST_METHOD"), &nxt_rb_request_method_str },
{ nxt_string("REQUEST_URI"), &nxt_rb_request_uri_str },
{ nxt_string("SERVER_ADDR"), &nxt_rb_server_addr_str },
{ nxt_string("SERVER_NAME"), &nxt_rb_server_name_str },
{ nxt_string("SERVER_PORT"), &nxt_rb_server_port_str },
{ nxt_string("SERVER_PROTOCOL"), &nxt_rb_server_protocol_str },
{ nxt_string("on_worker_boot"), &nxt_rb_on_worker_boot },
{ nxt_string("on_worker_shutdown"), &nxt_rb_on_worker_shutdown },
{ nxt_string("on_thread_boot"), &nxt_rb_on_thread_boot },
{ nxt_string("on_thread_shutdown"), &nxt_rb_on_thread_shutdown },
{ nxt_null_string, NULL },
};
/*
 * Creates a Ruby string for every entry of nxt_rb_strings, stores it in
 * the entry's VALUE slot and registers that slot with the GC.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if a string could not be created.
 */
static int
nxt_ruby_init_strings(void)
{
    VALUE              str;
    nxt_ruby_string_t  *entry;

    for (entry = nxt_rb_strings; entry->string.start != NULL; entry++) {
        str = rb_str_new_static((char *) entry->string.start,
                                entry->string.length);

        if (nxt_slow_path(str == Qnil)) {
            nxt_unit_alert(NULL, "Ruby: failed to create const string '%.*s'",
                           (int) entry->string.length,
                           (char *) entry->string.start);

            return NXT_UNIT_ERROR;
        }

        *entry->v = str;
        rb_gc_register_address(entry->v);
    }

    return NXT_UNIT_OK;
}
/*
 * Unregisters every VALUE slot listed in nxt_rb_strings from the GC and
 * resets it to Qnil.
 */
static void
nxt_ruby_done_strings(void)
{
    nxt_ruby_string_t  *entry;

    for (entry = nxt_rb_strings; entry->string.start != NULL; entry++) {
        rb_gc_unregister_address(entry->v);

        *entry->v = Qnil;
    }
}
/*
 * Defines the "Unit" module with the four hook registration functions,
 * creates the GC-registered hook hash, then reads the file at `path` and
 * evaluates its contents inside the module.
 *
 * Returns the result of module_eval.  May raise; intended to be called
 * through rb_protect().
 */
static VALUE
nxt_ruby_hook_procs_load(VALUE path)
{
    VALUE  unit_module, file_class, source;

    unit_module = rb_define_module("Unit");

    nxt_ruby_hook_procs = rb_hash_new();
    rb_gc_register_address(&nxt_ruby_hook_procs);

    rb_define_module_function(unit_module, "on_worker_boot",
                              &nxt_ruby_hook_register, 0);
    rb_define_module_function(unit_module, "on_worker_shutdown",
                              &nxt_ruby_hook_register, 0);
    rb_define_module_function(unit_module, "on_thread_boot",
                              &nxt_ruby_hook_register, 0);
    rb_define_module_function(unit_module, "on_thread_shutdown",
                              &nxt_ruby_hook_register, 0);

    /* source = File.read(path) */
    file_class = rb_const_get(rb_cObject, rb_intern("File"));
    source = rb_funcall(file_class, rb_intern("read"), 1, path);

    /* Unit.module_eval(source, path, 1) */
    return rb_funcall(unit_module, rb_intern("module_eval"), 3, source, path,
                      INT2NUM(1));
}
/*
 * Shared body of the Unit.on_* module functions: requires a block and
 * stores it in nxt_ruby_hook_procs under the name the function was
 * called by (Kernel.__callee__ converted with to_s).
 *
 * Always returns Qnil.
 */
static VALUE
nxt_ruby_hook_register(VALUE arg)
{
    VALUE  kernel_module, hook_name;

    rb_need_block();

    kernel_module = rb_const_get(rb_cObject, rb_intern("Kernel"));

    hook_name = rb_funcall(kernel_module, rb_intern("__callee__"), 0);
    hook_name = rb_funcall(hook_name, rb_intern("to_s"), 0);

    rb_hash_aset(nxt_ruby_hook_procs, hook_name, rb_block_proc());

    return Qnil;
}
/*
 * Looks up the hook registered under `name` and calls it without
 * arguments.  Returns Qnil when no such hook is registered, otherwise
 * the value returned by the hook.
 */
static VALUE
nxt_ruby_hook_call(VALUE name)
{
    VALUE  hook;

    hook = rb_hash_lookup(nxt_ruby_hook_procs, name);

    return (hook != Qnil) ? rb_funcall(hook, rb_intern("call"), 0) : Qnil;
}
/*
 * Application module entry point.
 *
 * Initializes the Ruby interpreter, the cached constant strings, the
 * optional hooks file, the Rack application and the per-thread contexts,
 * then runs the Unit event loop with the GVL released.
 *
 * Returns NXT_ERROR if any initialization step fails.  On the normal
 * path the function does not return: after the event loop ends it runs
 * the shutdown hooks, joins the worker threads, cleans up and calls
 * exit() with the event loop's result.
 */
static nxt_int_t
nxt_ruby_start(nxt_task_t *task, nxt_process_data_t *data)
{
    int                    state, rc;
    VALUE                  res, path;
    nxt_ruby_ctx_t         ruby_ctx;
    nxt_unit_ctx_t         *unit_ctx;
    nxt_unit_init_t        ruby_unit_init;
    nxt_ruby_app_conf_t    *c;
    nxt_ruby_rack_init_t   rack_init;
    nxt_common_app_conf_t  *conf;

    static char  *argv[2] = { (char *) "NGINX_Unit", (char *) "-e0" };

    conf = data->app;
    c = &conf->u.ruby;

    nxt_ruby_threads = c->threads;

    setlocale(LC_CTYPE, "");

    RUBY_INIT_STACK
    ruby_init();
    ruby_options(2, argv);
    ruby_script("NGINX_Unit");

    ruby_ctx.env = Qnil;
    ruby_ctx.io_input = Qnil;
    ruby_ctx.io_error = Qnil;
    ruby_ctx.thread = Qnil;
    ruby_ctx.ctx = NULL;
    ruby_ctx.req = NULL;

    rack_init.task = task;
    rack_init.script = &c->script;
    rack_init.rctx = &ruby_ctx;

    /* The cached strings are used by every request; do not go on without. */
    rc = nxt_ruby_init_strings();
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_alert(task, "Ruby: Failed to init const strings");

        return NXT_ERROR;
    }

    res = rb_protect(nxt_ruby_init_basic,
                     (VALUE) (uintptr_t) &rack_init, &state);
    if (nxt_slow_path(res == Qnil || state != 0)) {
        nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
                               "Failed to init basic variables");
        return NXT_ERROR;
    }

    nxt_ruby_call = Qnil;
    nxt_ruby_hook_procs = Qnil;

    if (c->hooks.start != NULL) {
        path = rb_str_new((const char *) c->hooks.start,
                          (long) c->hooks.length);

        rb_protect(nxt_ruby_hook_procs_load, path, &state);
        rb_str_free(path);

        if (nxt_slow_path(state != 0)) {
            nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
                                   "Failed to setup hooks");
            return NXT_ERROR;
        }
    }

    if (nxt_ruby_hook_procs != Qnil) {
        rb_protect(nxt_ruby_hook_call, nxt_rb_on_worker_boot, &state);
        if (nxt_slow_path(state != 0)) {
            nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
                                   "Failed to call on_worker_boot()");
            return NXT_ERROR;
        }
    }

    nxt_ruby_rackup = nxt_ruby_rack_init(&rack_init);
    if (nxt_slow_path(nxt_ruby_rackup == Qnil)) {
        return NXT_ERROR;
    }

    rb_gc_register_address(&nxt_ruby_rackup);

    nxt_ruby_call = rb_intern("call");
    if (nxt_slow_path(nxt_ruby_call == Qnil)) {
        nxt_alert(task, "Ruby: Unable to find rack entry point");

        goto fail;
    }

    rb_gc_register_address(&nxt_ruby_call);

    ruby_ctx.env = rb_protect(nxt_ruby_rack_env_create,
                              (VALUE) (uintptr_t) &ruby_ctx, &state);
    if (nxt_slow_path(ruby_ctx.env == Qnil || state != 0)) {
        nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
                               "Failed to create 'environ' variable");
        goto fail;
    }

    rc = nxt_ruby_init_threads(c);
    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        goto fail;
    }

    nxt_unit_default_init(task, &ruby_unit_init, conf);

    ruby_unit_init.callbacks.request_handler = nxt_ruby_request_handler;
    ruby_unit_init.callbacks.ready_handler = nxt_ruby_ready_handler;
    ruby_unit_init.data = c;
    ruby_unit_init.ctx_data = &ruby_ctx;

    unit_ctx = nxt_unit_init(&ruby_unit_init);
    if (nxt_slow_path(unit_ctx == NULL)) {
        goto fail;
    }

    /* Thread hook failures are logged but do not stop the worker. */
    if (nxt_ruby_hook_procs != Qnil) {
        rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_boot, &state);
        if (nxt_slow_path(state != 0)) {
            nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
                                   "Failed to call on_thread_boot()");
        }
    }

    /* Run the event loop without the GVL so Ruby threads can proceed. */
    rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_unit_run, unit_ctx,
                                               nxt_ruby_ubf, unit_ctx);

    if (nxt_ruby_hook_procs != Qnil) {
        rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_shutdown, &state);
        if (nxt_slow_path(state != 0)) {
            nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
                                   "Failed to call on_thread_shutdown()");
        }
    }

    nxt_ruby_join_threads(unit_ctx, c);

    if (nxt_ruby_hook_procs != Qnil) {
        rb_protect(nxt_ruby_hook_call, nxt_rb_on_worker_shutdown, &state);
        if (nxt_slow_path(state != 0)) {
            nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
                                   "Failed to call on_worker_shutdown()");
        }
    }

    nxt_unit_done(unit_ctx);

    nxt_ruby_ctx_done(&ruby_ctx);

    nxt_ruby_atexit();

    exit(rc);

    return NXT_OK;

fail:

    nxt_ruby_join_threads(NULL, c);

    nxt_ruby_ctx_done(&ruby_ctx);

    nxt_ruby_atexit();

    return NXT_ERROR;
}
/*
 * rb_protect() callback: looks up the "encdb" encoding index and requires
 * "enc/trans/transdb".
 *
 * `arg` is a nxt_ruby_rack_init_t pointer cast to VALUE.  Returns `arg`
 * on success, or Qnil (after logging an alert) when the index lookup
 * yields 0.
 */
static VALUE
nxt_ruby_init_basic(VALUE arg)
{
    int                   encdb_index;
    nxt_ruby_rack_init_t  *init;

    init = (nxt_ruby_rack_init_t *) (uintptr_t) arg;

    encdb_index = rb_enc_find_index("encdb");

    if (nxt_slow_path(encdb_index == 0)) {
        nxt_alert(init->task,
                  "Ruby: Failed to find encoding index 'encdb'");

        return Qnil;
    }

    rb_funcall(rb_cObject, rb_intern("require"), 1,
               rb_str_new2("enc/trans/transdb"));

    return arg;
}
/*
 * Loads rubygems, bundler/setup (optional) and rack, then parses the
 * rackup script.
 *
 * A failure to require 'bundler/setup' is ignored only when the pending
 * exception is a LoadError; the pending exception is then cleared.
 *
 * Returns the first element of the array produced by the script parser,
 * or Qnil after logging on any failure.
 */
static VALUE
nxt_ruby_rack_init(nxt_ruby_rack_init_t *rack_init)
{
    int    state;
    VALUE  parsed, exc;

    rb_protect(nxt_ruby_require_rubygems, Qnil, &state);

    if (nxt_slow_path(state != 0)) {
        nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
                               "Failed to require 'rubygems' package");
        return Qnil;
    }

    rb_protect(nxt_ruby_bundler_setup, Qnil, &state);

    if (state != 0) {
        exc = rb_errinfo();

        if (rb_obj_is_kind_of(exc, rb_eLoadError) == Qfalse) {
            nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
                                   "Failed to require 'bundler/setup' package");
            return Qnil;
        }

        /* Bundler is simply not installed: forget the LoadError. */
        rb_set_errinfo(Qnil);
    }

    rb_protect(nxt_ruby_require_rack, Qnil, &state);

    if (nxt_slow_path(state != 0)) {
        nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
                               "Failed to require 'rack' package");
        return Qnil;
    }

    parsed = rb_protect(nxt_ruby_rack_parse_script,
                        (VALUE) (uintptr_t) rack_init, &state);

    if (nxt_slow_path(state != 0 || TYPE(parsed) != T_ARRAY)) {
        nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
                               "Failed to parse rack script");
        return Qnil;
    }

    if (nxt_slow_path(RARRAY_LEN(parsed) < 1)) {
        nxt_alert(rack_init->task, "Ruby: Invalid rack config file");

        return Qnil;
    }

    return RARRAY_PTR(parsed)[0];
}
/* rb_protect() callback: performs require "rubygems"; `arg` is unused. */
static VALUE
nxt_ruby_require_rubygems(VALUE arg)
{
    VALUE  feature;

    feature = rb_str_new2("rubygems");

    return rb_funcall(rb_cObject, rb_intern("require"), 1, feature);
}
/* rb_protect() callback: performs require "bundler/setup"; `arg` is unused. */
static VALUE
nxt_ruby_bundler_setup(VALUE arg)
{
    VALUE  feature;

    feature = rb_str_new2("bundler/setup");

    return rb_funcall(rb_cObject, rb_intern("require"), 1, feature);
}
/* rb_protect() callback: performs require "rack"; `arg` is unused. */
static VALUE
nxt_ruby_require_rack(VALUE arg)
{
    VALUE  feature;

    feature = rb_str_new2("rack");

    return rb_funcall(rb_cObject, rb_intern("require"), 1, feature);
}
/*
 * rb_protect() callback: calls Rack::Builder.parse_file with the script
 * path taken from the nxt_ruby_rack_init_t passed as `ctx`.
 *
 * The temporary path string is freed before returning the parse result.
 */
static VALUE
nxt_ruby_rack_parse_script(VALUE ctx)
{
    VALUE                 rack_module, builder_class, script_path, parsed;
    nxt_str_t             *script;
    nxt_ruby_rack_init_t  *init;

    init = (nxt_ruby_rack_init_t *) (uintptr_t) ctx;

    rack_module = rb_const_get(rb_cObject, rb_intern("Rack"));
    builder_class = rb_const_get(rack_module, rb_intern("Builder"));

    script = init->script;
    script_path = rb_str_new((const char *) script->start,
                             (long) script->length);

    parsed = rb_funcall(builder_class, rb_intern("parse_file"), 1,
                        script_path);

    rb_str_free(script_path);

    return parsed;
}
/*
 * rb_protect() callback: builds the template Rack environment hash for
 * the context passed as `arg` (a nxt_ruby_ctx_t pointer).
 *
 * Creates the context's input/error stream objects first, then fills the
 * request-independent keys.  The hash is stored in rctx->env, registered
 * with the GC and returned; Qnil is returned if the streams could not be
 * created.
 */
static VALUE
nxt_ruby_rack_env_create(VALUE arg)
{
    VALUE           env, rack_version, multithread;
    nxt_ruby_ctx_t  *rctx;

    rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;

    if (nxt_slow_path(nxt_ruby_init_io(rctx) != NXT_UNIT_OK)) {
        return Qnil;
    }

    env = rb_hash_new();

    rb_hash_aset(env, rb_str_new2("SERVER_SOFTWARE"),
                 rb_str_new((const char *) nxt_server.start,
                            (long) nxt_server.length));

    rack_version = rb_ary_new();

    rb_ary_push(rack_version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MAJOR));
    rb_ary_push(rack_version, UINT2NUM(NXT_RUBY_RACK_API_VERSION_MINOR));

    rb_hash_aset(env, rb_str_new2("SCRIPT_NAME"), rb_str_new("", 0));
    rb_hash_aset(env, rb_str_new2("rack.version"), rack_version);
    rb_hash_aset(env, rb_str_new2("rack.input"), rctx->io_input);
    rb_hash_aset(env, rb_str_new2("rack.errors"), rctx->io_error);

    /* Multithreaded only when more than one thread is configured. */
    multithread = (nxt_ruby_threads > 1) ? Qtrue : Qfalse;

    rb_hash_aset(env, rb_str_new2("rack.multithread"), multithread);
    rb_hash_aset(env, rb_str_new2("rack.multiprocess"), Qtrue);
    rb_hash_aset(env, rb_str_new2("rack.run_once"), Qfalse);
    rb_hash_aset(env, rb_str_new2("rack.hijack?"), Qfalse);
    rb_hash_aset(env, rb_str_new2("rack.hijack"), Qnil);
    rb_hash_aset(env, rb_str_new2("rack.hijack_io"), Qnil);

    rctx->env = env;
    rb_gc_register_address(&rctx->env);

    return env;
}
/*
 * Instantiates the input and error stream objects for `rctx` by calling
 * `new` on the classes returned by the stream init helpers, passing the
 * context pointer as the only argument.  Each created object is stored in
 * the context and registered with the GC.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR if either object is Qnil.
 */
static int
nxt_ruby_init_io(nxt_ruby_ctx_t *rctx)
{
    VALUE  input_class, error_class, self;

    self = (VALUE) (uintptr_t) rctx;

    input_class = nxt_ruby_stream_io_input_init();

    rctx->io_input = rb_funcall(input_class, rb_intern("new"), 1, self);

    if (nxt_slow_path(rctx->io_input == Qnil)) {
        nxt_unit_alert(NULL,
                       "Ruby: Failed to create environment 'rack.input' var");

        return NXT_UNIT_ERROR;
    }

    rb_gc_register_address(&rctx->io_input);

    error_class = nxt_ruby_stream_io_error_init();

    rctx->io_error = rb_funcall(error_class, rb_intern("new"), 1, self);

    if (nxt_slow_path(rctx->io_error == Qnil)) {
        nxt_unit_alert(NULL,
                       "Ruby: Failed to create environment 'rack.error' var");

        return NXT_UNIT_ERROR;
    }

    rb_gc_register_address(&rctx->io_error);

    return NXT_UNIT_OK;
}
/*
 * Unit request callback: re-acquires the GVL and processes the request
 * in nxt_ruby_request_handler_gvl().  The callback's result is ignored.
 */
static void
nxt_ruby_request_handler(nxt_unit_request_info_t *req)
{
    void  *unused;

    unused = rb_thread_call_with_gvl(nxt_ruby_request_handler_gvl, req);

    (void) unused;
}
/*
 * Runs the Rack application for one request while holding the GVL.
 *
 * `data` is the nxt_unit_request_info_t.  The request is attached to the
 * context for the duration of the call, the application is run under
 * rb_protect(), and the request is completed with NXT_UNIT_OK or, after
 * logging, NXT_UNIT_ERROR.  Always returns NULL.
 */
static void *
nxt_ruby_request_handler_gvl(void *data)
{
    int                      state, rc;
    VALUE                    res;
    nxt_ruby_ctx_t           *rctx;
    nxt_unit_request_info_t  *req;

    req = data;

    rctx = req->ctx->data;
    rctx->req = req;

    res = rb_protect(nxt_ruby_rack_app_run, (VALUE) (uintptr_t) req, &state);

    rc = (res == Qnil || state != 0) ? NXT_UNIT_ERROR : NXT_UNIT_OK;

    if (nxt_slow_path(rc == NXT_UNIT_ERROR)) {
        nxt_ruby_exception_log(req, NXT_LOG_ERR,
                               "Failed to run ruby script");
    }

    nxt_unit_request_done(req, rc);

    rctx->req = NULL;

    return NULL;
}
/*
 * rb_protect() callback: handles one request end to end.
 *
 * Duplicates the context's template environment, fills it from the
 * request, calls the Rack application and sends status, headers and body
 * from the returned 3-element array.
 *
 * Returns the application's result array on success and Qnil on any
 * failure.  Both paths end with the same rb_hash_delete() call on the
 * per-request environment.
 */
static VALUE
nxt_ruby_rack_app_run(VALUE arg)
{
    int                      rc;
    VALUE                    env, result, ret;
    nxt_int_t                status;
    nxt_ruby_ctx_t           *rctx;
    nxt_unit_request_info_t  *req;

    req = (nxt_unit_request_info_t *) arg;
    rctx = req->ctx->data;

    ret = Qnil;

    env = rb_hash_dup(rctx->env);

    rc = nxt_ruby_read_request(req, env);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        nxt_unit_req_alert(req,
                           "Ruby: Failed to process incoming request");
        goto done;
    }

    result = rb_funcall(nxt_ruby_rackup, nxt_ruby_call, 1, env);

    if (nxt_slow_path(TYPE(result) != T_ARRAY)) {
        nxt_unit_req_error(req,
                           "Ruby: Invalid response format from application");
        goto done;
    }

    if (nxt_slow_path(RARRAY_LEN(result) != 3)) {
        nxt_unit_req_error(req,
                           "Ruby: Invalid response format from application. "
                           "Need 3 entries [Status, Headers, Body]");
        goto done;
    }

    status = nxt_ruby_rack_result_status(req, result);

    if (nxt_slow_path(status < 0)) {
        nxt_unit_req_error(req,
                           "Ruby: Invalid response status from application.");
        goto done;
    }

    rc = nxt_ruby_rack_result_headers(req, result, status);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto done;
    }

    rc = nxt_ruby_rack_result_body(req, result);

    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        goto done;
    }

    ret = result;

done:

    rb_hash_delete(env, rb_obj_id(env));

    return ret;
}
/*
 * Copies the request line data, addresses, server name and all header
 * fields from `req` into the Rack environment hash `hash_env`.
 *
 * SERVER_PORT is always set to the cached "80" string; rack.url_scheme is
 * "https" when the request's tls flag is set and "http" otherwise.
 * CONTENT_LENGTH and CONTENT_TYPE are added from their dedicated field
 * indexes when present.
 *
 * Always returns NXT_UNIT_OK.
 */
static int
nxt_ruby_read_request(nxt_unit_request_info_t *req, VALUE hash_env)
{
    VALUE               key, scheme;
    uint32_t            n;
    nxt_unit_field_t    *field;
    nxt_unit_request_t  *r;

    r = req->request;

    nxt_ruby_add_sptr(hash_env, nxt_rb_request_method_str, &r->method,
                      r->method_length);
    nxt_ruby_add_sptr(hash_env, nxt_rb_request_uri_str, &r->target,
                      r->target_length);
    nxt_ruby_add_sptr(hash_env, nxt_rb_path_info_str, &r->path,
                      r->path_length);
    nxt_ruby_add_sptr(hash_env, nxt_rb_query_string_str, &r->query,
                      r->query_length);
    nxt_ruby_add_sptr(hash_env, nxt_rb_server_protocol_str, &r->version,
                      r->version_length);
    nxt_ruby_add_sptr(hash_env, nxt_rb_remote_addr_str, &r->remote,
                      r->remote_length);
    nxt_ruby_add_sptr(hash_env, nxt_rb_server_addr_str, &r->local,
                      r->local_length);
    nxt_ruby_add_sptr(hash_env, nxt_rb_server_name_str, &r->server_name,
                      r->server_name_length);

    rb_hash_aset(hash_env, nxt_rb_server_port_str, nxt_rb_80_str);

    scheme = r->tls ? nxt_rb_https_str : nxt_rb_http_str;
    rb_hash_aset(hash_env, nxt_rb_rack_url_scheme_str, scheme);

    for (n = 0; n < r->fields_count; n++) {
        field = &r->fields[n];

        key = rb_str_new(nxt_unit_sptr_get(&field->name), field->name_length);

        nxt_ruby_add_sptr(hash_env, key, &field->value, field->value_length);
    }

    if (r->content_length_field != NXT_UNIT_NONE_FIELD) {
        field = &r->fields[r->content_length_field];

        nxt_ruby_add_sptr(hash_env, nxt_rb_content_length_str,
                          &field->value, field->value_length);
    }

    if (r->content_type_field != NXT_UNIT_NONE_FIELD) {
        field = &r->fields[r->content_type_field];

        nxt_ruby_add_sptr(hash_env, nxt_rb_content_type_str,
                          &field->value, field->value_length);
    }

    return NXT_UNIT_OK;
}
/*
 * Stores a new Ruby string made from the `len` bytes referenced by `sptr`
 * in `hash_env` under the key `name`.
 */
nxt_inline void
nxt_ruby_add_sptr(VALUE hash_env, VALUE name,
    nxt_unit_sptr_t *sptr, uint32_t len)
{
    VALUE  value;

    value = rb_str_new(nxt_unit_sptr_get(sptr), len);

    rb_hash_aset(hash_env, name, value);
}
/*
 * Extracts the response status from element 0 of the Rack result array.
 *
 * A Fixnum is converted directly; a String is parsed with
 * nxt_int_parse().  For any other type an error is logged and -2 is
 * returned.
 */
static nxt_int_t
nxt_ruby_rack_result_status(nxt_unit_request_info_t *req, VALUE result)
{
    VALUE  status;

    status = rb_ary_entry(result, 0);

    switch (TYPE(status)) {

    case T_FIXNUM:
        return FIX2INT(status);

    case T_STRING:
        return nxt_int_parse((u_char *) RSTRING_PTR(status),
                             RSTRING_LEN(status));

    default:
        break;
    }

    nxt_unit_req_error(req, "Ruby: Invalid response 'status' "
                       "format from application");

    return -2;
}
/*
 * State shared with the rb_hash_foreach() callbacks that size and then
 * emit the response headers.
 */
typedef struct {
/* NXT_UNIT_OK, or NXT_UNIT_ERROR once a callback has failed. */
int rc;
/* Number of header fields counted by nxt_ruby_hash_info(). */
uint32_t fields;
/* Sum of name and value lengths counted by nxt_ruby_hash_info(). */
uint32_t size;
/* Request the headers belong to; also used for error logging. */
nxt_unit_request_info_t *req;
} nxt_ruby_headers_info_t;
/*
 * Sends the response status line and headers taken from element 1 of the
 * Rack result array.
 *
 * The headers hash is walked twice: first to validate the entries and
 * compute the number of fields and their total size, then, after the
 * response has been initialized with those numbers, to add the fields.
 *
 * Returns NXT_UNIT_OK on success.  Returns an error code if the headers
 * are not a Hash, an entry is invalid, the response cannot be
 * initialized, or adding a field fails.
 */
static int
nxt_ruby_rack_result_headers(nxt_unit_request_info_t *req, VALUE result,
    nxt_int_t status)
{
    int                      rc;
    VALUE                    headers;
    nxt_ruby_headers_info_t  headers_info;

    headers = rb_ary_entry(result, 1);
    if (nxt_slow_path(TYPE(headers) != T_HASH)) {
        nxt_unit_req_error(req,
                           "Ruby: Invalid response 'headers' format from "
                           "application");

        return NXT_UNIT_ERROR;
    }

    headers_info.rc = NXT_UNIT_OK;
    headers_info.fields = 0;
    headers_info.size = 0;
    headers_info.req = req;

    rb_hash_foreach(headers, nxt_ruby_hash_info,
                    (VALUE) (uintptr_t) &headers_info);
    if (nxt_slow_path(headers_info.rc != NXT_UNIT_OK)) {
        return headers_info.rc;
    }

    rc = nxt_unit_response_init(req, status,
                                headers_info.fields, headers_info.size);
    if (nxt_slow_path(rc != NXT_UNIT_OK)) {
        return rc;
    }

    rb_hash_foreach(headers, nxt_ruby_hash_add,
                    (VALUE) (uintptr_t) &headers_info);

    /*
     * nxt_ruby_hash_add() reports a failed field insertion through
     * headers_info.rc; propagate it instead of dropping it.
     */
    return headers_info.rc;
}
/*
 * rb_hash_foreach() callback, first pass over the response headers.
 *
 * Validates that both key and value are Strings and accumulates, in the
 * nxt_ruby_headers_info_t passed as `arg`, the number of fields and the
 * total size needed.  A value is split on '\n'; every piece becomes a
 * separate field carrying the same name.
 *
 * The search is bounded by the Ruby string length (memchr) rather than by
 * a NUL terminator, so a value with an embedded NUL byte is split the
 * same way as any other value.  This must stay in step with
 * nxt_ruby_hash_add().
 *
 * Returns ST_CONTINUE, or ST_STOP with info->rc set to NXT_UNIT_ERROR on
 * an invalid entry.
 */
static int
nxt_ruby_hash_info(VALUE r_key, VALUE r_value, VALUE arg)
{
    const char               *value, *value_end, *pos;
    nxt_ruby_headers_info_t  *headers_info;

    headers_info = (void *) (uintptr_t) arg;

    if (nxt_slow_path(TYPE(r_key) != T_STRING)) {
        nxt_unit_req_error(headers_info->req,
                           "Ruby: Wrong header entry 'key' from application");

        goto fail;
    }

    if (nxt_slow_path(TYPE(r_value) != T_STRING)) {
        nxt_unit_req_error(headers_info->req,
                           "Ruby: Wrong header entry 'value' from application");

        goto fail;
    }

    value = RSTRING_PTR(r_value);
    value_end = value + RSTRING_LEN(r_value);

    pos = value;

    for ( ;; ) {
        pos = memchr(pos, '\n', value_end - pos);

        if (pos == NULL) {
            break;
        }

        headers_info->fields++;
        headers_info->size += RSTRING_LEN(r_key) + (pos - value);

        pos++;
        value = pos;
    }

    /* The remainder after the last '\n' (possibly empty) is a field too. */
    if (value <= value_end) {
        headers_info->fields++;
        headers_info->size += RSTRING_LEN(r_key) + (value_end - value);
    }

    return ST_CONTINUE;

fail:

    headers_info->rc = NXT_UNIT_ERROR;

    return ST_STOP;
}


/*
 * rb_hash_foreach() callback, second pass over the response headers.
 *
 * Adds one response field per '\n'-separated piece of the value, using
 * the same length-bounded splitting as nxt_ruby_hash_info() so that the
 * number and size of the fields match what was reserved.
 *
 * Returns ST_CONTINUE, or ST_STOP with info->rc set to NXT_UNIT_ERROR if
 * a field could not be added.
 */
static int
nxt_ruby_hash_add(VALUE r_key, VALUE r_value, VALUE arg)
{
    int                      *rc;
    uint32_t                 key_len;
    const char               *value, *value_end, *pos;
    nxt_ruby_headers_info_t  *headers_info;

    headers_info = (void *) (uintptr_t) arg;
    rc = &headers_info->rc;

    value = RSTRING_PTR(r_value);
    value_end = value + RSTRING_LEN(r_value);

    key_len = RSTRING_LEN(r_key);

    pos = value;

    for ( ;; ) {
        pos = memchr(pos, '\n', value_end - pos);

        if (pos == NULL) {
            break;
        }

        *rc = nxt_unit_response_add_field(headers_info->req,
                                          RSTRING_PTR(r_key), key_len,
                                          value, pos - value);
        if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
            goto fail;
        }

        pos++;
        value = pos;
    }

    if (value <= value_end) {
        *rc = nxt_unit_response_add_field(headers_info->req,
                                          RSTRING_PTR(r_key), key_len,
                                          value, value_end - value);
        if (nxt_slow_path(*rc != NXT_UNIT_OK)) {
            goto fail;
        }
    }

    return ST_CONTINUE;

fail:

    *rc = NXT_UNIT_ERROR;

    return ST_STOP;
}
/*
 * Sends the response body taken from element 2 of the Rack result array.
 *
 * A body responding to `to_path` is sent from the named file; otherwise a
 * body responding to `each` is iterated and every chunk is written.  Any
 * other body is rejected.  After a successful send, `close` is called on
 * the body if it responds to it.
 *
 * Returns NXT_UNIT_OK, or an error code when the body is invalid or the
 * file could not be sent.
 */
static int
nxt_ruby_rack_result_body(nxt_unit_request_info_t *req, VALUE result)
{
    int    rc;
    VALUE  body, file_path;

    body = rb_ary_entry(result, 2);

    if (rb_respond_to(body, rb_intern("to_path"))) {

        file_path = rb_funcall(body, rb_intern("to_path"), 0);

        if (nxt_slow_path(TYPE(file_path) != T_STRING)) {
            nxt_unit_req_error(req,
                               "Ruby: Failed to get 'body' file path from "
                               "application");
            return NXT_UNIT_ERROR;
        }

        rc = nxt_ruby_rack_result_body_file_write(req, file_path);

        if (nxt_slow_path(rc != NXT_UNIT_OK)) {
            return rc;
        }

    } else if (rb_respond_to(body, rb_intern("each"))) {

        rb_block_call(body, rb_intern("each"), 0, 0,
                      nxt_ruby_rack_result_body_each, (VALUE) (uintptr_t) req);

    } else {
        nxt_unit_req_error(req,
                           "Ruby: Invalid response 'body' format "
                           "from application");
        return NXT_UNIT_ERROR;
    }

    if (rb_respond_to(body, rb_intern("close"))) {
        rb_funcall(body, rb_intern("close"), 0);
    }

    return NXT_UNIT_OK;
}
/* Read cursor over a body file, used by nxt_ruby_rack_file_read(). */
typedef struct {
/* Descriptor of the opened body file. */
int fd;
/* Offset passed to the next pread(). */
off_t pos;
/* Number of bytes still to be read; 0 means end of file. */
off_t rest;
} nxt_ruby_rack_file_t;
/*
 * nxt_unit_read_info_t read callback: reads up to `size` bytes of the
 * body file into `dst` at the current offset.
 *
 * The request is capped at the number of remaining bytes.  A short read
 * is treated as end of file.  `eof` is updated in all cases, including
 * failure.  Returns the pread() result.
 */
static ssize_t
nxt_ruby_rack_file_read(nxt_unit_read_info_t *read_info, void *dst, size_t size)
{
    size_t                want;
    ssize_t               got;
    nxt_ruby_rack_file_t  *file;

    file = read_info->data;

    want = nxt_min(size, (size_t) file->rest);

    got = pread(file->fd, dst, want, file->pos);

    if (got >= 0) {
        file->pos += got;

        /* Fewer bytes than requested: nothing more to read. */
        file->rest = (want > (size_t) got) ? 0 : file->rest - got;
    }

    read_info->eof = (file->rest == 0);

    return got;
}
/*
 * Argument passed to nxt_ruby_response_write_cb(): the read descriptor
 * together with the request it is written to.
 */
typedef struct {
nxt_unit_read_info_t read_info;
nxt_unit_request_info_t *req;
} nxt_ruby_read_info_t;
/*
 * Sends the file named by the Ruby string `filepath` as the response
 * body.
 *
 * The file is opened read-only, its size is taken from fstat(), and the
 * content is streamed by nxt_ruby_response_write_cb() with the GVL
 * released.  The descriptor is closed on every path after a successful
 * open.
 *
 * Returns the write result, or NXT_UNIT_ERROR if open() or fstat() fails.
 */
static int
nxt_ruby_rack_result_body_file_write(nxt_unit_request_info_t *req,
    VALUE filepath)
{
    int                   fd, rc;
    struct stat           st;
    nxt_ruby_rack_file_t  cursor;
    nxt_ruby_read_info_t  info;

    fd = open(RSTRING_PTR(filepath), O_RDONLY, 0);

    if (nxt_slow_path(fd == -1)) {
        nxt_unit_req_error(req,
                           "Ruby: Failed to open content file \"%s\": %s (%d)",
                           RSTRING_PTR(filepath), strerror(errno), errno);

        return NXT_UNIT_ERROR;
    }

    if (nxt_slow_path(fstat(fd, &st) == -1)) {
        /* Log before close() so errno still belongs to fstat(). */
        nxt_unit_req_error(req,
                           "Ruby: Content file fstat(\"%s\") failed: %s (%d)",
                           RSTRING_PTR(filepath), strerror(errno), errno);

        rc = NXT_UNIT_ERROR;
        goto done;
    }

    cursor.fd = fd;
    cursor.pos = 0;
    cursor.rest = st.st_size;

    info.read_info.read = nxt_ruby_rack_file_read;
    info.read_info.eof = cursor.rest == 0;
    info.read_info.buf_size = cursor.rest;
    info.read_info.data = &cursor;
    info.req = req;

    rc = (intptr_t) rb_thread_call_without_gvl(nxt_ruby_response_write_cb,
                                               &info,
                                               nxt_ruby_ubf,
                                               req->ctx);

done:

    close(fd);

    return rc;
}
/*
 * Callback run without the GVL: streams the file described by the
 * nxt_ruby_read_info_t in `data` to the response.
 *
 * Logs an error on failure.  Returns the write result cast to a pointer.
 */
static void *
nxt_ruby_response_write_cb(void *data)
{
    int                   status;
    nxt_ruby_read_info_t  *info;

    info = data;

    status = nxt_unit_response_write_cb(info->req, &info->read_info);

    if (nxt_slow_path(status != NXT_UNIT_OK)) {
        nxt_unit_req_error(info->req, "Ruby: Failed to write content file.");
    }

    return (void *) (intptr_t) status;
}
/*
 * Argument passed to nxt_ruby_response_write(): one body chunk together
 * with the request it is written to.
 */
typedef struct {
VALUE body;
nxt_unit_request_info_t *req;
} nxt_ruby_write_info_t;
/*
 * Block invoked for every element yielded by the body's `each`.
 *
 * `body` is the yielded chunk and `arg` the request pointer.  Chunks that
 * are not Strings are skipped; String chunks are written to the response
 * with the GVL released.  The write result is ignored.  Always returns
 * Qnil.
 */
static VALUE
nxt_ruby_rack_result_body_each(VALUE body, VALUE arg, int argc,
    const VALUE *argv, VALUE blockarg)
{
    nxt_ruby_write_info_t  chunk;

    if (TYPE(body) == T_STRING) {
        chunk.body = body;
        chunk.req = (void *) (uintptr_t) arg;

        (void) rb_thread_call_without_gvl(nxt_ruby_response_write,
                                          (void *) (uintptr_t) &chunk,
                                          nxt_ruby_ubf, chunk.req->ctx);
    }

    return Qnil;
}
/*
 * Callback run without the GVL: writes the String chunk described by the
 * nxt_ruby_write_info_t in `data` to the response.
 *
 * Logs an error on failure.  Returns the write result cast to a pointer.
 */
static void *
nxt_ruby_response_write(void *data)
{
    int                    status;
    nxt_ruby_write_info_t  *chunk;

    chunk = data;

    status = nxt_unit_response_write(chunk->req, RSTRING_PTR(chunk->body),
                                     RSTRING_LEN(chunk->body));

    if (nxt_slow_path(status != NXT_UNIT_OK)) {
        nxt_unit_req_error(chunk->req,
                           "Ruby: Failed to write 'body' from application");
    }

    return (void *) (intptr_t) status;
}
/*
 * Logs `desc` at `level`, followed by the details of the pending Ruby
 * exception (rb_errinfo()), if there is one.
 *
 * With a non-empty backtrace the first frame is logged together with the
 * message and the class name, and every further frame on its own "from"
 * line.  When the backtrace is missing (Exception#backtrace may return
 * nil) or empty, only the message and the class name are logged.
 *
 * `req` may be NULL when no request is being processed.
 */
static void
nxt_ruby_exception_log(nxt_unit_request_info_t *req, uint32_t level,
    const char *desc)
{
    long   i, frames;
    VALUE  err, ary, eclass, msg;

    nxt_unit_req_log(req, level, "Ruby: %s", desc);

    err = rb_errinfo();
    if (nxt_slow_path(err == Qnil)) {
        return;
    }

    eclass = rb_class_name(rb_class_of(err));

    msg = rb_funcall(err, rb_intern("message"), 0);
    ary = rb_funcall(err, rb_intern("backtrace"), 0);

    /* RARRAY_LEN()/RARRAY_PTR() are only valid on a real Array. */
    frames = (TYPE(ary) == T_ARRAY) ? RARRAY_LEN(ary) : 0;

    if (frames == 0) {
        nxt_unit_req_log(req, level, "Ruby: %s (%s)", RSTRING_PTR(msg),
                         RSTRING_PTR(eclass));

        return;
    }

    nxt_unit_req_log(req, level, "Ruby: %s: %s (%s)",
                     RSTRING_PTR(RARRAY_PTR(ary)[0]),
                     RSTRING_PTR(msg), RSTRING_PTR(eclass));

    for (i = 1; i < frames; i++) {
        nxt_unit_req_log(req, level, "from %s",
                         RSTRING_PTR(RARRAY_PTR(ary)[i]));
    }
}
/*
 * Unregisters from the GC the context members that hold a Ruby object:
 * io_input, io_error and env, in that order.  Members still equal to
 * Qnil are skipped.
 */
static void
nxt_ruby_ctx_done(nxt_ruby_ctx_t *rctx)
{
    size_t  i;
    VALUE   *slots[3];

    slots[0] = &rctx->io_input;
    slots[1] = &rctx->io_error;
    slots[2] = &rctx->env;

    for (i = 0; i < sizeof(slots) / sizeof(slots[0]); i++) {
        if (*slots[i] != Qnil) {
            rb_gc_unregister_address(slots[i]);
        }
    }
}
static void
nxt_ruby_atexit(void)
{
if (nxt_ruby_rackup != Qnil) {
rb_gc_unregister_address(&nxt_ruby_rackup);
}
if (nxt_ruby_call != Qnil) {
rb_gc_unregister_address(&nxt_ruby_call);
}
if (nxt_ruby_hook_procs != Qnil) {
rb_gc_unregister_address(&nxt_ruby_hook_procs);
}
nxt_ruby_done_strings();
ruby_cleanup(0);
}
/*
 * Unit ready callback: starts the additional worker threads.
 *
 * Nothing is done when at most one thread is configured.  Otherwise one
 * Ruby thread is created (under the GVL) for each of the (threads - 1)
 * pre-allocated contexts.
 *
 * Returns NXT_UNIT_OK, or NXT_UNIT_ERROR as soon as a thread cannot be
 * created.
 */
static int
nxt_ruby_ready_handler(nxt_unit_ctx_t *ctx)
{
    VALUE                thread;
    uint32_t             n, extra;
    nxt_ruby_ctx_t       *rctx;
    nxt_ruby_app_conf_t  *c;

    c = ctx->unit->data;

    if (c->threads <= 1) {
        return NXT_UNIT_OK;
    }

    extra = c->threads - 1;

    for (n = 0; n < extra; n++) {
        rctx = &nxt_ruby_ctxs[n];

        rctx->ctx = ctx;

        thread = (VALUE) rb_thread_call_with_gvl(nxt_ruby_thread_create_gvl,
                                                 rctx);

        if (nxt_slow_path(thread == Qnil)) {
            nxt_unit_alert(ctx, "thread #%d create failed", (int) (n + 1));

            return NXT_UNIT_ERROR;
        }

        nxt_unit_debug(ctx, "thread #%d created", (int) (n + 1));

        rctx->thread = thread;
    }

    return NXT_UNIT_OK;
}
/*
 * Callback run with the GVL: creates a Ruby thread executing
 * nxt_ruby_thread_func() with `rctx` as its argument and returns the
 * thread VALUE cast to a pointer.
 */
static void *
nxt_ruby_thread_create_gvl(void *rctx)
{
    VALUE  thread;

    thread = rb_thread_create(RUBY_METHOD_FUNC(nxt_ruby_thread_func), rctx);

    return (void *) (uintptr_t) thread;
}
/*
 * Body of an additional worker thread.
 *
 * Allocates a Unit context bound to the nxt_ruby_ctx_t passed as `arg`,
 * runs the on_thread_boot hook, processes requests with the GVL released
 * until the event loop returns, runs the on_thread_shutdown hook and
 * releases the context.  Hook failures are only logged.  If the context
 * cannot be allocated the thread ends immediately.
 *
 * Always returns Qnil.
 */
static VALUE
nxt_ruby_thread_func(VALUE arg)
{
    int             state;
    nxt_unit_ctx_t  *ctx;
    nxt_ruby_ctx_t  *rctx;

    rctx = (nxt_ruby_ctx_t *) (uintptr_t) arg;

    nxt_unit_debug(rctx->ctx, "worker thread start");

    ctx = nxt_unit_ctx_alloc(rctx->ctx, rctx);

    if (nxt_fast_path(ctx != NULL)) {

        if (nxt_ruby_hook_procs != Qnil) {
            rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_boot, &state);

            if (nxt_slow_path(state != 0)) {
                nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
                                       "Failed to call on_thread_boot()");
            }
        }

        (void) rb_thread_call_without_gvl(nxt_ruby_unit_run, ctx,
                                          nxt_ruby_ubf, ctx);

        if (nxt_ruby_hook_procs != Qnil) {
            rb_protect(nxt_ruby_hook_call, nxt_rb_on_thread_shutdown, &state);

            if (nxt_slow_path(state != 0)) {
                nxt_ruby_exception_log(NULL, NXT_LOG_ERR,
                                       "Failed to call on_thread_shutdown()");
            }
        }

        nxt_unit_done(ctx);
    }

    nxt_unit_debug(NULL, "worker thread end");

    return Qnil;
}
/*
 * Callback run without the GVL: runs the Unit event loop for `ctx` and
 * returns its result cast to a pointer.
 */
static void *
nxt_ruby_unit_run(void *ctx)
{
    int  rc;

    rc = nxt_unit_run(ctx);

    return (void *) (intptr_t) rc;
}
static void
nxt_ruby_ubf(void *ctx)
{
nxt_unit_warn(ctx, "Ruby: UBF");
}
/*
 * Allocates and prepares the contexts of the additional worker threads.
 *
 * Nothing is done when at most one thread is configured.  Otherwise an
 * array of (threads - 1) contexts is allocated, every member of every
 * context is reset, and a Rack environment is created for each context.
 *
 * All members, including ctx and req, are initialized before any Ruby
 * code runs so that no context is ever observed with indeterminate
 * pointers.
 *
 * Returns NXT_UNIT_OK or NXT_UNIT_ERROR.  On error the array stays
 * allocated; nxt_ruby_join_threads() releases it.
 */
static int
nxt_ruby_init_threads(nxt_ruby_app_conf_t *c)
{
    int             state;
    uint32_t        i;
    nxt_ruby_ctx_t  *rctx;

    if (c->threads <= 1) {
        return NXT_UNIT_OK;
    }

    nxt_ruby_ctxs = nxt_unit_malloc(NULL, sizeof(nxt_ruby_ctx_t)
                                          * (c->threads - 1));
    if (nxt_slow_path(nxt_ruby_ctxs == NULL)) {
        nxt_unit_alert(NULL, "Failed to allocate run contexts array");

        return NXT_UNIT_ERROR;
    }

    for (i = 0; i < c->threads - 1; i++) {
        rctx = &nxt_ruby_ctxs[i];

        rctx->env = Qnil;
        rctx->io_input = Qnil;
        rctx->io_error = Qnil;
        rctx->thread = Qnil;
        rctx->ctx = NULL;
        rctx->req = NULL;
    }

    for (i = 0; i < c->threads - 1; i++) {
        rctx = &nxt_ruby_ctxs[i];

        rctx->env = rb_protect(nxt_ruby_rack_env_create,
                               (VALUE) (uintptr_t) rctx, &state);
        if (nxt_slow_path(rctx->env == Qnil || state != 0)) {
            nxt_ruby_exception_log(NULL, NXT_LOG_ALERT,
                                   "Failed to create 'environ' variable");
            return NXT_UNIT_ERROR;
        }
    }

    return NXT_UNIT_OK;
}
/*
 * Waits for the additional worker threads and releases their contexts.
 *
 * Does nothing if no context array is allocated.  Threads that were
 * started are joined, the GC registrations of every context are removed,
 * and the array is freed.  The global pointer is reset afterwards so that
 * a repeated call returns early instead of touching freed memory.
 *
 * `ctx` is used for logging and for freeing the array; it may be NULL.
 */
static void
nxt_ruby_join_threads(nxt_unit_ctx_t *ctx, nxt_ruby_app_conf_t *c)
{
    uint32_t        i;
    nxt_ruby_ctx_t  *rctx;

    if (nxt_ruby_ctxs == NULL) {
        return;
    }

    for (i = 0; i < c->threads - 1; i++) {
        rctx = &nxt_ruby_ctxs[i];

        if (rctx->thread != Qnil) {
            rb_funcall(rctx->thread, rb_intern("join"), 0);

            nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1));

        } else {
            nxt_unit_debug(ctx, "thread #%d not started", (int) (i + 1));
        }
    }

    for (i = 0; i < c->threads - 1; i++) {
        nxt_ruby_ctx_done(&nxt_ruby_ctxs[i]);
    }

    nxt_unit_free(ctx, nxt_ruby_ctxs);

    nxt_ruby_ctxs = NULL;
}