Perl: request processing in multiple threads.

This closes #486 issue on GitHub.
This commit is contained in:
Max Romanov
2020-11-05 16:10:59 +03:00
parent e17e73edda
commit d321d454f9
5 changed files with 374 additions and 148 deletions

View File

@@ -64,6 +64,8 @@ typedef struct {
typedef struct { typedef struct {
char *script; char *script;
uint32_t threads;
uint32_t thread_stack_size;
} nxt_perl_app_conf_t; } nxt_perl_app_conf_t;

View File

@@ -604,6 +604,14 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_perl_members[] = {
.name = nxt_string("script"), .name = nxt_string("script"),
.type = NXT_CONF_VLDT_STRING, .type = NXT_CONF_VLDT_STRING,
.flags = NXT_CONF_VLDT_REQUIRED, .flags = NXT_CONF_VLDT_REQUIRED,
}, {
.name = nxt_string("threads"),
.type = NXT_CONF_VLDT_INTEGER,
.validator = nxt_conf_vldt_threads,
}, {
.name = nxt_string("thread_stack_size"),
.type = NXT_CONF_VLDT_INTEGER,
.validator = nxt_conf_vldt_thread_stack_size,
}, },
NXT_CONF_VLDT_NEXT(nxt_conf_vldt_common_members) NXT_CONF_VLDT_NEXT(nxt_conf_vldt_common_members)

View File

@@ -233,6 +233,18 @@ static nxt_conf_map_t nxt_perl_app_conf[] = {
NXT_CONF_MAP_CSTRZ, NXT_CONF_MAP_CSTRZ,
offsetof(nxt_common_app_conf_t, u.perl.script), offsetof(nxt_common_app_conf_t, u.perl.script),
}, },
{
nxt_string("threads"),
NXT_CONF_MAP_INT32,
offsetof(nxt_common_app_conf_t, u.perl.threads),
},
{
nxt_string("thread_stack_size"),
NXT_CONF_MAP_INT32,
offsetof(nxt_common_app_conf_t, u.perl.thread_stack_size),
},
}; };

View File

