Python: supporting ASGI legacy protocol.
Introducing manual protocol selection for 'universal' apps and frameworks.
This commit is contained in:
@@ -51,6 +51,7 @@ typedef struct {
|
||||
nxt_str_t path;
|
||||
nxt_str_t module;
|
||||
char *callable;
|
||||
nxt_str_t protocol;
|
||||
uint32_t threads;
|
||||
uint32_t thread_stack_size;
|
||||
} nxt_python_app_conf_t;
|
||||
|
||||
@@ -95,6 +95,8 @@ static nxt_int_t nxt_conf_vldt_return(nxt_conf_validation_t *vldt,
|
||||
nxt_conf_value_t *value, void *data);
|
||||
static nxt_int_t nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt,
|
||||
nxt_conf_value_t *value, void *data);
|
||||
static nxt_int_t nxt_conf_vldt_python_protocol(nxt_conf_validation_t *vldt,
|
||||
nxt_conf_value_t *value, void *data);
|
||||
static nxt_int_t nxt_conf_vldt_threads(nxt_conf_validation_t *vldt,
|
||||
nxt_conf_value_t *value, void *data);
|
||||
static nxt_int_t nxt_conf_vldt_thread_stack_size(nxt_conf_validation_t *vldt,
|
||||
@@ -493,6 +495,10 @@ static nxt_conf_vldt_object_t nxt_conf_vldt_python_members[] = {
|
||||
}, {
|
||||
.name = nxt_string("callable"),
|
||||
.type = NXT_CONF_VLDT_STRING,
|
||||
}, {
|
||||
.name = nxt_string("protocol"),
|
||||
.type = NXT_CONF_VLDT_STRING,
|
||||
.validator = nxt_conf_vldt_python_protocol,
|
||||
}, {
|
||||
.name = nxt_string("threads"),
|
||||
.type = NXT_CONF_VLDT_INTEGER,
|
||||
@@ -1360,6 +1366,26 @@ nxt_conf_vldt_proxy(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
|
||||
}
|
||||
|
||||
|
||||
static nxt_int_t
|
||||
nxt_conf_vldt_python_protocol(nxt_conf_validation_t *vldt,
|
||||
nxt_conf_value_t *value, void *data)
|
||||
{
|
||||
nxt_str_t proto;
|
||||
|
||||
static const nxt_str_t wsgi = nxt_string("wsgi");
|
||||
static const nxt_str_t asgi = nxt_string("asgi");
|
||||
|
||||
nxt_conf_get_string(value, &proto);
|
||||
|
||||
if (nxt_strstr_eq(&proto, &wsgi) || nxt_strstr_eq(&proto, &asgi)) {
|
||||
return NXT_OK;
|
||||
}
|
||||
|
||||
return nxt_conf_vldt_error(vldt, "The \"protocol\" can either be "
|
||||
"\"wsgi\" or \"asgi\".");
|
||||
}
|
||||
|
||||
|
||||
static nxt_int_t
|
||||
nxt_conf_vldt_threads(nxt_conf_validation_t *vldt, nxt_conf_value_t *value,
|
||||
void *data)
|
||||
|
||||
@@ -198,6 +198,12 @@ static nxt_conf_map_t nxt_python_app_conf[] = {
|
||||
offsetof(nxt_common_app_conf_t, u.python.callable),
|
||||
},
|
||||
|
||||
{
|
||||
nxt_string("protocol"),
|
||||
NXT_CONF_MAP_STR,
|
||||
offsetof(nxt_common_app_conf_t, u.python.protocol),
|
||||
},
|
||||
|
||||
{
|
||||
nxt_string("threads"),
|
||||
NXT_CONF_MAP_INT32,
|
||||
|
||||
@@ -68,6 +68,7 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
|
||||
char *nxt_py_module;
|
||||
size_t len;
|
||||
PyObject *obj, *pypath, *module;
|
||||
nxt_str_t proto;
|
||||
const char *callable;
|
||||
nxt_unit_ctx_t *unit_ctx;
|
||||
nxt_unit_init_t python_init;
|
||||
@@ -82,6 +83,9 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
|
||||
static const char bin_python[] = "/bin/python";
|
||||
#endif
|
||||
|
||||
static const nxt_str_t wsgi = nxt_string("wsgi");
|
||||
static const nxt_str_t asgi = nxt_string("asgi");
|
||||
|
||||
app_conf = data->app;
|
||||
c = &app_conf->u.python;
|
||||
|
||||
@@ -244,7 +248,13 @@ nxt_python_start(nxt_task_t *task, nxt_process_data_t *data)
|
||||
python_init.shm_limit = data->app->shm_limit;
|
||||
python_init.callbacks.ready_handler = nxt_python_ready_handler;
|
||||
|
||||
if (nxt_python_asgi_check(nxt_py_application)) {
|
||||
proto = c->protocol;
|
||||
|
||||
if (proto.length == 0) {
|
||||
proto = nxt_python_asgi_check(nxt_py_application) ? asgi : wsgi;
|
||||
}
|
||||
|
||||
if (nxt_strstr_eq(&proto, &asgi)) {
|
||||
rc = nxt_python_asgi_init(&python_init, &nxt_py_proto);
|
||||
|
||||
} else {
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#include <python/nxt_python_asgi_str.h>
|
||||
|
||||
|
||||
static PyObject *nxt_python_asgi_get_func(PyObject *obj);
|
||||
static int nxt_python_asgi_ctx_data_alloc(void **pdata);
|
||||
static void nxt_python_asgi_ctx_data_free(void *data);
|
||||
static int nxt_python_asgi_startup(void *data);
|
||||
@@ -42,6 +43,7 @@ static PyObject *nxt_py_asgi_port_read(PyObject *self, PyObject *args);
|
||||
static void nxt_python_asgi_done(void);
|
||||
|
||||
|
||||
int nxt_py_asgi_legacy;
|
||||
static PyObject *nxt_py_port_read;
|
||||
static nxt_unit_port_t *nxt_py_shared_port;
|
||||
|
||||
@@ -64,56 +66,78 @@ int
|
||||
nxt_python_asgi_check(PyObject *obj)
|
||||
{
|
||||
int res;
|
||||
PyObject *call;
|
||||
PyObject *func;
|
||||
PyCodeObject *code;
|
||||
|
||||
if (PyFunction_Check(obj)) {
|
||||
code = (PyCodeObject *) PyFunction_GET_CODE(obj);
|
||||
func = nxt_python_asgi_get_func(obj);
|
||||
|
||||
return (code->co_flags & CO_COROUTINE) != 0;
|
||||
if (func == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
code = (PyCodeObject *) PyFunction_GET_CODE(func);
|
||||
|
||||
nxt_unit_debug(NULL, "asgi_check: callable is %sa coroutine function with "
|
||||
"%d argument(s)",
|
||||
(code->co_flags & CO_COROUTINE) != 0 ? "" : "not ",
|
||||
code->co_argcount);
|
||||
|
||||
res = (code->co_flags & CO_COROUTINE) != 0 || code->co_argcount == 1;
|
||||
|
||||
Py_DECREF(func);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
static PyObject *
|
||||
nxt_python_asgi_get_func(PyObject *obj)
|
||||
{
|
||||
PyObject *call;
|
||||
|
||||
if (PyFunction_Check(obj)) {
|
||||
Py_INCREF(obj);
|
||||
return obj;
|
||||
}
|
||||
|
||||
if (PyMethod_Check(obj)) {
|
||||
obj = PyMethod_GET_FUNCTION(obj);
|
||||
|
||||
code = (PyCodeObject *) PyFunction_GET_CODE(obj);
|
||||
|
||||
return (code->co_flags & CO_COROUTINE) != 0;
|
||||
Py_INCREF(obj);
|
||||
return obj;
|
||||
}
|
||||
|
||||
call = PyObject_GetAttrString(obj, "__call__");
|
||||
|
||||
if (call == NULL) {
|
||||
return 0;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (PyFunction_Check(call)) {
|
||||
code = (PyCodeObject *) PyFunction_GET_CODE(call);
|
||||
return call;
|
||||
}
|
||||
|
||||
res = (code->co_flags & CO_COROUTINE) != 0;
|
||||
|
||||
} else {
|
||||
if (PyMethod_Check(call)) {
|
||||
obj = PyMethod_GET_FUNCTION(call);
|
||||
|
||||
code = (PyCodeObject *) PyFunction_GET_CODE(obj);
|
||||
Py_INCREF(obj);
|
||||
Py_DECREF(call);
|
||||
|
||||
res = (code->co_flags & CO_COROUTINE) != 0;
|
||||
|
||||
} else {
|
||||
res = 0;
|
||||
}
|
||||
return obj;
|
||||
}
|
||||
|
||||
Py_DECREF(call);
|
||||
|
||||
return res;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
|
||||
{
|
||||
PyObject *func;
|
||||
PyCodeObject *code;
|
||||
|
||||
nxt_unit_debug(NULL, "asgi_init");
|
||||
|
||||
if (nxt_slow_path(nxt_py_asgi_str_init() != NXT_UNIT_OK)) {
|
||||
@@ -136,6 +160,22 @@ nxt_python_asgi_init(nxt_unit_init_t *init, nxt_python_proto_t *proto)
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
|
||||
func = nxt_python_asgi_get_func(nxt_py_application);
|
||||
if (nxt_slow_path(func == NULL)) {
|
||||
nxt_unit_alert(NULL, "Python cannot find function for callable");
|
||||
return NXT_UNIT_ERROR;
|
||||
}
|
||||
|
||||
code = (PyCodeObject *) PyFunction_GET_CODE(func);
|
||||
|
||||
if ((code->co_flags & CO_COROUTINE) == 0) {
|
||||
nxt_unit_debug(NULL, "asgi: callable is not a coroutine function "
|
||||
"switching to legacy mode");
|
||||
nxt_py_asgi_legacy = 1;
|
||||
}
|
||||
|
||||
Py_DECREF(func);
|
||||
|
||||
init->callbacks.request_handler = nxt_py_asgi_request_handler;
|
||||
init->callbacks.data_handler = nxt_py_asgi_http_data_handler;
|
||||
init->callbacks.websocket_handler = nxt_py_asgi_websocket_handler;
|
||||
@@ -366,6 +406,7 @@ static void
|
||||
nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
|
||||
{
|
||||
PyObject *scope, *res, *task, *receive, *send, *done, *asgi;
|
||||
PyObject *stage2;
|
||||
nxt_py_asgi_ctx_data_t *ctx_data;
|
||||
|
||||
if (req->request->websocket_handshake) {
|
||||
@@ -415,8 +456,42 @@ nxt_py_asgi_request_handler(nxt_unit_request_info_t *req)
|
||||
|
||||
req->data = asgi;
|
||||
|
||||
if (!nxt_py_asgi_legacy) {
|
||||
nxt_unit_req_debug(req, "Python call ASGI 3.0 application");
|
||||
|
||||
res = PyObject_CallFunctionObjArgs(nxt_py_application,
|
||||
scope, receive, send, NULL);
|
||||
|
||||
} else {
|
||||
nxt_unit_req_debug(req, "Python call legacy application");
|
||||
|
||||
res = PyObject_CallFunctionObjArgs(nxt_py_application, scope, NULL);
|
||||
|
||||
if (nxt_slow_path(res == NULL)) {
|
||||
nxt_unit_req_error(req, "Python failed to call legacy app stage1");
|
||||
nxt_python_print_exception();
|
||||
nxt_unit_request_done(req, NXT_UNIT_ERROR);
|
||||
|
||||
goto release_scope;
|
||||
}
|
||||
|
||||
if (nxt_slow_path(PyCallable_Check(res) == 0)) {
|
||||
nxt_unit_req_error(req,
|
||||
"Legacy ASGI application returns not a callable");
|
||||
nxt_unit_request_done(req, NXT_UNIT_ERROR);
|
||||
|
||||
Py_DECREF(res);
|
||||
|
||||
goto release_scope;
|
||||
}
|
||||
|
||||
stage2 = res;
|
||||
|
||||
res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL);
|
||||
|
||||
Py_DECREF(stage2);
|
||||
}
|
||||
|
||||
if (nxt_slow_path(res == NULL)) {
|
||||
nxt_unit_req_error(req, "Python failed to call the application");
|
||||
nxt_python_print_exception();
|
||||
|
||||
@@ -68,4 +68,6 @@ int nxt_py_asgi_lifespan_startup(nxt_py_asgi_ctx_data_t *ctx_data);
|
||||
int nxt_py_asgi_lifespan_shutdown(nxt_unit_ctx_t *ctx);
|
||||
|
||||
|
||||
extern int nxt_py_asgi_legacy;
|
||||
|
||||
#endif /* _NXT_PYTHON_ASGI_H_INCLUDED_ */
|
||||
|
||||
@@ -71,6 +71,7 @@ nxt_py_asgi_lifespan_startup(nxt_py_asgi_ctx_data_t *ctx_data)
|
||||
{
|
||||
int rc;
|
||||
PyObject *scope, *res, *py_task, *receive, *send, *done;
|
||||
PyObject *stage2;
|
||||
nxt_py_asgi_lifespan_t *lifespan;
|
||||
|
||||
if (nxt_slow_path(PyType_Ready(&nxt_py_asgi_lifespan_type) != 0)) {
|
||||
@@ -129,8 +130,43 @@ nxt_py_asgi_lifespan_startup(nxt_py_asgi_ctx_data_t *ctx_data)
|
||||
goto release_future;
|
||||
}
|
||||
|
||||
if (!nxt_py_asgi_legacy) {
|
||||
nxt_unit_req_debug(NULL, "Python call ASGI 3.0 application");
|
||||
|
||||
res = PyObject_CallFunctionObjArgs(nxt_py_application,
|
||||
scope, receive, send, NULL);
|
||||
|
||||
} else {
|
||||
nxt_unit_req_debug(NULL, "Python call legacy application");
|
||||
|
||||
res = PyObject_CallFunctionObjArgs(nxt_py_application, scope, NULL);
|
||||
if (nxt_slow_path(res == NULL)) {
|
||||
nxt_unit_log(NULL, NXT_UNIT_LOG_INFO,
|
||||
"ASGI Lifespan processing exception");
|
||||
nxt_python_print_exception();
|
||||
|
||||
lifespan->disabled = 1;
|
||||
rc = NXT_UNIT_OK;
|
||||
|
||||
goto release_scope;
|
||||
}
|
||||
|
||||
if (nxt_slow_path(PyCallable_Check(res) == 0)) {
|
||||
nxt_unit_req_error(NULL,
|
||||
"Legacy ASGI application returns not a callable");
|
||||
|
||||
Py_DECREF(res);
|
||||
|
||||
goto release_scope;
|
||||
}
|
||||
|
||||
stage2 = res;
|
||||
|
||||
res = PyObject_CallFunctionObjArgs(stage2, receive, send, NULL);
|
||||
|
||||
Py_DECREF(stage2);
|
||||
}
|
||||
|
||||
if (nxt_slow_path(res == NULL)) {
|
||||
nxt_unit_error(NULL, "Python failed to call the application");
|
||||
nxt_python_print_exception();
|
||||
@@ -143,7 +179,8 @@ nxt_py_asgi_lifespan_startup(nxt_py_asgi_ctx_data_t *ctx_data)
|
||||
goto release_scope;
|
||||
}
|
||||
|
||||
py_task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res, NULL);
|
||||
py_task = PyObject_CallFunctionObjArgs(ctx_data->loop_create_task, res,
|
||||
NULL);
|
||||
if (nxt_slow_path(py_task == NULL)) {
|
||||
nxt_unit_alert(NULL, "Python failed to call the create_task");
|
||||
nxt_python_print_exception();
|
||||
|
||||
13
test/python/legacy/asgi.py
Normal file
13
test/python/legacy/asgi.py
Normal file
@@ -0,0 +1,13 @@
|
||||
def application(scope):
|
||||
assert scope['type'] == 'http'
|
||||
|
||||
return app_http
|
||||
|
||||
async def app_http(receive, send):
|
||||
await send({
|
||||
'type': 'http.response.start',
|
||||
'status': 200,
|
||||
'headers': [
|
||||
(b'content-length', b'0'),
|
||||
]
|
||||
})
|
||||
17
test/python/legacy_force/asgi.py
Normal file
17
test/python/legacy_force/asgi.py
Normal file
@@ -0,0 +1,17 @@
|
||||
def application(scope, receive=None, send=None):
|
||||
assert scope['type'] == 'http'
|
||||
|
||||
if receive == None and send == None:
|
||||
return app_http
|
||||
|
||||
else:
|
||||
return app_http(receive, send)
|
||||
|
||||
async def app_http(receive, send):
|
||||
await send({
|
||||
'type': 'http.response.start',
|
||||
'status': 200,
|
||||
'headers': [
|
||||
(b'content-length', b'0'),
|
||||
]
|
||||
})
|
||||
@@ -418,3 +418,29 @@ Connection: close
|
||||
sock.close()
|
||||
|
||||
assert len(socks) == len(threads), 'threads differs'
|
||||
|
||||
def test_asgi_application_legacy(self):
|
||||
self.load('legacy')
|
||||
|
||||
resp = self.get(
|
||||
headers={
|
||||
'Host': 'localhost',
|
||||
'Content-Length': '0',
|
||||
'Connection': 'close',
|
||||
},
|
||||
)
|
||||
|
||||
assert resp['status'] == 200, 'status'
|
||||
|
||||
def test_asgi_application_legacy_force(self):
|
||||
self.load('legacy_force', protocol='asgi')
|
||||
|
||||
resp = self.get(
|
||||
headers={
|
||||
'Host': 'localhost',
|
||||
'Content-Length': '0',
|
||||
'Connection': 'close',
|
||||
},
|
||||
)
|
||||
|
||||
assert resp['status'] == 200, 'status'
|
||||
|
||||
@@ -42,7 +42,8 @@ class TestApplicationPython(TestApplicationProto):
|
||||
"module": module,
|
||||
}
|
||||
|
||||
for attr in ('callable', 'home', 'limits', 'path', 'threads'):
|
||||
for attr in ('callable', 'home', 'limits', 'path', 'protocol',
|
||||
'threads'):
|
||||
if attr in kwargs:
|
||||
app[attr] = kwargs.pop(attr)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user