Tests: Python targets.
This commit is contained in:
@@ -14,45 +14,88 @@ class TestASGILifespan(TestApplicationPython):
|
||||
}
|
||||
load_module = 'asgi'
|
||||
|
||||
def setup_cookies(self, prefix):
|
||||
base_dir = option.test_dir + '/python/lifespan/empty'
|
||||
|
||||
os.chmod(base_dir, 0o777)
|
||||
|
||||
for name in ['startup', 'shutdown', 'version']:
|
||||
path = option.test_dir + '/python/lifespan/empty/' + prefix + name
|
||||
open(path, 'a').close()
|
||||
os.chmod(path, 0o777)
|
||||
|
||||
def assert_cookies(self, prefix):
|
||||
for name in ['startup', 'shutdown']:
|
||||
path = option.test_dir + '/python/lifespan/empty/' + prefix + name
|
||||
exists = os.path.isfile(path)
|
||||
if exists:
|
||||
os.remove(path)
|
||||
|
||||
assert not exists, name
|
||||
|
||||
path = option.test_dir + '/python/lifespan/empty/' + prefix + 'version'
|
||||
|
||||
with open(path, 'r') as f:
|
||||
version = f.read()
|
||||
|
||||
os.remove(path)
|
||||
|
||||
assert version == '3.0 2.0', 'version'
|
||||
|
||||
def test_asgi_lifespan(self):
|
||||
self.load('lifespan/empty')
|
||||
|
||||
startup_path = option.test_dir + '/python/lifespan/empty/startup'
|
||||
shutdown_path = option.test_dir + '/python/lifespan/empty/shutdown'
|
||||
version_path = option.test_dir + '/python/lifespan/empty/version'
|
||||
|
||||
os.chmod(option.test_dir + '/python/lifespan/empty', 0o777)
|
||||
|
||||
open(startup_path, 'a').close()
|
||||
os.chmod(startup_path, 0o777)
|
||||
|
||||
open(shutdown_path, 'a').close()
|
||||
os.chmod(shutdown_path, 0o777)
|
||||
|
||||
open(version_path, 'a').close()
|
||||
os.chmod(version_path, 0o777)
|
||||
self.setup_cookies('')
|
||||
|
||||
assert self.get()['status'] == 204
|
||||
|
||||
unit_stop()
|
||||
|
||||
is_startup = os.path.isfile(startup_path)
|
||||
is_shutdown = os.path.isfile(shutdown_path)
|
||||
self.assert_cookies('')
|
||||
|
||||
if is_startup:
|
||||
os.remove(startup_path)
|
||||
def test_asgi_lifespan_targets(self):
|
||||
assert 'success' in self.conf(
|
||||
{
|
||||
"listeners": {"*:7080": {"pass": "routes"}},
|
||||
"routes": [
|
||||
{
|
||||
"match": {"uri": "/1"},
|
||||
"action": {"pass": "applications/targets/1"},
|
||||
},
|
||||
{
|
||||
"match": {"uri": "/2"},
|
||||
"action": {"pass": "applications/targets/2"},
|
||||
},
|
||||
],
|
||||
"applications": {
|
||||
"targets": {
|
||||
"type": "python",
|
||||
"processes": {"spare": 0},
|
||||
"working_directory": option.test_dir
|
||||
+ "/python/lifespan/empty",
|
||||
"path": option.test_dir + '/python/lifespan/empty',
|
||||
"targets": {
|
||||
"1": {"module": "asgi", "callable": "application"},
|
||||
"2": {
|
||||
"module": "asgi",
|
||||
"callable": "application2",
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
if is_shutdown:
|
||||
os.remove(shutdown_path)
|
||||
self.setup_cookies('')
|
||||
self.setup_cookies('app2_')
|
||||
|
||||
with open(version_path, 'r') as f:
|
||||
version = f.read()
|
||||
assert self.get(url="/1")['status'] == 204
|
||||
assert self.get(url="/2")['status'] == 204
|
||||
|
||||
os.remove(version_path)
|
||||
unit_stop()
|
||||
|
||||
assert not is_startup, 'startup'
|
||||
assert not is_shutdown, 'shutdown'
|
||||
assert version == '3.0 2.0', 'version'
|
||||
self.assert_cookies('')
|
||||
self.assert_cookies('app2_')
|
||||
|
||||
def test_asgi_lifespan_failed(self):
|
||||
self.load('lifespan/failed')
|
||||
|
||||
Reference in New Issue
Block a user