@@ -18,14 +18,14 @@
typedef struct { typedef struct {
PerlInterpreter *my_perl; PerlInterpreter *my_perl;
nxt_unit_request_info_t *req; nxt_perl_psgi_io_arg_t arg_input;
} nxt_perl_psgi_input_t; nxt_perl_psgi_io_arg_t arg_error;
typedef struct {
PerlInterpreter *my_perl;
SV *app; SV *app;
} nxt_perl_psgi_module_t; CV *cb;
nxt_unit_request_info_t *req;
pthread_t thread;
nxt_unit_ctx_t *ctx;
} nxt_perl_psgi_ctx_t;
static long nxt_perl_psgi_io_input_read(PerlInterpreter *my_perl, static long nxt_perl_psgi_io_input_read(PerlInterpreter *my_perl,
@@ -62,11 +62,11 @@ static nxt_int_t nxt_perl_psgi_io_input_init(PerlInterpreter *my_perl,
static nxt_int_t nxt_perl_psgi_io_error_init(PerlInterpreter *my_perl, static nxt_int_t nxt_perl_psgi_io_error_init(PerlInterpreter *my_perl,
nxt_perl_psgi_io_arg_t *arg); nxt_perl_psgi_io_arg_t *arg);
static PerlInterpreter *nxt_perl_psgi_interpreter_init(nxt_task_t *task, static int nxt_perl_psgi_ctx_init(const char *script,
char *script, SV **app); nxt_perl_psgi_ctx_t *pctx);
static SV *nxt_perl_psgi_env_create(PerlInterpreter *my_perl, static SV *nxt_perl_psgi_env_create(PerlInterpreter *my_perl,
nxt_unit_request_info_t *req, nxt_perl_psgi_input_t *input); nxt_unit_request_info_t *req);
nxt_inline int nxt_perl_psgi_add_sptr(PerlInterpreter *my_perl, HV *hash_env, nxt_inline int nxt_perl_psgi_add_sptr(PerlInterpreter *my_perl, HV *hash_env,
const char *name, uint32_t name_len, nxt_unit_sptr_t *sptr, uint32_t len); const char *name, uint32_t name_len, nxt_unit_sptr_t *sptr, uint32_t len);
nxt_inline int nxt_perl_psgi_add_str(PerlInterpreter *my_perl, HV *hash_env, nxt_inline int nxt_perl_psgi_add_str(PerlInterpreter *my_perl, HV *hash_env,
@@ -75,8 +75,7 @@ nxt_inline int nxt_perl_psgi_add_value(PerlInterpreter *my_perl, HV *hash_env,
const char *name, uint32_t name_len, void *value); const char *name, uint32_t name_len, void *value);
static u_char *nxt_perl_psgi_module_create(nxt_task_t *task, static char *nxt_perl_psgi_module_create(const char *script);
const char *script);
static nxt_int_t nxt_perl_psgi_result_status(PerlInterpreter *my_perl, static nxt_int_t nxt_perl_psgi_result_status(PerlInterpreter *my_perl,
SV *result); SV *result);
@@ -96,18 +95,20 @@ static void nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result,
nxt_unit_request_info_t *req); nxt_unit_request_info_t *req);
static nxt_int_t nxt_perl_psgi_start(nxt_task_t *task, static nxt_int_t nxt_perl_psgi_start(nxt_task_t *task,
nxt_process_data_t *conf); nxt_process_data_t *data);
static void nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req); static void nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req);
static void nxt_perl_psgi_atexit(void); static int nxt_perl_psgi_ready_handler(nxt_unit_ctx_t *ctx);
static void *nxt_perl_psgi_thread_func(void *main_ctx);
typedef SV *(*nxt_perl_psgi_callback_f)(PerlInterpreter *my_perl, static int nxt_perl_psgi_init_threads(nxt_perl_app_conf_t *c);
SV *env, nxt_task_t *task); static void nxt_perl_psgi_join_threads(nxt_unit_ctx_t *ctx,
nxt_perl_app_conf_t *c);
static void nxt_perl_psgi_ctx_free(nxt_perl_psgi_ctx_t *pctx);
static CV *nxt_perl_psgi_write;
static CV *nxt_perl_psgi_close;
static CV *nxt_perl_psgi_cb; static CV *nxt_perl_psgi_cb;
static PerlInterpreter *nxt_perl_psgi; static pthread_attr_t *nxt_perl_psgi_thread_attr;
static nxt_perl_psgi_io_arg_t nxt_perl_psgi_arg_input; static nxt_perl_psgi_ctx_t *nxt_perl_psgi_ctxs;
static nxt_perl_psgi_io_arg_t nxt_perl_psgi_arg_error;
static nxt_unit_request_info_t *nxt_perl_psgi_request;
static uint32_t nxt_perl_psgi_compat[] = { static uint32_t nxt_perl_psgi_compat[] = {
NXT_VERNUM, NXT_DEBUG, NXT_VERNUM, NXT_DEBUG,
@@ -129,11 +130,11 @@ static long
nxt_perl_psgi_io_input_read(PerlInterpreter *my_perl, nxt_perl_psgi_io_input_read(PerlInterpreter *my_perl,
nxt_perl_psgi_io_arg_t *arg, void *vbuf, size_t length) nxt_perl_psgi_io_arg_t *arg, void *vbuf, size_t length)
{ {
nxt_perl_psgi_input_t *input; nxt_perl_psgi_ctx_t *pctx;
input = (nxt_perl_psgi_input_t *) arg->ctx; pctx = arg->pctx;
return nxt_unit_request_read(input->req, vbuf, length); return nxt_unit_request_read(pctx->req, vbuf, length);
} }
@@ -165,10 +166,11 @@ static long
nxt_perl_psgi_io_error_write(PerlInterpreter *my_perl, nxt_perl_psgi_io_error_write(PerlInterpreter *my_perl,
nxt_perl_psgi_io_arg_t *arg, const void *vbuf, size_t length) nxt_perl_psgi_io_arg_t *arg, const void *vbuf, size_t length)
{ {
nxt_perl_psgi_input_t *input; nxt_perl_psgi_ctx_t *pctx;
input = (nxt_perl_psgi_input_t *) arg->ctx; pctx = arg->pctx;
nxt_unit_req_error(input->req, "Perl: %s", (const char*) vbuf);
nxt_unit_req_error(pctx->req, "Perl: %s", (const char*) vbuf);
return (long) length; return (long) length;
} }
@@ -219,6 +221,7 @@ XS(XS_NGINX__Unit__Sandbox_write)
int rc; int rc;
char *body; char *body;
size_t len; size_t len;
nxt_perl_psgi_ctx_t *pctx;
dXSARGS; dXSARGS;
@@ -230,7 +233,9 @@ XS(XS_NGINX__Unit__Sandbox_write)
body = SvPV(ST(1), len); body = SvPV(ST(1), len);
rc = nxt_unit_response_write(nxt_perl_psgi_request, body, len); pctx = CvXSUBANY(cv).any_ptr;
rc = nxt_unit_response_write(pctx->req, body, len);
if (nxt_slow_path(rc != NXT_UNIT_OK)) { if (nxt_slow_path(rc != NXT_UNIT_OK)) {
Perl_croak(aTHX_ "Failed to write response body"); Perl_croak(aTHX_ "Failed to write response body");
@@ -242,15 +247,11 @@ XS(XS_NGINX__Unit__Sandbox_write)
nxt_inline void nxt_inline void
nxt_perl_psgi_cb_request_done(nxt_int_t status) nxt_perl_psgi_cb_request_done(nxt_perl_psgi_ctx_t *pctx, int status)
{ {
nxt_unit_request_info_t *req; if (pctx->req != NULL) {
nxt_unit_request_done(pctx->req, status);
req = nxt_perl_psgi_request; pctx->req = NULL;
if (req != NULL) {
nxt_unit_request_done(req, status);
nxt_perl_psgi_request = NULL;
} }
} }
@@ -262,7 +263,7 @@ XS(XS_NGINX__Unit__Sandbox_close)
ax = POPMARK; ax = POPMARK;
nxt_perl_psgi_cb_request_done(NXT_UNIT_OK); nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_OK);
XSRETURN_NO; XSRETURN_NO;
} }
@@ -274,11 +275,12 @@ XS(XS_NGINX__Unit__Sandbox_cb)
SV *obj; SV *obj;
int rc; int rc;
long array_len; long array_len;
nxt_perl_psgi_ctx_t *pctx;
dXSARGS; dXSARGS;
if (nxt_slow_path(items != 1)) { if (nxt_slow_path(items != 1)) {
nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR); nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_ERROR);
Perl_croak(aTHX_ "Wrong number of arguments"); Perl_croak(aTHX_ "Wrong number of arguments");
@@ -288,7 +290,7 @@ XS(XS_NGINX__Unit__Sandbox_cb)
if (nxt_slow_path(SvOK(ST(0)) == 0 || SvROK(ST(0)) == 0 if (nxt_slow_path(SvOK(ST(0)) == 0 || SvROK(ST(0)) == 0
|| SvTYPE(SvRV(ST(0))) != SVt_PVAV)) || SvTYPE(SvRV(ST(0))) != SVt_PVAV))
{ {
nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR); nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_ERROR);
Perl_croak(aTHX_ "PSGI: An unexpected response was received " Perl_croak(aTHX_ "PSGI: An unexpected response was received "
"from Perl Application"); "from Perl Application");
@@ -296,10 +298,11 @@ XS(XS_NGINX__Unit__Sandbox_cb)
XSRETURN_EMPTY; XSRETURN_EMPTY;
} }
rc = nxt_perl_psgi_result_array(PERL_GET_CONTEXT, ST(0), pctx = CvXSUBANY(cv).any_ptr;
nxt_perl_psgi_request);
rc = nxt_perl_psgi_result_array(PERL_GET_CONTEXT, ST(0), pctx->req);
if (nxt_slow_path(rc != NXT_UNIT_OK)) { if (nxt_slow_path(rc != NXT_UNIT_OK)) {
nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR); nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_ERROR);
Perl_croak(aTHX_ (char *) NULL); Perl_croak(aTHX_ (char *) NULL);
@@ -316,7 +319,7 @@ XS(XS_NGINX__Unit__Sandbox_cb)
XSRETURN(1); XSRETURN(1);
} }
nxt_perl_psgi_cb_request_done(NXT_UNIT_OK); nxt_perl_psgi_cb_request_done(CvXSUBANY(cv).any_ptr, NXT_UNIT_OK);
XSRETURN_EMPTY; XSRETURN_EMPTY;
} }
@@ -335,10 +338,11 @@ nxt_perl_psgi_xs_init(pTHX)
/* DynaLoader for Perl modules who use XS */ /* DynaLoader for Perl modules who use XS */
newXS("DynaLoader::boot_DynaLoader", boot_DynaLoader, __FILE__); newXS("DynaLoader::boot_DynaLoader", boot_DynaLoader, __FILE__);
newXS("NGINX::Unit::Sandbox::write", XS_NGINX__Unit__Sandbox_write, nxt_perl_psgi_write = newXS("NGINX::Unit::Sandbox::write",
__FILE__); XS_NGINX__Unit__Sandbox_write, __FILE__);
newXS("NGINX::Unit::Sandbox::close", XS_NGINX__Unit__Sandbox_close,
__FILE__); nxt_perl_psgi_close = newXS("NGINX::Unit::Sandbox::close",
XS_NGINX__Unit__Sandbox_close, __FILE__);
nxt_perl_psgi_cb = newXS("NGINX::Unit::Sandbox::cb", nxt_perl_psgi_cb = newXS("NGINX::Unit::Sandbox::cb",
XS_NGINX__Unit__Sandbox_cb, __FILE__); XS_NGINX__Unit__Sandbox_cb, __FILE__);
@@ -416,10 +420,10 @@ nxt_perl_psgi_call_method(PerlInterpreter *my_perl, SV *obj, const char *method,
} }
static u_char * static char *
nxt_perl_psgi_module_create(nxt_task_t *task, const char *script) nxt_perl_psgi_module_create(const char *script)
{ {
u_char *buf, *p; char *buf, *p;
size_t length; size_t length;
static nxt_str_t prefix = nxt_string( static nxt_str_t prefix = nxt_string(
@@ -441,12 +445,11 @@ nxt_perl_psgi_module_create(nxt_task_t *task, const char *script)
length = strlen(script); length = strlen(script);
buf = nxt_malloc(prefix.length + length + suffix.length); buf = nxt_unit_malloc(NULL, prefix.length + length + suffix.length);
if (nxt_slow_path(buf == NULL)) { if (nxt_slow_path(buf == NULL)) {
nxt_log_error(NXT_LOG_ERR, task->log, nxt_unit_alert(NULL, "PSGI: Failed to allocate memory "
"PSGI: Failed to allocate memory "
"for Perl script file %s", script); "for Perl script file %s", script);
return NULL; return NULL;
} }
@@ -518,30 +521,27 @@ nxt_perl_psgi_io_error_init(PerlInterpreter *my_perl,
} }
static PerlInterpreter * static int
nxt_perl_psgi_interpreter_init(nxt_task_t *task, char *script, SV **app) nxt_perl_psgi_ctx_init(const char *script, nxt_perl_psgi_ctx_t *pctx)
{ {
int status, pargc; int status;
char **pargv, **penv; char *run_module;
u_char *run_module;
PerlInterpreter *my_perl; PerlInterpreter *my_perl;
static char argv[] = "\0""-e\0""0"; static char argv[] = "\0""-e\0""0";
static char *embedding[] = { &argv[0], &argv[1], &argv[4] }; static char *embedding[] = { &argv[0], &argv[1], &argv[4] };
pargc = 0;
pargv = NULL;
penv = NULL;
PERL_SYS_INIT3(&pargc, &pargv, &penv);
my_perl = perl_alloc(); my_perl = perl_alloc();
if (nxt_slow_path(my_perl == NULL)) { if (nxt_slow_path(my_perl == NULL)) {
nxt_alert(task, "PSGI: Failed to allocate memory for Perl interpreter"); nxt_unit_alert(NULL,
return NULL; "PSGI: Failed to allocate memory for Perl interpreter");
return NXT_UNIT_ERROR;
} }
pctx->my_perl = my_perl;
run_module = NULL; run_module = NULL;
perl_construct(my_perl); perl_construct(my_perl);
@@ -550,77 +550,86 @@ nxt_perl_psgi_interpreter_init(nxt_task_t *task, char *script, SV **app)
status = perl_parse(my_perl, nxt_perl_psgi_xs_init, 3, embedding, NULL); status = perl_parse(my_perl, nxt_perl_psgi_xs_init, 3, embedding, NULL);
if (nxt_slow_path(status != 0)) { if (nxt_slow_path(status != 0)) {
nxt_alert(task, "PSGI: Failed to parse Perl Script"); nxt_unit_alert(NULL, "PSGI: Failed to parse Perl Script");
goto fail; goto fail;
} }
CvXSUBANY(nxt_perl_psgi_write).any_ptr = pctx;
CvXSUBANY(nxt_perl_psgi_close).any_ptr = pctx;
CvXSUBANY(nxt_perl_psgi_cb).any_ptr = pctx;
pctx->cb = nxt_perl_psgi_cb;
PL_exit_flags |= PERL_EXIT_DESTRUCT_END; PL_exit_flags |= PERL_EXIT_DESTRUCT_END;
PL_origalen = 1; PL_origalen = 1;
status = perl_run(my_perl); status = perl_run(my_perl);
if (nxt_slow_path(status != 0)) { if (nxt_slow_path(status != 0)) {
nxt_alert(task, "PSGI: Failed to run Perl"); nxt_unit_alert(NULL, "PSGI: Failed to run Perl");
goto fail; goto fail;
} }
sv_setsv(get_sv("0", 0), newSVpv(script, 0)); sv_setsv(get_sv("0", 0), newSVpv(script, 0));
run_module = nxt_perl_psgi_module_create(task, script); run_module = nxt_perl_psgi_module_create(script);
if (nxt_slow_path(run_module == NULL)) { if (nxt_slow_path(run_module == NULL)) {
goto fail; goto fail;
} }
status = nxt_perl_psgi_io_input_init(my_perl, &nxt_perl_psgi_arg_input); pctx->arg_input.pctx = pctx;
status = nxt_perl_psgi_io_input_init(my_perl, &pctx->arg_input);
if (nxt_slow_path(status != NXT_OK)) { if (nxt_slow_path(status != NXT_OK)) {
nxt_alert(task, "PSGI: Failed to init io.psgi.input"); nxt_unit_alert(NULL, "PSGI: Failed to init io.psgi.input");
goto fail; goto fail;
} }
status = nxt_perl_psgi_io_error_init(my_perl, &nxt_perl_psgi_arg_error); pctx->arg_error.pctx = pctx;
status = nxt_perl_psgi_io_error_init(my_perl, &pctx->arg_error);
if (nxt_slow_path(status != NXT_OK)) { if (nxt_slow_path(status != NXT_OK)) {
nxt_alert(task, "PSGI: Failed to init io.psgi.errors"); nxt_unit_alert(NULL, "PSGI: Failed to init io.psgi.errors");
goto fail; goto fail;
} }
*app = eval_pv((const char *) run_module, FALSE); pctx->app = eval_pv(run_module, FALSE);
if (SvTRUE(ERRSV)) { if (SvTRUE(ERRSV)) {
nxt_alert(task, "PSGI: Failed to parse script: %s\n%s", nxt_unit_alert(NULL, "PSGI: Failed to parse script: %s\n%s",
script, SvPV_nolen(ERRSV)); script, SvPV_nolen(ERRSV));
goto fail; goto fail;
} }
nxt_free(run_module); nxt_unit_free(NULL, run_module);
return my_perl; return NXT_UNIT_OK;
fail: fail:
if (run_module != NULL) { if (run_module != NULL) {
nxt_free(run_module); nxt_unit_free(NULL, run_module);
} }
perl_destruct(my_perl); perl_destruct(my_perl);
perl_free(my_perl); perl_free(my_perl);
PERL_SYS_TERM();
return NULL; return NXT_UNIT_ERROR;
} }
static SV * static SV *
nxt_perl_psgi_env_create(PerlInterpreter *my_perl, nxt_perl_psgi_env_create(PerlInterpreter *my_perl,
nxt_unit_request_info_t *req, nxt_perl_psgi_input_t *input) nxt_unit_request_info_t *req)
{ {
HV *hash_env; HV *hash_env;
AV *array_version; AV *array_version;
uint32_t i; uint32_t i;
nxt_unit_field_t *f; nxt_unit_field_t *f;
nxt_unit_request_t *r; nxt_unit_request_t *r;
nxt_perl_psgi_ctx_t *pctx;
pctx = req->ctx->data;
hash_env = newHV(); hash_env = newHV();
if (nxt_slow_path(hash_env == NULL)) { if (nxt_slow_path(hash_env == NULL)) {
@@ -664,11 +673,12 @@ nxt_perl_psgi_env_create(PerlInterpreter *my_perl,
: newSVpv("http", 4))); : newSVpv("http", 4)));
RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.input"), RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.input"),
SvREFCNT_inc(nxt_perl_psgi_arg_input.io))); SvREFCNT_inc(pctx->arg_input.io)));
RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.errors"), RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.errors"),
SvREFCNT_inc(nxt_perl_psgi_arg_error.io))); SvREFCNT_inc(pctx->arg_error.io)));
RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.multithread"), RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.multithread"),
&PL_sv_no)); nxt_perl_psgi_ctxs != NULL
? &PL_sv_yes : &PL_sv_no));
RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.multiprocess"), RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.multiprocess"),
&PL_sv_yes)); &PL_sv_yes));
RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.run_once"), RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.run_once"),
@@ -1109,13 +1119,17 @@ static void
nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result, nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result,
nxt_unit_request_info_t *req) nxt_unit_request_info_t *req)
{ {
nxt_perl_psgi_ctx_t *pctx;
dSP; dSP;
pctx = req->ctx->data;
ENTER; ENTER;
SAVETMPS; SAVETMPS;
PUSHMARK(sp); PUSHMARK(sp);
XPUSHs(newRV_noinc((SV*) nxt_perl_psgi_cb)); XPUSHs(newRV_noinc((SV*) pctx->cb));
PUTBACK; PUTBACK;
call_sv(result, G_EVAL|G_SCALAR); call_sv(result, G_EVAL|G_SCALAR);
@@ -1126,7 +1140,7 @@ nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result,
nxt_unit_error(NULL, "PSGI: Failed to execute result callback: \n%s", nxt_unit_error(NULL, "PSGI: Failed to execute result callback: \n%s",
SvPV_nolen(ERRSV)); SvPV_nolen(ERRSV));
nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR); nxt_perl_psgi_cb_request_done(pctx, NXT_UNIT_ERROR);
} }
PUTBACK; PUTBACK;
@@ -1138,45 +1152,74 @@ nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result,
static nxt_int_t static nxt_int_t
nxt_perl_psgi_start(nxt_task_t *task, nxt_process_data_t *data) nxt_perl_psgi_start(nxt_task_t *task, nxt_process_data_t *data)
{ {
int rc; int rc, pargc;
char **pargv, **penv;
nxt_unit_ctx_t *unit_ctx; nxt_unit_ctx_t *unit_ctx;
nxt_unit_init_t perl_init; nxt_unit_init_t perl_init;
PerlInterpreter *my_perl; nxt_perl_psgi_ctx_t pctx;
nxt_common_app_conf_t *conf; nxt_perl_app_conf_t *c;
nxt_perl_psgi_module_t module; nxt_common_app_conf_t *common_conf;
conf = data->app; common_conf = data->app;
c = &common_conf->u.perl;
my_perl = nxt_perl_psgi_interpreter_init(task, conf->u.perl.script, pargc = 0;
&module.app); pargv = NULL;
penv = NULL;
if (nxt_slow_path(my_perl == NULL)) { PERL_SYS_INIT3(&pargc, &pargv, &penv);
return NXT_ERROR;
memset(&pctx, 0, sizeof(nxt_perl_psgi_ctx_t));
rc = nxt_perl_psgi_ctx_init(c->script, &pctx);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
} }
module.my_perl = my_perl; rc = nxt_perl_psgi_init_threads(c);
nxt_perl_psgi = my_perl;
PERL_SET_CONTEXT(pctx.my_perl);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
goto fail;
}
nxt_unit_default_init(task, &perl_init); nxt_unit_default_init(task, &perl_init);
perl_init.callbacks.request_handler = nxt_perl_psgi_request_handler; perl_init.callbacks.request_handler = nxt_perl_psgi_request_handler;
perl_init.data = &module; perl_init.callbacks.ready_handler = nxt_perl_psgi_ready_handler;
perl_init.shm_limit = conf->shm_limit; perl_init.data = c;
perl_init.ctx_data = &pctx;
perl_init.shm_limit = common_conf->shm_limit;
unit_ctx = nxt_unit_init(&perl_init); unit_ctx = nxt_unit_init(&perl_init);
if (nxt_slow_path(unit_ctx == NULL)) { if (nxt_slow_path(unit_ctx == NULL)) {
return NXT_ERROR; goto fail;
} }
rc = nxt_unit_run(unit_ctx); rc = nxt_unit_run(unit_ctx);
nxt_perl_psgi_join_threads(unit_ctx, c);
nxt_unit_done(unit_ctx); nxt_unit_done(unit_ctx);
nxt_perl_psgi_atexit(); nxt_perl_psgi_ctx_free(&pctx);
PERL_SYS_TERM();
exit(rc); exit(rc);
return NXT_OK; return NXT_OK;
fail:
nxt_perl_psgi_join_threads(NULL, c);
nxt_perl_psgi_ctx_free(&pctx);
PERL_SYS_TERM();
return NXT_ERROR;
} }
@@ -1186,42 +1229,39 @@ nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req)
SV *env, *result; SV *env, *result;
nxt_int_t rc; nxt_int_t rc;
PerlInterpreter *my_perl; PerlInterpreter *my_perl;
nxt_perl_psgi_input_t input; nxt_perl_psgi_ctx_t *pctx;
nxt_perl_psgi_module_t *module;
module = req->unit->data; pctx = req->ctx->data;
my_perl = module->my_perl; my_perl = pctx->my_perl;
input.my_perl = my_perl; pctx->req = req;
input.req = req;
nxt_perl_psgi_request = req;
/* /*
* Create environ variable for perl sub "application". * Create environ variable for perl sub "application".
* > sub application { * > sub application {
* > my ($environ) = @_; * > my ($environ) = @_;
*/ */
env = nxt_perl_psgi_env_create(my_perl, req, &input); env = nxt_perl_psgi_env_create(my_perl, req);
if (nxt_slow_path(env == NULL)) { if (nxt_slow_path(env == NULL)) {
nxt_unit_req_error(req, nxt_unit_req_error(req,
"PSGI: Failed to create 'env' for Perl Application"); "PSGI: Failed to create 'env' for Perl Application");
nxt_unit_request_done(req, NXT_UNIT_ERROR); nxt_unit_request_done(req, NXT_UNIT_ERROR);
pctx->req = NULL;
return; return;
} }
nxt_perl_psgi_arg_input.ctx = &input;
nxt_perl_psgi_arg_error.ctx = &input;
/* Call perl sub and get result as SV*. */ /* Call perl sub and get result as SV*. */
result = nxt_perl_psgi_call_var_application(my_perl, env, module->app, req); result = nxt_perl_psgi_call_var_application(my_perl, env, pctx->app,
req);
if (nxt_fast_path(SvOK(result) != 0 && SvROK(result) != 0)) { if (nxt_fast_path(SvOK(result) != 0 && SvROK(result) != 0)) {
if (SvTYPE(SvRV(result)) == SVt_PVAV) { if (SvTYPE(SvRV(result)) == SVt_PVAV) {
rc = nxt_perl_psgi_result_array(my_perl, result, req); rc = nxt_perl_psgi_result_array(my_perl, result, req);
nxt_unit_request_done(req, rc); nxt_unit_request_done(req, rc);
pctx->req = NULL;
goto release; goto release;
} }
@@ -1235,6 +1275,7 @@ nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req)
"from Perl Application"); "from Perl Application");
nxt_unit_request_done(req, NXT_UNIT_ERROR); nxt_unit_request_done(req, NXT_UNIT_ERROR);
pctx->req = NULL;
release: release:
@@ -1243,18 +1284,181 @@ release:
} }
static void static int
nxt_perl_psgi_atexit(void) nxt_perl_psgi_ready_handler(nxt_unit_ctx_t *ctx)
{ {
dTHXa(nxt_perl_psgi); int res;
uint32_t i;
nxt_perl_app_conf_t *c;
nxt_perl_psgi_ctx_t *pctx;
nxt_perl_psgi_layer_stream_io_destroy(aTHX_ nxt_perl_psgi_arg_input.io); /* Worker thread context. */
nxt_perl_psgi_layer_stream_fp_destroy(aTHX_ nxt_perl_psgi_arg_input.fp); if (!nxt_unit_is_main_ctx(ctx)) {
return NXT_UNIT_OK;
nxt_perl_psgi_layer_stream_io_destroy(aTHX_ nxt_perl_psgi_arg_error.io); }
nxt_perl_psgi_layer_stream_fp_destroy(aTHX_ nxt_perl_psgi_arg_error.fp);
c = ctx->unit->data;
perl_destruct(nxt_perl_psgi);
perl_free(nxt_perl_psgi); if (c->threads <= 1) {
PERL_SYS_TERM(); return NXT_UNIT_OK;
}
for (i = 0; i < c->threads - 1; i++) {
pctx = &nxt_perl_psgi_ctxs[i];
pctx->ctx = ctx;
res = pthread_create(&pctx->thread, nxt_perl_psgi_thread_attr,
nxt_perl_psgi_thread_func, pctx);
if (nxt_fast_path(res == 0)) {
nxt_unit_debug(ctx, "thread #%d created", (int) (i + 1));
} else {
nxt_unit_alert(ctx, "thread #%d create failed: %s (%d)",
(int) (i + 1), strerror(res), res);
return NXT_UNIT_ERROR;
}
}
return NXT_UNIT_OK;
}
static void *
nxt_perl_psgi_thread_func(void *data)
{
nxt_unit_ctx_t *ctx;
nxt_perl_psgi_ctx_t *pctx;
pctx = data;
nxt_unit_debug(pctx->ctx, "worker thread start");
ctx = nxt_unit_ctx_alloc(pctx->ctx, pctx);
if (nxt_slow_path(ctx == NULL)) {
return NULL;
}
pctx->ctx = ctx;
PERL_SET_CONTEXT(pctx->my_perl);
(void) nxt_unit_run(ctx);
nxt_unit_done(ctx);
nxt_unit_debug(NULL, "worker thread end");
return NULL;
}
static int
nxt_perl_psgi_init_threads(nxt_perl_app_conf_t *c)
{
int rc;
uint32_t i;
static pthread_attr_t attr;
if (c->threads <= 1) {
return NXT_UNIT_OK;
}
if (c->thread_stack_size > 0) {
rc = pthread_attr_init(&attr);
if (nxt_slow_path(rc != 0)) {
nxt_unit_alert(NULL, "thread attr init failed: %s (%d)",
strerror(rc), rc);
return NXT_UNIT_ERROR;
}
rc = pthread_attr_setstacksize(&attr, c->thread_stack_size);
if (nxt_slow_path(rc != 0)) {
nxt_unit_alert(NULL, "thread attr set stack size failed: %s (%d)",
strerror(rc), rc);
return NXT_UNIT_ERROR;
}
nxt_perl_psgi_thread_attr = &attr;
}
nxt_perl_psgi_ctxs = nxt_unit_malloc(NULL, sizeof(nxt_perl_psgi_ctx_t)
* (c->threads - 1));
if (nxt_slow_path(nxt_perl_psgi_ctxs == NULL)) {
return NXT_UNIT_ERROR;
}
memset(nxt_perl_psgi_ctxs, 0, sizeof(nxt_perl_psgi_ctx_t)
* (c->threads - 1));
for (i = 0; i < c->threads - 1; i++) {
rc = nxt_perl_psgi_ctx_init(c->script, &nxt_perl_psgi_ctxs[i]);
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
return NXT_UNIT_ERROR;
}
}
return NXT_UNIT_OK;
}
static void
nxt_perl_psgi_join_threads(nxt_unit_ctx_t *ctx, nxt_perl_app_conf_t *c)
{
int res;
uint32_t i;
nxt_perl_psgi_ctx_t *pctx;
if (nxt_perl_psgi_ctxs == NULL) {
return;
}
for (i = 0; i < c->threads - 1; i++) {
pctx = &nxt_perl_psgi_ctxs[i];
res = pthread_join(pctx->thread, NULL);
if (nxt_fast_path(res == 0)) {
nxt_unit_debug(ctx, "thread #%d joined", (int) (i + 1));
} else {
nxt_unit_alert(ctx, "thread #%d join failed: %s (%d)",
(int) (i + 1), strerror(res), res);
}
}
for (i = 0; i < c->threads - 1; i++) {
nxt_perl_psgi_ctx_free(&nxt_perl_psgi_ctxs[i]);
}
nxt_unit_free(NULL, nxt_perl_psgi_ctxs);
}
static void
nxt_perl_psgi_ctx_free(nxt_perl_psgi_ctx_t *pctx)
{
PerlInterpreter *my_perl;
my_perl = pctx->my_perl;
if (nxt_slow_path(my_perl == NULL)) {
return;
}
PERL_SET_CONTEXT(my_perl);
nxt_perl_psgi_layer_stream_io_destroy(aTHX_ pctx->arg_input.io);
nxt_perl_psgi_layer_stream_fp_destroy(aTHX_ pctx->arg_input.fp);
nxt_perl_psgi_layer_stream_io_destroy(aTHX_ pctx->arg_error.io);
nxt_perl_psgi_layer_stream_fp_destroy(aTHX_ pctx->arg_error.fp);
perl_destruct(my_perl);
perl_free(my_perl);
} }

