Perl: added implementation delayed response and streaming body.
This commit is contained in:
@@ -92,6 +92,8 @@ static ssize_t nxt_perl_psgi_io_read(nxt_unit_read_info_t *read_info, void *dst,
|
|||||||
size_t size);
|
size_t size);
|
||||||
static int nxt_perl_psgi_result_array(PerlInterpreter *my_perl,
|
static int nxt_perl_psgi_result_array(PerlInterpreter *my_perl,
|
||||||
SV *result, nxt_unit_request_info_t *req);
|
SV *result, nxt_unit_request_info_t *req);
|
||||||
|
static void nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result,
|
||||||
|
nxt_unit_request_info_t *req);
|
||||||
|
|
||||||
static nxt_int_t nxt_perl_psgi_init(nxt_task_t *task,
|
static nxt_int_t nxt_perl_psgi_init(nxt_task_t *task,
|
||||||
nxt_common_app_conf_t *conf);
|
nxt_common_app_conf_t *conf);
|
||||||
@@ -101,8 +103,11 @@ static void nxt_perl_psgi_atexit(void);
|
|||||||
typedef SV *(*nxt_perl_psgi_callback_f)(PerlInterpreter *my_perl,
|
typedef SV *(*nxt_perl_psgi_callback_f)(PerlInterpreter *my_perl,
|
||||||
SV *env, nxt_task_t *task);
|
SV *env, nxt_task_t *task);
|
||||||
|
|
||||||
|
static CV *nxt_perl_psgi_cb;
|
||||||
static PerlInterpreter *nxt_perl_psgi;
|
static PerlInterpreter *nxt_perl_psgi;
|
||||||
static nxt_perl_psgi_io_arg_t nxt_perl_psgi_arg_input, nxt_perl_psgi_arg_error;
|
static nxt_perl_psgi_io_arg_t nxt_perl_psgi_arg_input;
|
||||||
|
static nxt_perl_psgi_io_arg_t nxt_perl_psgi_arg_error;
|
||||||
|
static nxt_unit_request_info_t *nxt_perl_psgi_request;
|
||||||
|
|
||||||
static uint32_t nxt_perl_psgi_compat[] = {
|
static uint32_t nxt_perl_psgi_compat[] = {
|
||||||
NXT_VERNUM, NXT_DEBUG,
|
NXT_VERNUM, NXT_DEBUG,
|
||||||
@@ -206,6 +211,115 @@ XS(XS_NGINX__Unit__PSGI_exit)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
XS(XS_NGINX__Unit__Sandbox_write);
|
||||||
|
XS(XS_NGINX__Unit__Sandbox_write)
|
||||||
|
{
|
||||||
|
int rc;
|
||||||
|
char *body;
|
||||||
|
size_t len;
|
||||||
|
|
||||||
|
dXSARGS;
|
||||||
|
|
||||||
|
if (nxt_slow_path(items != 2)) {
|
||||||
|
Perl_croak(aTHX_ "Wrong number of arguments. Need one string");
|
||||||
|
|
||||||
|
XSRETURN_EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
body = SvPV(ST(1), len);
|
||||||
|
|
||||||
|
rc = nxt_unit_response_write(nxt_perl_psgi_request, body, len);
|
||||||
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
|
Perl_croak(aTHX_ "Failed to write response body");
|
||||||
|
|
||||||
|
XSRETURN_EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
XSRETURN_IV(len);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
nxt_inline void
|
||||||
|
nxt_perl_psgi_cb_request_done(nxt_int_t status)
|
||||||
|
{
|
||||||
|
nxt_unit_request_info_t *req;
|
||||||
|
|
||||||
|
req = nxt_perl_psgi_request;
|
||||||
|
|
||||||
|
if (req != NULL) {
|
||||||
|
nxt_unit_request_done(req, status);
|
||||||
|
nxt_perl_psgi_request = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
XS(XS_NGINX__Unit__Sandbox_close);
|
||||||
|
XS(XS_NGINX__Unit__Sandbox_close)
|
||||||
|
{
|
||||||
|
I32 ax;
|
||||||
|
|
||||||
|
ax = POPMARK;
|
||||||
|
|
||||||
|
nxt_perl_psgi_cb_request_done(NXT_UNIT_OK);
|
||||||
|
|
||||||
|
XSRETURN_NO;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
XS(XS_NGINX__Unit__Sandbox_cb);
|
||||||
|
XS(XS_NGINX__Unit__Sandbox_cb)
|
||||||
|
{
|
||||||
|
SV *obj;
|
||||||
|
int rc;
|
||||||
|
long array_len;
|
||||||
|
|
||||||
|
dXSARGS;
|
||||||
|
|
||||||
|
if (nxt_slow_path(items != 1)) {
|
||||||
|
nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR);
|
||||||
|
|
||||||
|
Perl_croak(aTHX_ "Wrong number of arguments");
|
||||||
|
|
||||||
|
XSRETURN_EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (nxt_slow_path(SvOK(ST(0)) == 0 || SvROK(ST(0)) == 0
|
||||||
|
|| SvTYPE(SvRV(ST(0))) != SVt_PVAV))
|
||||||
|
{
|
||||||
|
nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR);
|
||||||
|
|
||||||
|
Perl_croak(aTHX_ "PSGI: An unexpected response was received "
|
||||||
|
"from Perl Application");
|
||||||
|
|
||||||
|
XSRETURN_EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
rc = nxt_perl_psgi_result_array(PERL_GET_CONTEXT, ST(0),
|
||||||
|
nxt_perl_psgi_request);
|
||||||
|
if (nxt_slow_path(rc != NXT_UNIT_OK)) {
|
||||||
|
nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR);
|
||||||
|
|
||||||
|
Perl_croak(aTHX_ (char *) NULL);
|
||||||
|
|
||||||
|
XSRETURN_EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
array_len = av_len((AV *) SvRV(ST(0)));
|
||||||
|
|
||||||
|
if (array_len < 2) {
|
||||||
|
obj = sv_bless(newRV_noinc((SV *) newHV()),
|
||||||
|
gv_stashpv("NGINX::Unit::Sandbox", GV_ADD));
|
||||||
|
ST(0) = obj;
|
||||||
|
|
||||||
|
XSRETURN(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
nxt_perl_psgi_cb_request_done(NXT_UNIT_OK);
|
||||||
|
|
||||||
|
XSRETURN_EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
nxt_perl_psgi_xs_init(pTHX)
|
nxt_perl_psgi_xs_init(pTHX)
|
||||||
{
|
{
|
||||||
@@ -218,6 +332,14 @@ nxt_perl_psgi_xs_init(pTHX)
|
|||||||
|
|
||||||
/* DynaLoader for Perl modules who use XS */
|
/* DynaLoader for Perl modules who use XS */
|
||||||
newXS("DynaLoader::boot_DynaLoader", boot_DynaLoader, __FILE__);
|
newXS("DynaLoader::boot_DynaLoader", boot_DynaLoader, __FILE__);
|
||||||
|
|
||||||
|
newXS("NGINX::Unit::Sandbox::write", XS_NGINX__Unit__Sandbox_write,
|
||||||
|
__FILE__);
|
||||||
|
newXS("NGINX::Unit::Sandbox::close", XS_NGINX__Unit__Sandbox_close,
|
||||||
|
__FILE__);
|
||||||
|
|
||||||
|
nxt_perl_psgi_cb = newXS("NGINX::Unit::Sandbox::cb",
|
||||||
|
XS_NGINX__Unit__Sandbox_cb, __FILE__);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -300,6 +422,9 @@ nxt_perl_psgi_module_create(nxt_task_t *task, const char *script)
|
|||||||
|
|
||||||
static nxt_str_t prefix = nxt_string(
|
static nxt_str_t prefix = nxt_string(
|
||||||
"package NGINX::Unit::Sandbox;"
|
"package NGINX::Unit::Sandbox;"
|
||||||
|
"sub new {"
|
||||||
|
" return bless {}, $_[0];"
|
||||||
|
"}"
|
||||||
"{my $app = do \""
|
"{my $app = do \""
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -546,7 +671,7 @@ nxt_perl_psgi_env_create(PerlInterpreter *my_perl,
|
|||||||
RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.nonblocking"),
|
RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.nonblocking"),
|
||||||
&PL_sv_no));
|
&PL_sv_no));
|
||||||
RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.streaming"),
|
RC(nxt_perl_psgi_add_value(my_perl, hash_env, NL("psgi.streaming"),
|
||||||
&PL_sv_no));
|
&PL_sv_yes));
|
||||||
|
|
||||||
RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("QUERY_STRING"),
|
RC(nxt_perl_psgi_add_sptr(my_perl, hash_env, NL("QUERY_STRING"),
|
||||||
&r->query, r->query_length));
|
&r->query, r->query_length));
|
||||||
@@ -975,6 +1100,36 @@ nxt_perl_psgi_result_array(PerlInterpreter *my_perl, SV *result,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
nxt_perl_psgi_result_cb(PerlInterpreter *my_perl, SV *result,
|
||||||
|
nxt_unit_request_info_t *req)
|
||||||
|
{
|
||||||
|
dSP;
|
||||||
|
|
||||||
|
ENTER;
|
||||||
|
SAVETMPS;
|
||||||
|
|
||||||
|
PUSHMARK(sp);
|
||||||
|
XPUSHs(newRV_noinc((SV*) nxt_perl_psgi_cb));
|
||||||
|
PUTBACK;
|
||||||
|
|
||||||
|
call_sv(result, G_EVAL|G_SCALAR);
|
||||||
|
|
||||||
|
SPAGAIN;
|
||||||
|
|
||||||
|
if (SvTRUE(ERRSV)) {
|
||||||
|
nxt_unit_error(NULL, "PSGI: Failed to execute result callback: \n%s",
|
||||||
|
SvPV_nolen(ERRSV));
|
||||||
|
|
||||||
|
nxt_perl_psgi_cb_request_done(NXT_UNIT_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
PUTBACK;
|
||||||
|
FREETMPS;
|
||||||
|
LEAVE;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static nxt_int_t
|
static nxt_int_t
|
||||||
nxt_perl_psgi_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
|
nxt_perl_psgi_init(nxt_task_t *task, nxt_common_app_conf_t *conf)
|
||||||
{
|
{
|
||||||
@@ -1031,6 +1186,8 @@ nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req)
|
|||||||
input.my_perl = my_perl;
|
input.my_perl = my_perl;
|
||||||
input.req = req;
|
input.req = req;
|
||||||
|
|
||||||
|
nxt_perl_psgi_request = req;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Create environ variable for perl sub "application".
|
* Create environ variable for perl sub "application".
|
||||||
* > sub application {
|
* > sub application {
|
||||||
@@ -1051,23 +1208,26 @@ nxt_perl_psgi_request_handler(nxt_unit_request_info_t *req)
|
|||||||
/* Call perl sub and get result as SV*. */
|
/* Call perl sub and get result as SV*. */
|
||||||
result = nxt_perl_psgi_call_var_application(my_perl, env, module->app, req);
|
result = nxt_perl_psgi_call_var_application(my_perl, env, module->app, req);
|
||||||
|
|
||||||
/*
|
if (nxt_fast_path(SvOK(result) != 0 && SvROK(result) != 0)) {
|
||||||
* We expect ARRAY ref like a
|
|
||||||
* ['200', ['Content-Type' => "text/plain"], ["body"]]
|
if (SvTYPE(SvRV(result)) == SVt_PVAV) {
|
||||||
*/
|
rc = nxt_perl_psgi_result_array(my_perl, result, req);
|
||||||
if (nxt_slow_path(SvOK(result) == 0 || SvROK(result) == 0
|
nxt_unit_request_done(req, rc);
|
||||||
|| SvTYPE(SvRV(result)) != SVt_PVAV))
|
goto release;
|
||||||
{
|
}
|
||||||
|
|
||||||
|
if (SvTYPE(SvRV(result)) == SVt_PVCV) {
|
||||||
|
nxt_perl_psgi_result_cb(my_perl, result, req);
|
||||||
|
goto release;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
nxt_unit_req_error(req, "PSGI: An unexpected response was received "
|
nxt_unit_req_error(req, "PSGI: An unexpected response was received "
|
||||||
"from Perl Application");
|
"from Perl Application");
|
||||||
|
|
||||||
rc = NXT_UNIT_ERROR;
|
nxt_unit_request_done(req, NXT_UNIT_ERROR);
|
||||||
|
|
||||||
} else {
|
release:
|
||||||
rc = nxt_perl_psgi_result_array(my_perl, result, req);
|
|
||||||
}
|
|
||||||
|
|
||||||
nxt_unit_request_done(req, rc);
|
|
||||||
|
|
||||||
SvREFCNT_dec(result);
|
SvREFCNT_dec(result);
|
||||||
SvREFCNT_dec(env);
|
SvREFCNT_dec(env);
|
||||||
|
|||||||
@@ -45,7 +45,7 @@ class TestUnitPerlApplication(unit.TestUnitApplicationPerl):
|
|||||||
'Psgi-Multiprocess': '1',
|
'Psgi-Multiprocess': '1',
|
||||||
'Psgi-Run-Once': '',
|
'Psgi-Run-Once': '',
|
||||||
'Psgi-Nonblocking': '',
|
'Psgi-Nonblocking': '',
|
||||||
'Psgi-Streaming': ''
|
'Psgi-Streaming': '1'
|
||||||
}, 'headers')
|
}, 'headers')
|
||||||
self.assertEqual(resp['body'], body, 'body')
|
self.assertEqual(resp['body'], body, 'body')
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user