View File

@@ -14,7 +14,7 @@
#include <perliol.h> #include <perliol.h>
typedef struct nxt_perl_psgi_io_arg nxt_perl_psgi_io_arg_t; typedef struct nxt_perl_psgi_io_arg_s nxt_perl_psgi_io_arg_t;
typedef long (*nxt_perl_psgi_io_read_f)(PerlInterpreter *my_perl, typedef long (*nxt_perl_psgi_io_read_f)(PerlInterpreter *my_perl,
nxt_perl_psgi_io_arg_t *arg, void *vbuf, size_t length); nxt_perl_psgi_io_arg_t *arg, void *vbuf, size_t length);
@@ -24,7 +24,7 @@ typedef long (*nxt_perl_psgi_io_arg_f)(PerlInterpreter *my_perl,
nxt_perl_psgi_io_arg_t *arg); nxt_perl_psgi_io_arg_t *arg);
struct nxt_perl_psgi_io_arg { struct nxt_perl_psgi_io_arg_s {
SV *io; SV *io;
PerlIO *fp; PerlIO *fp;
@@ -32,7 +32,7 @@ struct nxt_perl_psgi_io_arg {
nxt_perl_psgi_io_read_f read; nxt_perl_psgi_io_read_f read;
nxt_perl_psgi_io_write_f write; nxt_perl_psgi_io_write_f write;
void *ctx; void *pctx;
}; };