Tests: prerequisites checking reworked.

Prerequisites check moved to the module level to simplify class structure.
Discovery and prerequisites checks functions moved to the separate files.
Introduced "require" fixture to provide per-test requirements check.
This commit is contained in:
Andrei Zeliankou
2023-06-12 14:16:59 +01:00
parent a3b9b49cfb
commit ce2405ec3d
79 changed files with 524 additions and 548 deletions

View File

@@ -13,14 +13,8 @@ import time
from multiprocessing import Process from multiprocessing import Process
import pytest import pytest
from unit.check.chroot import check_chroot from unit.check.discover_available import discover_available
from unit.check.go import check_go from unit.check.check_prerequisites import check_prerequisites
from unit.check.isolation import check_isolation
from unit.check.njs import check_njs
from unit.check.node import check_node
from unit.check.regex import check_regex
from unit.check.tls import check_openssl
from unit.check.unix_abstract import check_unix_abstract
from unit.http import TestHTTP from unit.http import TestHTTP
from unit.log import Log from unit.log import Log
from unit.log import print_log_on_assert from unit.log import print_log_on_assert
@@ -130,6 +124,9 @@ def pytest_generate_tests(metafunc):
type = cls.application_type type = cls.application_type
def generate_tests(versions): def generate_tests(versions):
if not versions:
pytest.skip('no available module versions')
metafunc.fixturenames.append('tmp_ct') metafunc.fixturenames.append('tmp_ct')
metafunc.parametrize('tmp_ct', versions) metafunc.parametrize('tmp_ct', versions)
@@ -140,75 +137,43 @@ def pytest_generate_tests(metafunc):
# take available module from option and generate tests for each version # take available module from option and generate tests for each version
for module, prereq_version in cls.prerequisites['modules'].items(): available_modules = option.available['modules']
if module in option.available['modules']:
available_versions = option.available['modules'][module]
if prereq_version == 'all': for module, version in metafunc.module.prerequisites['modules'].items():
if module in available_modules and available_modules[module]:
available_versions = available_modules[module]
if version == 'all':
generate_tests(available_versions) generate_tests(available_versions)
elif prereq_version == 'any': elif version == 'any':
option.generated_tests[ option.generated_tests[
metafunc.function.__name__ metafunc.function.__name__
] = f'{type} {available_versions[0]}' ] = f'{type} {available_versions[0]}'
elif callable(prereq_version): elif callable(version):
generate_tests(list(filter(prereq_version, available_versions))) generate_tests(list(filter(version, available_versions)))
else: else:
raise ValueError( raise ValueError(
f''' f'''
Unexpected prerequisite version "{prereq_version}" for module "{module}" in Unexpected prerequisite version "{version}" for module "{module}".
{cls}. 'all', 'any' or callable expected.''' 'all', 'any' or callable expected.'''
) )
def pytest_sessionstart(): def pytest_sessionstart():
option.available = {'modules': {}, 'features': {}}
unit = unit_run() unit = unit_run()
output_version = subprocess.check_output(
[unit['unitd'], '--version'], stderr=subprocess.STDOUT
).decode()
if not _wait_for_record(r'controller started'): discover_available(unit)
Log.print_log()
exit("Unit is writing log too long")
# discover available modules from unit.log _clear_conf()
for module in re.findall(
r'module: ([a-zA-Z]+) (.*) ".*"$', Log.read(), re.M
):
versions = option.available['modules'].setdefault(module[0], [])
if module[1] not in versions:
versions.append(module[1])
# discover modules from check
option.available['modules']['go'] = check_go()
option.available['modules']['njs'] = check_njs(output_version)
option.available['modules']['node'] = check_node(option.current_dir)
option.available['modules']['openssl'] = check_openssl(output_version)
option.available['modules']['regex'] = check_regex(output_version)
# remove None values
option.available['modules'] = {
k: v for k, v in option.available['modules'].items() if v is not None
}
check_chroot()
check_isolation()
check_unix_abstract()
_clear_conf(f'{unit["temp_dir"]}/control.unit.sock')
unit_stop() unit_stop()
Log.check_alerts() Log.check_alerts()
if option.restart: if option.restart:
shutil.rmtree(unit_instance['temp_dir']) shutil.rmtree(unit['temp_dir'])
else: else:
_clear_temp_dir() _clear_temp_dir()
@@ -225,38 +190,10 @@ def pytest_runtest_makereport(item):
setattr(item, f'rep_{rep.when}', rep) setattr(item, f'rep_{rep.when}', rep)
@pytest.fixture(scope='class', autouse=True) @pytest.fixture(scope='module', autouse=True)
def check_prerequisites(request): def check_prerequisites_module(request):
cls = request.cls if hasattr(request.module, 'prerequisites'):
missed = [] check_prerequisites(request.module.prerequisites)
# check modules
if 'modules' in cls.prerequisites:
available_modules = list(option.available['modules'].keys())
for module in cls.prerequisites['modules']:
if module in available_modules:
continue
missed.append(module)
if missed:
pytest.skip(f'Unit has no {", ".join(missed)} module(s)')
# check features
if 'features' in cls.prerequisites:
available_features = list(option.available['features'].keys())
for feature in cls.prerequisites['features']:
if feature in available_features:
continue
missed.append(feature)
if missed:
pytest.skip(f'{", ".join(missed)} feature(s) not supported')
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
@@ -283,7 +220,7 @@ def run(request):
# prepare log # prepare log
with Log.open(encoding='utf-8') as f: with Log.open() as f:
log = f.read() log = f.read()
Log.set_pos(f.tell()) Log.set_pos(f.tell())
@@ -294,7 +231,7 @@ def run(request):
# clean temp_dir before the next test # clean temp_dir before the next test
if not option.restart: if not option.restart:
_clear_conf(f'{unit["temp_dir"]}/control.unit.sock', log=log) _clear_conf(log=log)
_clear_temp_dir() _clear_temp_dir()
# check descriptors # check descriptors
@@ -384,7 +321,7 @@ def unit_run(state_dir=None):
unit_instance['pid'] = f.read().rstrip() unit_instance['pid'] = f.read().rstrip()
if state_dir is None: if state_dir is None:
_clear_conf(control_sock) _clear_conf()
_fds_info['main']['fds'] = _count_fds(unit_instance['pid']) _fds_info['main']['fds'] = _count_fds(unit_instance['pid'])
@@ -440,7 +377,9 @@ def unit_stop():
@print_log_on_assert @print_log_on_assert
def _clear_conf(sock, *, log=None): def _clear_conf(*, log=None):
sock = unit_instance['control_sock']
resp = http.put( resp = http.put(
url='/config', url='/config',
sock_type='unix', sock_type='unix',
@@ -456,7 +395,10 @@ def _clear_conf(sock, *, log=None):
def delete(url): def delete(url):
return http.delete(url=url, sock_type='unix', addr=sock)['body'] return http.delete(url=url, sock_type='unix', addr=sock)['body']
if 'openssl' in option.available['modules']: if (
'openssl' in option.available['modules']
and option.available['modules']['openssl']
):
try: try:
certs = json.loads(get('/certificates')).keys() certs = json.loads(get('/certificates')).keys()
@@ -466,7 +408,10 @@ def _clear_conf(sock, *, log=None):
for cert in certs: for cert in certs:
assert 'success' in delete(f'/certificates/{cert}'), 'delete cert' assert 'success' in delete(f'/certificates/{cert}'), 'delete cert'
if 'njs' in option.available['modules']: if (
'njs' in option.available['modules']
and option.available['modules']['njs']
):
try: try:
scripts = json.loads(get('/js_modules')).keys() scripts = json.loads(get('/js_modules')).keys()
@@ -621,19 +566,6 @@ def _count_fds(pid):
return 0 return 0
def _wait_for_record(pattern, name='unit.log', wait=150, flags=re.M):
with Log.open(name) as file:
for _ in range(wait):
found = re.search(pattern, file.read(), flags)
if found is not None:
break
time.sleep(0.1)
return found
def run_process(target, *args): def run_process(target, *args):
global _processes global _processes
@@ -696,8 +628,8 @@ def date_to_sec_epoch():
@pytest.fixture @pytest.fixture
def findall(): def findall():
def _findall(pattern, name='unit.log', flags=re.M): def _findall(*args, **kwargs):
return re.findall(pattern, Log.read(name), flags) return Log.findall(*args, **kwargs)
return _findall return _findall
@@ -712,6 +644,11 @@ def is_unsafe(request):
return request.config.getoption("--unsafe") return request.config.getoption("--unsafe")
@pytest.fixture
def require():
return check_prerequisites
@pytest.fixture @pytest.fixture
def search_in_file(): def search_in_file():
def _search_in_file(pattern, name='unit.log', flags=re.M): def _search_in_file(pattern, name='unit.log', flags=re.M):
@@ -760,4 +697,7 @@ def unit_pid():
@pytest.fixture @pytest.fixture
def wait_for_record(): def wait_for_record():
def _wait_for_record(*args, **kwargs):
return Log.wait_for_record(*args, **kwargs)
return _wait_for_record return _wait_for_record

View File

@@ -4,10 +4,10 @@ import pytest
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
class TestAccessLog(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestAccessLog(TestApplicationPython):
def load(self, script): def load(self, script):
super().load(script) super().load(script)

View File

@@ -5,13 +5,12 @@ import pytest
from packaging import version from packaging import version
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
prerequisites = {
'modules': {'python': lambda v: version.parse(v) >= version.parse('3.5')}
}
class TestASGIApplication(TestApplicationPython): class TestASGIApplication(TestApplicationPython):
prerequisites = {
'modules': {
'python': lambda v: version.parse(v) >= version.parse('3.5')
}
}
load_module = 'asgi' load_module = 'asgi'
def test_asgi_application_variables(self, date_to_sec_epoch, sec_epoch): def test_asgi_application_variables(self, date_to_sec_epoch, sec_epoch):

View File

@@ -1,14 +1,13 @@
from packaging import version from packaging import version
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
prerequisites = {
'modules': {'python': lambda v: version.parse(v) >= version.parse('3.5')},
'features': {'unix_abstract': True},
}
class TestASGIApplicationUnixAbstract(TestApplicationPython): class TestASGIApplicationUnixAbstract(TestApplicationPython):
prerequisites = {
'modules': {
'python': lambda v: version.parse(v) >= version.parse('3.5')
},
'features': ['unix_abstract'],
}
load_module = 'asgi' load_module = 'asgi'
def test_asgi_application_unix_abstract(self): def test_asgi_application_unix_abstract(self):

View File

@@ -5,13 +5,12 @@ from packaging import version
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
prerequisites = {
'modules': {'python': lambda v: version.parse(v) >= version.parse('3.5')}
}
class TestASGILifespan(TestApplicationPython): class TestASGILifespan(TestApplicationPython):
prerequisites = {
'modules': {
'python': lambda v: version.parse(v) >= version.parse('3.5')
}
}
load_module = 'asgi' load_module = 'asgi'
def setup_cookies(self, prefix): def setup_cookies(self, prefix):

View File

@@ -3,13 +3,12 @@ from packaging import version
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
prerequisites = {
'modules': {'python': lambda v: version.parse(v) >= version.parse('3.5')}
}
class TestASGITargets(TestApplicationPython): class TestASGITargets(TestApplicationPython):
prerequisites = {
'modules': {
'python': lambda v: version.parse(v) >= version.parse('3.5')
}
}
load_module = 'asgi' load_module = 'asgi'
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)

View File

@@ -6,13 +6,12 @@ from packaging import version
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.applications.websockets import TestApplicationWebsocket from unit.applications.websockets import TestApplicationWebsocket
prerequisites = {
'modules': {'python': lambda v: version.parse(v) >= version.parse('3.5')}
}
class TestASGIWebsockets(TestApplicationPython): class TestASGIWebsockets(TestApplicationPython):
prerequisites = {
'modules': {
'python': lambda v: version.parse(v) >= version.parse('3.5')
}
}
load_module = 'asgi' load_module = 'asgi'
ws = TestApplicationWebsocket() ws = TestApplicationWebsocket()

View File

@@ -2,10 +2,10 @@ import pytest
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
class TestClientIP(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestClientIP(TestApplicationPython):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
self.load('client_ip') self.load('client_ip')

View File

@@ -3,10 +3,10 @@ import socket
import pytest import pytest
from unit.control import TestControl from unit.control import TestControl
class TestConfiguration(TestControl):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestConfiguration(TestControl):
def try_addr(self, addr): def try_addr(self, addr):
return self.conf( return self.conf(
{ {
@@ -420,10 +420,10 @@ class TestConfiguration(TestControl):
assert 'success' in self.conf(conf) assert 'success' in self.conf(conf)
def test_unprivileged_user_error(self, is_su, skip_alert): def test_unprivileged_user_error(self, require, skip_alert):
require({'privileged_user': False})
skip_alert(r'cannot set user "root"', r'failed to apply new conf') skip_alert(r'cannot set user "root"', r'failed to apply new conf')
if is_su:
pytest.skip('unprivileged tests')
assert 'error' in self.conf( assert 'error' in self.conf(
{ {

View File

@@ -1,10 +1,10 @@
import pytest import pytest
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
class TestForwardedHeader(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestForwardedHeader(TestApplicationPython):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
self.load('forwarded_header') self.load('forwarded_header')
@@ -190,9 +190,7 @@ class TestForwardedHeader(TestApplicationPython):
== '1.1.1.1' == '1.1.1.1'
), 'xff replace multi 3' ), 'xff replace multi 3'
assert ( assert (
self.get_addr( self.get_addr(xff='8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1')
xff='8.8.8.8, 2001:db8:3c4d:15::1a2f:1a2b, 127.0.0.1'
)
== '2001:db8:3c4d:15::1a2f:1a2b' == '2001:db8:3c4d:15::1a2f:1a2b'
), 'xff chain ipv6' ), 'xff chain ipv6'

View File

@@ -3,10 +3,10 @@ import re
import pytest import pytest
from unit.applications.lang.go import TestApplicationGo from unit.applications.lang.go import TestApplicationGo
class TestGoApplication(TestApplicationGo):
prerequisites = {'modules': {'go': 'all'}} prerequisites = {'modules': {'go': 'all'}}
class TestGoApplication(TestApplicationGo):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, skip_alert): def setup_method_fixture(self, skip_alert):
skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor') skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor')

View File

@@ -7,10 +7,10 @@ from unit.applications.lang.go import TestApplicationGo
from unit.option import option from unit.option import option
from unit.utils import getns from unit.utils import getns
prerequisites = {'modules': {'go': 'any'}, 'features': {'isolation': True}}
class TestGoIsolation(TestApplicationGo): class TestGoIsolation(TestApplicationGo):
prerequisites = {'modules': {'go': 'any'}, 'features': ['isolation']}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, skip_alert): def setup_method_fixture(self, skip_alert):
skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor') skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor')
@@ -27,9 +27,6 @@ class TestGoIsolation(TestApplicationGo):
return (nobody_uid, nogroup_gid, nogroup) return (nobody_uid, nogroup_gid, nogroup)
def isolation_key(self, key):
return key in option.available['features']['isolation'].keys()
def test_isolation_values(self): def test_isolation_values(self):
self.load('ns_inspect') self.load('ns_inspect')
@@ -39,12 +36,13 @@ class TestGoIsolation(TestApplicationGo):
if ns.upper() in obj['NS']: if ns.upper() in obj['NS']:
assert obj['NS'][ns.upper()] == ns_value, f'{ns} match' assert obj['NS'][ns.upper()] == ns_value, f'{ns} match'
def test_isolation_unpriv_user(self, is_su): def test_isolation_unpriv_user(self, require):
if not self.isolation_key('unprivileged_userns_clone'): require(
pytest.skip('unprivileged clone is not available') {
'privileged_user': False,
if is_su: 'features': {'isolation': ['unprivileged_userns_clone']},
pytest.skip('privileged tests, skip this') }
)
self.load('ns_inspect') self.load('ns_inspect')
obj = self.getjson()['body'] obj = self.getjson()['body']
@@ -101,9 +99,8 @@ class TestGoIsolation(TestApplicationGo):
assert obj['UID'] == 0, 'uid match uidmap' assert obj['UID'] == 0, 'uid match uidmap'
assert obj['GID'] == 0, 'gid match gidmap' assert obj['GID'] == 0, 'gid match gidmap'
def test_isolation_priv_user(self, is_su): def test_isolation_priv_user(self, require):
if not is_su: require({'privileged_user': True})
pytest.skip('unprivileged tests, skip this')
self.load('ns_inspect') self.load('ns_inspect')
@@ -176,12 +173,12 @@ class TestGoIsolation(TestApplicationGo):
assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody' assert obj['UID'] == nobody_uid, 'uid match uidmap user=nobody'
assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody' assert obj['GID'] == nogroup_gid, 'gid match uidmap user=nobody'
def test_isolation_mnt(self): def test_isolation_mnt(self, require):
if not self.isolation_key('mnt'): require(
pytest.skip('mnt namespace is not supported') {
'features': {'isolation': ['unprivileged_userns_clone', 'mnt']},
if not self.isolation_key('unprivileged_userns_clone'): }
pytest.skip('unprivileged clone is not available') )
self.load( self.load(
'ns_inspect', 'ns_inspect',
@@ -205,19 +202,21 @@ class TestGoIsolation(TestApplicationGo):
assert obj['NS']['MNT'] != getns('mnt'), 'mnt set' assert obj['NS']['MNT'] != getns('mnt'), 'mnt set'
assert obj['NS']['USER'] != getns('user'), 'user set' assert obj['NS']['USER'] != getns('user'), 'user set'
def test_isolation_pid(self, is_su): def test_isolation_pid(self, is_su, require):
if not self.isolation_key('pid'): require({'features': {'isolation': ['pid']}})
pytest.skip('pid namespace is not supported')
if not is_su: if not is_su:
if not self.isolation_key('unprivileged_userns_clone'): require(
pytest.skip('unprivileged clone is not available') {
'features': {
if not self.isolation_key('user'): 'isolation': [
pytest.skip('user namespace is not supported') 'unprivileged_userns_clone',
'user',
if not self.isolation_key('mnt'): 'mnt',
pytest.skip('mnt namespace is not supported') ]
}
}
)
isolation = {'namespaces': {'pid': True}} isolation = {'namespaces': {'pid': True}}
@@ -262,19 +261,20 @@ class TestGoIsolation(TestApplicationGo):
== option.available['features']['isolation'][ns] == option.available['features']['isolation'][ns]
), f'{ns} match' ), f'{ns} match'
def test_go_isolation_rootfs_container(self, is_su, temp_dir): def test_go_isolation_rootfs_container(self, is_su, require, temp_dir):
if not is_su: if not is_su:
if not self.isolation_key('unprivileged_userns_clone'): require(
pytest.skip('unprivileged clone is not available') {
'features': {
if not self.isolation_key('user'): 'isolation': [
pytest.skip('user namespace is not supported') 'unprivileged_userns_clone',
'user',
if not self.isolation_key('mnt'): 'mnt',
pytest.skip('mnt namespace is not supported') 'pid',
]
if not self.isolation_key('pid'): }
pytest.skip('pid namespace is not supported') }
)
isolation = {'rootfs': temp_dir} isolation = {'rootfs': temp_dir}
@@ -294,12 +294,8 @@ class TestGoIsolation(TestApplicationGo):
obj = self.getjson(url='/?file=/bin/sh')['body'] obj = self.getjson(url='/?file=/bin/sh')['body']
assert not obj['FileExists'], 'file should not exists' assert not obj['FileExists'], 'file should not exists'
def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir): def test_go_isolation_rootfs_container_priv(self, require, temp_dir):
if not is_su: require({'privileged_user': True, 'features': {'isolation': ['mnt']}})
pytest.skip('requires root')
if not self.isolation_key('mnt'):
pytest.skip('mnt namespace is not supported')
isolation = { isolation = {
'namespaces': {'mount': True}, 'namespaces': {'mount': True},
@@ -315,24 +311,27 @@ class TestGoIsolation(TestApplicationGo):
obj = self.getjson(url='/?file=/bin/sh')['body'] obj = self.getjson(url='/?file=/bin/sh')['body']
assert not obj['FileExists'], 'file should not exists' assert not obj['FileExists'], 'file should not exists'
def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir): def test_go_isolation_rootfs_automount_tmpfs(
self, is_su, require, temp_dir
):
try: try:
open("/proc/self/mountinfo") open("/proc/self/mountinfo")
except: except:
pytest.skip('The system lacks /proc/self/mountinfo file') pytest.skip('The system lacks /proc/self/mountinfo file')
if not is_su: if not is_su:
if not self.isolation_key('unprivileged_userns_clone'): require(
pytest.skip('unprivileged clone is not available') {
'features': {
if not self.isolation_key('user'): 'isolation': [
pytest.skip('user namespace is not supported') 'unprivileged_userns_clone',
'user',
if not self.isolation_key('mnt'): 'mnt',
pytest.skip('mnt namespace is not supported') 'pid',
]
if not self.isolation_key('pid'): }
pytest.skip('pid namespace is not supported') }
)
isolation = {'rootfs': temp_dir} isolation = {'rootfs': temp_dir}

View File

@@ -1,26 +1,20 @@
import os
import pytest import pytest
from unit.applications.lang.go import TestApplicationGo from unit.applications.lang.go import TestApplicationGo
prerequisites = {
'modules': {'go': 'all'},
'features': {'isolation': True},
'privileged_user': True,
}
class TestGoIsolationRootfs(TestApplicationGo): class TestGoIsolationRootfs(TestApplicationGo):
prerequisites = {'modules': {'go': 'all'}}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, skip_alert): def setup_method_fixture(self, skip_alert):
skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor') skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor')
def test_go_isolation_rootfs_chroot(self, is_su, temp_dir): def test_go_isolation_rootfs_chroot(self, temp_dir):
if not is_su: isolation = {'rootfs': temp_dir}
pytest.skip('requires root')
if os.uname().sysname == 'Darwin':
pytest.skip('chroot tests not supported on OSX')
isolation = {
'rootfs': temp_dir,
}
self.load('ns_inspect', isolation=isolation) self.load('ns_inspect', isolation=isolation)

View File

@@ -1,10 +1,10 @@
import pytest import pytest
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
class TestHTTPHeader(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestHTTPHeader(TestApplicationPython):
def test_http_header_value_leading_sp(self): def test_http_header_value_leading_sp(self):
self.load('custom_header') self.load('custom_header')

View File

@@ -7,10 +7,10 @@ from unit.applications.lang.java import TestApplicationJava
from unit.option import option from unit.option import option
from unit.utils import public_dir from unit.utils import public_dir
class TestJavaApplication(TestApplicationJava):
prerequisites = {'modules': {'java': 'all'}} prerequisites = {'modules': {'java': 'all'}}
class TestJavaApplication(TestApplicationJava):
def test_java_conf_error(self, temp_dir, skip_alert): def test_java_conf_error(self, temp_dir, skip_alert):
skip_alert( skip_alert(
r'realpath.*failed', r'realpath.*failed',

View File

@@ -5,15 +5,12 @@ import pytest
from unit.applications.lang.java import TestApplicationJava from unit.applications.lang.java import TestApplicationJava
from unit.option import option from unit.option import option
prerequisites = {'modules': {'java': 'all'}, 'privileged_user': True}
class TestJavaIsolationRootfs(TestApplicationJava): class TestJavaIsolationRootfs(TestApplicationJava):
prerequisites = {'modules': {'java': 'all'}}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, is_su, temp_dir): def setup_method_fixture(self, temp_dir):
if not is_su:
pytest.skip('require root')
os.makedirs(f'{temp_dir}/jars') os.makedirs(f'{temp_dir}/jars')
os.makedirs(f'{temp_dir}/tmp') os.makedirs(f'{temp_dir}/tmp')
os.chmod(f'{temp_dir}/tmp', 0o777) os.chmod(f'{temp_dir}/tmp', 0o777)
@@ -35,10 +32,7 @@ class TestJavaIsolationRootfs(TestApplicationJava):
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
pytest.fail("Can't run mount process.") pytest.fail("Can't run mount process.")
def teardown_method(self, is_su): def teardown_method(self):
if not is_su:
return
try: try:
subprocess.run( subprocess.run(
["umount", "--lazy", f"{option.temp_dir}/jars"], ["umount", "--lazy", f"{option.temp_dir}/jars"],
@@ -51,13 +45,8 @@ class TestJavaIsolationRootfs(TestApplicationJava):
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
pytest.fail("Can't run umount process.") pytest.fail("Can't run umount process.")
def test_java_isolation_rootfs_chroot_war(self, is_su, temp_dir): def test_java_isolation_rootfs_chroot_war(self, temp_dir):
if not is_su: isolation = {'rootfs': temp_dir}
pytest.skip('require root')
isolation = {
'rootfs': temp_dir,
}
self.load('empty_war', isolation=isolation) self.load('empty_war', isolation=isolation)

View File

@@ -5,10 +5,10 @@ import pytest
from unit.applications.lang.java import TestApplicationJava from unit.applications.lang.java import TestApplicationJava
from unit.applications.websockets import TestApplicationWebsocket from unit.applications.websockets import TestApplicationWebsocket
class TestJavaWebsockets(TestApplicationJava):
prerequisites = {'modules': {'java': 'any'}} prerequisites = {'modules': {'java': 'any'}}
class TestJavaWebsockets(TestApplicationJava):
ws = TestApplicationWebsocket() ws = TestApplicationWebsocket()
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)

View File

@@ -5,18 +5,16 @@ from unit.applications.proto import TestApplicationProto
from unit.option import option from unit.option import option
from unit.utils import waitforfiles from unit.utils import waitforfiles
class TestNJS(TestApplicationProto):
prerequisites = {'modules': {'njs': 'any'}} prerequisites = {'modules': {'njs': 'any'}}
class TestNJS(TestApplicationProto):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir): def setup_method_fixture(self, temp_dir):
assert 'success' in self.conf( assert 'success' in self.conf(
{ {
"listeners": {"*:7080": {"pass": "routes"}}, "listeners": {"*:7080": {"pass": "routes"}},
"routes": [ "routes": [{"action": {"share": f"{temp_dir}/assets$uri"}}],
{"action": {"share": f"{temp_dir}/assets$uri"}}
],
} }
) )

View File

@@ -1,10 +1,10 @@
from unit.applications.proto import TestApplicationProto from unit.applications.proto import TestApplicationProto
from unit.option import option from unit.option import option
class TestNJSModules(TestApplicationProto):
prerequisites = {'modules': {'njs': 'any'}} prerequisites = {'modules': {'njs': 'any'}}
class TestNJSModules(TestApplicationProto):
def njs_script_load(self, module, name=None, expect='success'): def njs_script_load(self, module, name=None, expect='success'):
if name is None: if name is None:
name = module name = module

View File

@@ -4,10 +4,10 @@ import pytest
from unit.applications.lang.node import TestApplicationNode from unit.applications.lang.node import TestApplicationNode
from unit.utils import waitforfiles from unit.utils import waitforfiles
class TestNodeApplication(TestApplicationNode):
prerequisites = {'modules': {'node': 'all'}} prerequisites = {'modules': {'node': 'all'}}
class TestNodeApplication(TestApplicationNode):
def assert_basic_application(self): def assert_basic_application(self):
resp = self.get() resp = self.get()
assert resp['headers']['Content-Type'] == 'text/plain', 'basic header' assert resp['headers']['Content-Type'] == 'text/plain', 'basic header'

View File

@@ -2,14 +2,12 @@ from packaging import version
from unit.applications.lang.node import TestApplicationNode from unit.applications.lang.node import TestApplicationNode
from unit.applications.websockets import TestApplicationWebsocket from unit.applications.websockets import TestApplicationWebsocket
prerequisites = {
'modules': {'node': lambda v: version.parse(v) >= version.parse('14.16.0')}
}
class TestNodeESModules(TestApplicationNode): class TestNodeESModules(TestApplicationNode):
prerequisites = {
'modules': {
'node': lambda v: version.parse(v) >= version.parse('14.16.0')
}
}
es_modules = True es_modules = True
ws = TestApplicationWebsocket() ws = TestApplicationWebsocket()

View File

@@ -5,10 +5,10 @@ import pytest
from unit.applications.lang.node import TestApplicationNode from unit.applications.lang.node import TestApplicationNode
from unit.applications.websockets import TestApplicationWebsocket from unit.applications.websockets import TestApplicationWebsocket
class TestNodeWebsockets(TestApplicationNode):
prerequisites = {'modules': {'node': 'any'}} prerequisites = {'modules': {'node': 'any'}}
class TestNodeWebsockets(TestApplicationNode):
ws = TestApplicationWebsocket() ws = TestApplicationWebsocket()
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)

View File

@@ -3,10 +3,10 @@ import re
import pytest import pytest
from unit.applications.lang.perl import TestApplicationPerl from unit.applications.lang.perl import TestApplicationPerl
class TestPerlApplication(TestApplicationPerl):
prerequisites = {'modules': {'perl': 'all'}} prerequisites = {'modules': {'perl': 'all'}}
class TestPerlApplication(TestApplicationPerl):
def test_perl_application(self, date_to_sec_epoch, sec_epoch): def test_perl_application(self, date_to_sec_epoch, sec_epoch):
self.load('variables') self.load('variables')

View File

@@ -10,10 +10,10 @@ import pytest
from unit.applications.lang.php import TestApplicationPHP from unit.applications.lang.php import TestApplicationPHP
from unit.option import option from unit.option import option
class TestPHPApplication(TestApplicationPHP):
prerequisites = {'modules': {'php': 'all'}} prerequisites = {'modules': {'php': 'all'}}
class TestPHPApplication(TestApplicationPHP):
def before_disable_functions(self): def before_disable_functions(self):
body = self.get()['body'] body = self.get()['body']

View File

@@ -1,9 +1,9 @@
from unit.control import TestControl from unit.control import TestControl
prerequisites = {'modules': {'php': 'any'}}
class TestPHPBasic(TestControl): class TestPHPBasic(TestControl):
prerequisites = {'modules': {'php': 'any'}}
conf_app = { conf_app = {
"app": { "app": {
"type": "php", "type": "php",

View File

@@ -1,30 +1,26 @@
import pytest
from unit.applications.lang.php import TestApplicationPHP from unit.applications.lang.php import TestApplicationPHP
from unit.option import option
prerequisites = {'modules': {'php': 'any'}, 'features': {'isolation': True}}
class TestPHPIsolation(TestApplicationPHP): class TestPHPIsolation(TestApplicationPHP):
prerequisites = {'modules': {'php': 'any'}, 'features': ['isolation']} def test_php_isolation_rootfs(self, is_su, require, temp_dir):
def test_php_isolation_rootfs(self, is_su, temp_dir):
isolation_features = option.available['features']['isolation'].keys()
if not is_su:
if not 'unprivileged_userns_clone' in isolation_features:
pytest.skip('requires unprivileged userns or root')
if 'user' not in isolation_features:
pytest.skip('user namespace is not supported')
if 'mnt' not in isolation_features:
pytest.skip('mnt namespace is not supported')
if 'pid' not in isolation_features:
pytest.skip('pid namespace is not supported')
isolation = {'rootfs': temp_dir} isolation = {'rootfs': temp_dir}
if not is_su: if not is_su:
require(
{
'features': {
'isolation': [
'unprivileged_userns_clone',
'user',
'mnt',
'pid',
]
}
}
)
isolation['namespaces'] = { isolation['namespaces'] = {
'mount': True, 'mount': True,
'credential': True, 'credential': True,
@@ -42,25 +38,23 @@ class TestPHPIsolation(TestApplicationPHP):
assert self.get()['status'] == 200, 'empty rootfs' assert self.get()['status'] == 200, 'empty rootfs'
def test_php_isolation_rootfs_extensions(self, is_su, temp_dir): def test_php_isolation_rootfs_extensions(self, is_su, require, temp_dir):
isolation_features = option.available['features']['isolation'].keys()
if not is_su:
if not 'unprivileged_userns_clone' in isolation_features:
pytest.skip('requires unprivileged userns or root')
if 'user' not in isolation_features:
pytest.skip('user namespace is not supported')
if 'mnt' not in isolation_features:
pytest.skip('mnt namespace is not supported')
if 'pid' not in isolation_features:
pytest.skip('pid namespace is not supported')
isolation = {'rootfs': temp_dir} isolation = {'rootfs': temp_dir}
if not is_su: if not is_su:
require(
{
'features': {
'isolation': [
'unprivileged_userns_clone',
'user',
'mnt',
'pid',
]
}
}
)
isolation['namespaces'] = { isolation['namespaces'] = {
'mount': True, 'mount': True,
'credential': True, 'credential': True,

View File

@@ -1,10 +1,10 @@
from unit.applications.lang.php import TestApplicationPHP from unit.applications.lang.php import TestApplicationPHP
from unit.option import option from unit.option import option
class TestPHPTargets(TestApplicationPHP):
prerequisites = {'modules': {'php': 'any'}} prerequisites = {'modules': {'php': 'any'}}
class TestPHPTargets(TestApplicationPHP):
def test_php_application_targets(self): def test_php_application_targets(self):
targets_dir = f"{option.test_dir}/php/targets" targets_dir = f"{option.test_dir}/php/targets"
assert 'success' in self.conf( assert 'success' in self.conf(

View File

@@ -8,10 +8,10 @@ from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
from unit.utils import waitforsocket from unit.utils import waitforsocket
class TestProxy(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestProxy(TestApplicationPython):
SERVER_PORT = 7999 SERVER_PORT = 7999
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)

View File

@@ -8,10 +8,10 @@ from conftest import run_process
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.utils import waitforsocket from unit.utils import waitforsocket
class TestProxyChunked(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestProxyChunked(TestApplicationPython):
SERVER_PORT = 7999 SERVER_PORT = 7999
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)

View File

@@ -10,10 +10,10 @@ import pytest
from packaging import version from packaging import version
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
class TestPythonApplication(TestApplicationPython):
prerequisites = {'modules': {'python': 'all'}} prerequisites = {'modules': {'python': 'all'}}
class TestPythonApplication(TestApplicationPython):
def test_python_application_variables(self, date_to_sec_epoch, sec_epoch): def test_python_application_variables(self, date_to_sec_epoch, sec_epoch):
self.load('variables') self.load('variables')
@@ -740,9 +740,8 @@ last line: 987654321
), 'exception raise close' ), 'exception raise close'
assert len(findall(r'Traceback')) == 8, 'traceback count 8' assert len(findall(r'Traceback')) == 8, 'traceback count 8'
def test_python_user_group(self, is_su): def test_python_user_group(self, require):
if not is_su: require({'privileged_user': True})
pytest.skip('requires root')
nobody_uid = pwd.getpwnam('nobody').pw_uid nobody_uid = pwd.getpwnam('nobody').pw_uid

View File

@@ -1,8 +1,9 @@
from unit.control import TestControl from unit.control import TestControl
prerequisites = {'modules': {'python': 'any'}}
class TestPythonBasic(TestControl): class TestPythonBasic(TestControl):
prerequisites = {'modules': {'python': 'any'}}
conf_app = { conf_app = {
"app": { "app": {

View File

@@ -1,9 +1,9 @@
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
prerequisites = {'modules': {'python': 'any'}}
class TestPythonEnvironment(TestApplicationPython): class TestPythonEnvironment(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}}
def test_python_environment_name_null(self): def test_python_environment_name_null(self):
self.load('environment') self.load('environment')

View File

@@ -10,10 +10,10 @@ from unit.utils import findmnt
from unit.utils import waitformount from unit.utils import waitformount
from unit.utils import waitforunmount from unit.utils import waitforunmount
prerequisites = {'modules': {'python': 'any'}, 'features': {'isolation': True}}
class TestPythonIsolation(TestApplicationPython): class TestPythonIsolation(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}, 'features': ['isolation']}
def get_cgroup(self, app_name): def get_cgroup(self, app_name):
output = subprocess.check_output( output = subprocess.check_output(
['ps', 'ax', '-o', 'pid', '-o', 'cmd'] ['ps', 'ax', '-o', 'pid', '-o', 'cmd']
@@ -31,25 +31,23 @@ class TestPythonIsolation(TestApplicationPython):
with open(cgroup, 'r') as f: with open(cgroup, 'r') as f:
return f.read().rstrip() return f.read().rstrip()
def test_python_isolation_rootfs(self, is_su, temp_dir): def test_python_isolation_rootfs(self, is_su, require, temp_dir):
isolation_features = option.available['features']['isolation'].keys()
if not is_su:
if not 'unprivileged_userns_clone' in isolation_features:
pytest.skip('requires unprivileged userns or root')
if 'user' not in isolation_features:
pytest.skip('user namespace is not supported')
if 'mnt' not in isolation_features:
pytest.skip('mnt namespace is not supported')
if 'pid' not in isolation_features:
pytest.skip('pid namespace is not supported')
isolation = {'rootfs': temp_dir} isolation = {'rootfs': temp_dir}
if not is_su: if not is_su:
require(
{
'features': {
'isolation': [
'unprivileged_userns_clone',
'user',
'mnt',
'pid',
]
}
}
)
isolation['namespaces'] = { isolation['namespaces'] = {
'mount': True, 'mount': True,
'credential': True, 'credential': True,
@@ -78,9 +76,8 @@ class TestPythonIsolation(TestApplicationPython):
assert ret['body']['FileExists'], 'application exists in rootfs' assert ret['body']['FileExists'], 'application exists in rootfs'
def test_python_isolation_rootfs_no_language_deps(self, is_su, temp_dir): def test_python_isolation_rootfs_no_language_deps(self, require, temp_dir):
if not is_su: require({'privileged_user': True})
pytest.skip('requires root')
isolation = {'rootfs': temp_dir, 'automount': {'language_deps': False}} isolation = {'rootfs': temp_dir, 'automount': {'language_deps': False}}
self.load('empty', isolation=isolation) self.load('empty', isolation=isolation)
@@ -103,9 +100,8 @@ class TestPythonIsolation(TestApplicationPython):
assert waitforunmount(python_path), 'language_deps unmount' assert waitforunmount(python_path), 'language_deps unmount'
def test_python_isolation_procfs(self, is_su, temp_dir): def test_python_isolation_procfs(self, require, temp_dir):
if not is_su: require({'privileged_user': True})
pytest.skip('requires root')
isolation = {'rootfs': temp_dir, 'automount': {'procfs': False}} isolation = {'rootfs': temp_dir, 'automount': {'procfs': False}}
@@ -123,12 +119,10 @@ class TestPythonIsolation(TestApplicationPython):
'FileExists' 'FileExists'
], '/proc/self' ], '/proc/self'
def test_python_isolation_cgroup(self, is_su): def test_python_isolation_cgroup(self, require):
if not is_su: require(
pytest.skip('requires root') {'privileged_user': True, 'features': {'isolation': ['cgroup']}}
)
if not 'cgroup' in option.available['features']['isolation']:
pytest.skip('cgroup is not supported')
def set_cgroup_path(path): def set_cgroup_path(path):
isolation = {'cgroup': {'path': path}} isolation = {'cgroup': {'path': path}}
@@ -146,12 +140,10 @@ class TestPythonIsolation(TestApplicationPython):
assert len(cgroup_rel.parts) >= len(cgroup_abs.parts) assert len(cgroup_rel.parts) >= len(cgroup_abs.parts)
def test_python_isolation_cgroup_two(self, is_su): def test_python_isolation_cgroup_two(self, require):
if not is_su: require(
pytest.skip('requires root') {'privileged_user': True, 'features': {'isolation': ['cgroup']}}
)
if not 'cgroup' in option.available['features']['isolation']:
pytest.skip('cgroup is not supported')
def set_two_cgroup_path(path, path2): def set_two_cgroup_path(path, path2):
script_path = f'{option.test_dir}/python/empty' script_path = f'{option.test_dir}/python/empty'
@@ -193,12 +185,10 @@ class TestPythonIsolation(TestApplicationPython):
set_two_cgroup_path('/scope/python', '/scope2/python') set_two_cgroup_path('/scope/python', '/scope2/python')
assert self.get_cgroup('one') != self.get_cgroup('two') assert self.get_cgroup('one') != self.get_cgroup('two')
def test_python_isolation_cgroup_invalid(self, is_su): def test_python_isolation_cgroup_invalid(self, require):
if not is_su: require(
pytest.skip('requires root') {'privileged_user': True, 'features': {'isolation': ['cgroup']}}
)
if not 'cgroup' in option.available['features']['isolation']:
pytest.skip('cgroup is not supported')
def check_invalid(path): def check_invalid(path):
script_path = f'{option.test_dir}/python/empty' script_path = f'{option.test_dir}/python/empty'

View File

@@ -1,17 +1,11 @@
import pytest
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
prerequisites = {'modules': {'python': 'any'}, 'privileged_user': True}
class TestPythonIsolation(TestApplicationPython): class TestPythonIsolation(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} def test_python_isolation_chroot(self, temp_dir):
isolation = {'rootfs': temp_dir}
def test_python_isolation_chroot(self, is_su, temp_dir):
if not is_su:
pytest.skip('requires root')
isolation = {
'rootfs': temp_dir,
}
self.load('ns_inspect', isolation=isolation) self.load('ns_inspect', isolation=isolation)

View File

@@ -7,10 +7,10 @@ import pytest
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
class TestPythonProcman(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestPythonProcman(TestApplicationPython):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir): def setup_method_fixture(self, temp_dir):
self.app_name = f'app-{temp_dir.split("/")[-1]}' self.app_name = f'app-{temp_dir.split("/")[-1]}'

View File

@@ -1,10 +1,10 @@
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
class TestPythonTargets(TestApplicationPython):
prerequisites = {'modules': {'python': 'all'}} prerequisites = {'modules': {'python': 'all'}}
class TestPythonTargets(TestApplicationPython):
def test_python_targets(self): def test_python_targets(self):
python_dir = f'{option.test_dir}/python' python_dir = f'{option.test_dir}/python'

View File

@@ -5,8 +5,6 @@ from unit.applications.proto import TestApplicationProto
class TestReconfigure(TestApplicationProto): class TestReconfigure(TestApplicationProto):
prerequisites = {}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
assert 'success' in self.conf( assert 'success' in self.conf(

View File

@@ -5,10 +5,10 @@ import time
import pytest import pytest
from unit.applications.tls import TestApplicationTLS from unit.applications.tls import TestApplicationTLS
class TestReconfigureTLS(TestApplicationTLS):
prerequisites = {'modules': {'openssl': 'any'}} prerequisites = {'modules': {'openssl': 'any'}}
class TestReconfigureTLS(TestApplicationTLS):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
if 'HAS_TLSv1_2' not in dir(ssl) or not ssl.HAS_TLSv1_2: if 'HAS_TLSv1_2' not in dir(ssl) or not ssl.HAS_TLSv1_2:

View File

@@ -5,10 +5,10 @@ import time
import pytest import pytest
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
class TestRespawn(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestRespawn(TestApplicationPython):
PATTERN_ROUTER = 'unit: router' PATTERN_ROUTER = 'unit: router'
PATTERN_CONTROLLER = 'unit: controller' PATTERN_CONTROLLER = 'unit: controller'

View File

@@ -5,8 +5,6 @@ from unit.applications.proto import TestApplicationProto
class TestReturn(TestApplicationProto): class TestReturn(TestApplicationProto):
prerequisites = {}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
self._load_conf( self._load_conf(

View File

@@ -2,12 +2,9 @@ import os
import pytest import pytest
from unit.applications.proto import TestApplicationProto from unit.applications.proto import TestApplicationProto
from unit.option import option
class TestRewrite(TestApplicationProto): class TestRewrite(TestApplicationProto):
prerequisites = {}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
assert 'success' in self.conf( assert 'success' in self.conf(
@@ -97,9 +94,8 @@ class TestRewrite(TestApplicationProto):
) )
assert self.get(url='/foo?arg=val')['status'] == 200 assert self.get(url='/foo?arg=val')['status'] == 200
def test_rewrite_njs(self): def test_rewrite_njs(self, require):
if 'njs' not in option.available['modules'].keys(): require({'modules': {'njs': 'any'}})
pytest.skip('NJS is not available')
self.set_rewrite("`/${host}`", "/localhost") self.set_rewrite("`/${host}`", "/localhost")
assert self.get()['status'] == 200 assert self.get()['status'] == 200

View File

@@ -3,10 +3,10 @@ import pytest
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
class TestRouting(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestRouting(TestApplicationPython):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
assert 'success' in self.conf( assert 'success' in self.conf(
@@ -232,9 +232,8 @@ class TestRouting(TestApplicationPython):
assert self.get(url='/aBCaBbc')['status'] == 200 assert self.get(url='/aBCaBbc')['status'] == 200
assert self.get(url='/ABc')['status'] == 404 assert self.get(url='/ABc')['status'] == 404
def test_routes_empty_regex(self): def test_routes_empty_regex(self, require):
if not option.available['modules']['regex']: require({'modules': {'regex': True}})
pytest.skip('requires regex')
self.route_match({"uri": "~"}) self.route_match({"uri": "~"})
assert self.get(url='/')['status'] == 200, 'empty regexp' assert self.get(url='/')['status'] == 200, 'empty regexp'
@@ -244,9 +243,8 @@ class TestRouting(TestApplicationPython):
assert self.get(url='/')['status'] == 404, 'empty regexp 2' assert self.get(url='/')['status'] == 404, 'empty regexp 2'
assert self.get(url='/nothing')['status'] == 404, '/nothing' assert self.get(url='/nothing')['status'] == 404, '/nothing'
def test_routes_bad_regex(self): def test_routes_bad_regex(self, require):
if not option.available['modules']['regex']: require({'modules': {'regex': True}})
pytest.skip('requires regex')
assert 'error' in self.route( assert 'error' in self.route(
{"match": {"uri": "~/bl[ah"}, "action": {"return": 200}} {"match": {"uri": "~/bl[ah"}, "action": {"return": 200}}
@@ -264,9 +262,8 @@ class TestRouting(TestApplicationPython):
if 'error' not in status: if 'error' not in status:
assert self.get(url='/nothing_z')['status'] == 500, '/nothing_z' assert self.get(url='/nothing_z')['status'] == 500, '/nothing_z'
def test_routes_match_regex_case_sensitive(self): def test_routes_match_regex_case_sensitive(self, require):
if not option.available['modules']['regex']: require({'modules': {'regex': True}})
pytest.skip('requires regex')
self.route_match({"uri": "~/bl[ah]"}) self.route_match({"uri": "~/bl[ah]"})
@@ -275,9 +272,8 @@ class TestRouting(TestApplicationPython):
assert self.get(url='/blh')['status'] == 200, '/blh' assert self.get(url='/blh')['status'] == 200, '/blh'
assert self.get(url='/BLAH')['status'] == 404, '/BLAH' assert self.get(url='/BLAH')['status'] == 404, '/BLAH'
def test_routes_match_regex_negative_case_sensitive(self): def test_routes_match_regex_negative_case_sensitive(self, require):
if not option.available['modules']['regex']: require({'modules': {'regex': True}})
pytest.skip('requires regex')
self.route_match({"uri": "!~/bl[ah]"}) self.route_match({"uri": "!~/bl[ah]"})

View File

@@ -1,9 +1,9 @@
from unit.applications.tls import TestApplicationTLS from unit.applications.tls import TestApplicationTLS
prerequisites = {'modules': {'openssl': 'any'}}
class TestRoutingTLS(TestApplicationTLS): class TestRoutingTLS(TestApplicationTLS):
prerequisites = {'modules': {'openssl': 'any'}}
def test_routes_match_scheme_tls(self): def test_routes_match_scheme_tls(self):
self.certificate() self.certificate()

View File

@@ -4,10 +4,10 @@ import subprocess
import pytest import pytest
from unit.applications.lang.ruby import TestApplicationRuby from unit.applications.lang.ruby import TestApplicationRuby
class TestRubyApplication(TestApplicationRuby):
prerequisites = {'modules': {'ruby': 'all'}} prerequisites = {'modules': {'ruby': 'all'}}
class TestRubyApplication(TestApplicationRuby):
def test_ruby_application(self, date_to_sec_epoch, sec_epoch): def test_ruby_application(self, date_to_sec_epoch, sec_epoch):
self.load('variables') self.load('variables')

View File

@@ -2,10 +2,10 @@ from unit.applications.lang.ruby import TestApplicationRuby
from unit.option import option from unit.option import option
from unit.utils import waitforglob from unit.utils import waitforglob
class TestRubyHooks(TestApplicationRuby):
prerequisites = {'modules': {'ruby': 'all'}} prerequisites = {'modules': {'ruby': 'all'}}
class TestRubyHooks(TestApplicationRuby):
def _wait_cookie(self, pattern, count): def _wait_cookie(self, pattern, count):
return waitforglob( return waitforglob(
f'{option.temp_dir}/ruby/hooks/cookie_{pattern}', count f'{option.temp_dir}/ruby/hooks/cookie_{pattern}', count

View File

@@ -1,30 +1,26 @@
import pytest
from unit.applications.lang.ruby import TestApplicationRuby from unit.applications.lang.ruby import TestApplicationRuby
from unit.option import option
prerequisites = {'modules': {'ruby': 'any'}, 'features': {'isolation': True}}
class TestRubyIsolation(TestApplicationRuby): class TestRubyIsolation(TestApplicationRuby):
prerequisites = {'modules': {'ruby': 'any'}, 'features': ['isolation']} def test_ruby_isolation_rootfs(self, is_su, require, temp_dir):
def test_ruby_isolation_rootfs(self, is_su, temp_dir):
isolation_features = option.available['features']['isolation'].keys()
if not is_su:
if not 'unprivileged_userns_clone' in isolation_features:
pytest.skip('requires unprivileged userns or root')
if 'user' not in isolation_features:
pytest.skip('user namespace is not supported')
if 'mnt' not in isolation_features:
pytest.skip('mnt namespace is not supported')
if 'pid' not in isolation_features:
pytest.skip('pid namespace is not supported')
isolation = {'rootfs': temp_dir} isolation = {'rootfs': temp_dir}
if not is_su: if not is_su:
require(
{
'features': {
'isolation': [
'unprivileged_userns_clone',
'user',
'mnt',
'pid',
]
}
}
)
isolation['namespaces'] = { isolation['namespaces'] = {
'mount': True, 'mount': True,
'credential': True, 'credential': True,

View File

@@ -6,10 +6,10 @@ import time
import pytest import pytest
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
class TestSettings(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestSettings(TestApplicationPython):
def sysctl(self): def sysctl(self):
try: try:
out = subprocess.check_output( out = subprocess.check_output(

View File

@@ -7,8 +7,6 @@ from unit.utils import waitforfiles
class TestStatic(TestApplicationProto): class TestStatic(TestApplicationProto):
prerequisites = {}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir): def setup_method_fixture(self, temp_dir):
os.makedirs(f'{temp_dir}/assets/dir') os.makedirs(f'{temp_dir}/assets/dir')

View File

@@ -5,10 +5,10 @@ import pytest
from unit.applications.proto import TestApplicationProto from unit.applications.proto import TestApplicationProto
from unit.option import option from unit.option import option
prerequisites = {'features': {'chroot': True}}
class TestStaticChroot(TestApplicationProto): class TestStaticChroot(TestApplicationProto):
prerequisites = {'features': ['chroot']}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir): def setup_method_fixture(self, temp_dir):
os.makedirs(f'{temp_dir}/assets/dir') os.makedirs(f'{temp_dir}/assets/dir')
@@ -62,9 +62,8 @@ class TestStaticChroot(TestApplicationProto):
) )
assert self.get()['status'] != 200, 'share array bad' assert self.get()['status'] != 200, 'share array bad'
def test_static_chroot_permission(self, is_su, temp_dir): def test_static_chroot_permission(self, require, temp_dir):
if is_su: require({'privileged_user': False})
pytest.skip("does't work under root")
os.chmod(f'{temp_dir}/assets/dir', 0o100) os.chmod(f'{temp_dir}/assets/dir', 0o100)
@@ -81,9 +80,8 @@ class TestStaticChroot(TestApplicationProto):
assert 'success' in self.update_action("", ".$uri") assert 'success' in self.update_action("", ".$uri")
assert self.get(url=self.test_path)['status'] == 200, 'empty relative' assert self.get(url=self.test_path)['status'] == 200, 'empty relative'
def test_static_chroot_relative(self, is_su): def test_static_chroot_relative(self, require):
if is_su: require({'privileged_user': False})
pytest.skip("Does't work under root.")
assert 'success' in self.update_action('.') assert 'success' in self.update_action('.')
assert self.get(url='/dir/file')['status'] == 403, 'relative chroot' assert self.get(url='/dir/file')['status'] == 403, 'relative chroot'

View File

@@ -6,8 +6,6 @@ from unit.applications.proto import TestApplicationProto
class TestStaticFallback(TestApplicationProto): class TestStaticFallback(TestApplicationProto):
prerequisites = {}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir): def setup_method_fixture(self, temp_dir):
assets_dir = f'{temp_dir}/assets' assets_dir = f'{temp_dir}/assets'

View File

@@ -5,15 +5,12 @@ from pathlib import Path
import pytest import pytest
from unit.applications.proto import TestApplicationProto from unit.applications.proto import TestApplicationProto
prerequisites = {'features': {'chroot': True}, 'privileged_user': True}
class TestStaticMount(TestApplicationProto): class TestStaticMount(TestApplicationProto):
prerequisites = {'features': ['chroot']}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, is_su, temp_dir): def setup_method_fixture(self, temp_dir):
if not is_su:
pytest.skip('requires root')
os.makedirs(f'{temp_dir}/assets/dir/mount') os.makedirs(f'{temp_dir}/assets/dir/mount')
os.makedirs(f'{temp_dir}/assets/dir/dir') os.makedirs(f'{temp_dir}/assets/dir/dir')
os.makedirs(f'{temp_dir}/assets/mount') os.makedirs(f'{temp_dir}/assets/mount')

View File

@@ -6,8 +6,6 @@ from unit.applications.proto import TestApplicationProto
class TestStaticShare(TestApplicationProto): class TestStaticShare(TestApplicationProto):
prerequisites = {}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir): def setup_method_fixture(self, temp_dir):
os.makedirs(f'{temp_dir}/assets/dir') os.makedirs(f'{temp_dir}/assets/dir')

View File

@@ -4,10 +4,10 @@ from pathlib import Path
import pytest import pytest
from unit.applications.proto import TestApplicationProto from unit.applications.proto import TestApplicationProto
prerequisites = {'features': {'chroot': True}}
class TestStaticSymlink(TestApplicationProto): class TestStaticSymlink(TestApplicationProto):
prerequisites = {'features': ['chroot']}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir): def setup_method_fixture(self, temp_dir):
os.makedirs(f'{temp_dir}/assets/dir/dir') os.makedirs(f'{temp_dir}/assets/dir/dir')

View File

@@ -5,8 +5,6 @@ from unit.applications.proto import TestApplicationProto
class TestStaticTypes(TestApplicationProto): class TestStaticTypes(TestApplicationProto):
prerequisites = {}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir): def setup_method_fixture(self, temp_dir):
Path(f'{temp_dir}/assets').mkdir() Path(f'{temp_dir}/assets').mkdir()

View File

@@ -6,8 +6,6 @@ from unit.applications.proto import TestApplicationProto
class TestStaticVariables(TestApplicationProto): class TestStaticVariables(TestApplicationProto):
prerequisites = {}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self, temp_dir): def setup_method_fixture(self, temp_dir):
os.makedirs(f'{temp_dir}/assets/dir') os.makedirs(f'{temp_dir}/assets/dir')

View File

@@ -4,10 +4,10 @@ from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
from unit.status import Status from unit.status import Status
class TestStatus(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestStatus(TestApplicationPython):
def check_connections(self, accepted, active, idle, closed): def check_connections(self, accepted, active, idle, closed):
assert Status.get('/connections') == { assert Status.get('/connections') == {
'accepted': accepted, 'accepted': accepted,

View File

@@ -1,10 +1,10 @@
from unit.applications.tls import TestApplicationTLS from unit.applications.tls import TestApplicationTLS
from unit.status import Status from unit.status import Status
class TestStatusTLS(TestApplicationTLS):
prerequisites = {'modules': {'openssl': 'any'}} prerequisites = {'modules': {'openssl': 'any'}}
class TestStatusTLS(TestApplicationTLS):
def test_status_tls_requests(self): def test_status_tls_requests(self):
self.certificate() self.certificate()

View File

@@ -7,10 +7,10 @@ import pytest
from unit.applications.tls import TestApplicationTLS from unit.applications.tls import TestApplicationTLS
from unit.option import option from unit.option import option
class TestTLS(TestApplicationTLS):
prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}} prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
class TestTLS(TestApplicationTLS):
def add_tls(self, application='empty', cert='default', port=7080): def add_tls(self, application='empty', cert='default', port=7080):
assert 'success' in self.conf( assert 'success' in self.conf(
{ {

View File

@@ -3,10 +3,10 @@ import ssl
import pytest import pytest
from unit.applications.tls import TestApplicationTLS from unit.applications.tls import TestApplicationTLS
class TestTLSConfCommand(TestApplicationTLS):
prerequisites = {'modules': {'openssl': 'any'}} prerequisites = {'modules': {'openssl': 'any'}}
class TestTLSConfCommand(TestApplicationTLS):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
self.certificate() self.certificate()

View File

@@ -14,10 +14,10 @@ from OpenSSL.SSL import (
) )
from unit.applications.tls import TestApplicationTLS from unit.applications.tls import TestApplicationTLS
class TestTLSSession(TestApplicationTLS):
prerequisites = {'modules': {'openssl': 'any'}} prerequisites = {'modules': {'openssl': 'any'}}
class TestTLSSession(TestApplicationTLS):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
self.certificate() self.certificate()

View File

@@ -5,10 +5,10 @@ import pytest
from unit.applications.tls import TestApplicationTLS from unit.applications.tls import TestApplicationTLS
from unit.option import option from unit.option import option
class TestTLSSNI(TestApplicationTLS):
prerequisites = {'modules': {'openssl': 'any'}} prerequisites = {'modules': {'openssl': 'any'}}
class TestTLSSNI(TestApplicationTLS):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
self._load_conf( self._load_conf(

View File

@@ -11,10 +11,10 @@ from OpenSSL.SSL import (
) )
from unit.applications.tls import TestApplicationTLS from unit.applications.tls import TestApplicationTLS
class TestTLSTicket(TestApplicationTLS):
prerequisites = {'modules': {'openssl': 'any'}} prerequisites = {'modules': {'openssl': 'any'}}
class TestTLSTicket(TestApplicationTLS):
ticket = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE' ticket = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE'
ticket2 = '5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt' ticket2 = '5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt'
ticket80 = '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR\ ticket80 = '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR\

View File

@@ -1,13 +1,13 @@
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
class TestUnixAbstract(TestApplicationPython):
prerequisites = { prerequisites = {
'modules': {'python': 'any'}, 'modules': {'python': 'any'},
'features': ['unix_abstract'], 'features': {'unix_abstract': True},
} }
class TestUnixAbstract(TestApplicationPython):
def test_unix_abstract_source(self): def test_unix_abstract_source(self):
addr = '\0sock' addr = '\0sock'

View File

@@ -5,10 +5,10 @@ import pytest
from unit.applications.lang.python import TestApplicationPython from unit.applications.lang.python import TestApplicationPython
from unit.option import option from unit.option import option
class TestUpstreamsRR(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestUpstreamsRR(TestApplicationPython):
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
assert 'success' in self.conf( assert 'success' in self.conf(

View File

@@ -5,10 +5,10 @@ from unit.applications.lang.python import TestApplicationPython
from unit.log import Log from unit.log import Log
from unit.utils import waitforfiles from unit.utils import waitforfiles
class TestUSR1(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
class TestUSR1(TestApplicationPython):
def test_usr1_access_log( def test_usr1_access_log(
self, search_in_file, temp_dir, unit_pid, wait_for_record self, search_in_file, temp_dir, unit_pid, wait_for_record
): ):

View File

@@ -7,8 +7,6 @@ from unit.option import option
class TestVariables(TestApplicationProto): class TestVariables(TestApplicationProto):
prerequisites = {}
@pytest.fixture(autouse=True) @pytest.fixture(autouse=True)
def setup_method_fixture(self): def setup_method_fixture(self):
assert 'success' in self.conf( assert 'success' in self.conf(

View File

@@ -1,9 +1,6 @@
import os import os
import re
import time
from unit.control import TestControl from unit.control import TestControl
from unit.log import Log
from unit.option import option from unit.option import option

View File

@@ -0,0 +1,63 @@
import pytest
from unit.option import option
def check_prerequisites(prerequisites):
if 'privileged_user' in prerequisites:
if prerequisites['privileged_user'] and not option.is_privileged:
pytest.skip(
'privileged user required',
allow_module_level=True,
)
elif not prerequisites['privileged_user'] and option.is_privileged:
pytest.skip(
'unprivileged user required',
allow_module_level=True,
)
missed = []
# check modules
if 'modules' in prerequisites:
available = option.available['modules']
for module in prerequisites['modules']:
if module in available and available[module]:
continue
missed.append(module)
if missed:
pytest.skip(
f'Unit has no {", ".join(missed)} module(s)',
allow_module_level=True,
)
# check features
if 'features' in prerequisites:
available = option.available['features']
require = prerequisites['features']
for feature in require:
avail_feature = available[feature]
if feature in available and avail_feature:
if isinstance(require[feature], list) and isinstance(
avail_feature, dict
):
avail_keys = avail_feature.keys()
for key in require[feature]:
if key not in avail_keys:
missed.append(f'{feature}/{key}')
continue
missed.append(feature)
if missed:
pytest.skip(
f'{", ".join(missed)} feature(s) not supported',
allow_module_level=True,
)

View File

@@ -7,9 +7,9 @@ http = TestHTTP()
def check_chroot(): def check_chroot():
available = option.available return (
'success'
resp = http.put( in http.put(
url='/config', url='/config',
sock_type='unix', sock_type='unix',
addr=f'{option.temp_dir}/control.unit.sock', addr=f'{option.temp_dir}/control.unit.sock',
@@ -26,7 +26,5 @@ def check_chroot():
], ],
} }
), ),
)['body']
) )
if 'success' in resp['body']:
available['features']['chroot'] = True

View File

@@ -0,0 +1,47 @@
import subprocess
import sys
from unit.check.chroot import check_chroot
from unit.check.go import check_go
from unit.check.isolation import check_isolation
from unit.check.njs import check_njs
from unit.check.node import check_node
from unit.check.regex import check_regex
from unit.check.tls import check_openssl
from unit.check.unix_abstract import check_unix_abstract
from unit.log import Log
from unit.option import option
def discover_available(unit):
output_version = subprocess.check_output(
[unit['unitd'], '--version'], stderr=subprocess.STDOUT
).decode()
# wait for controller start
if Log.wait_for_record(r'controller started') is None:
Log.print_log()
sys.exit("controller didn't start")
# discover modules from log file
for module in Log.findall(r'module: ([a-zA-Z]+) (.*) ".*"$'):
versions = option.available['modules'].setdefault(module[0], [])
if module[1] not in versions:
versions.append(module[1])
# discover modules using check
option.available['modules']['go'] = check_go()
option.available['modules']['njs'] = check_njs(output_version)
option.available['modules']['node'] = check_node()
option.available['modules']['openssl'] = check_openssl(output_version)
option.available['modules']['regex'] = check_regex(output_version)
# Discover features using check. Features should be discovered after
# modules since some features can require modules.
option.available['features']['chroot'] = check_chroot()
option.available['features']['isolation'] = check_isolation()
option.available['features']['unix_abstract'] = check_unix_abstract()

View File

@@ -2,5 +2,4 @@ from unit.applications.lang.go import TestApplicationGo
def check_go(): def check_go():
if TestApplicationGo.prepare_env('empty') is not None: return TestApplicationGo.prepare_env('empty') is not None
return True

View File

@@ -127,7 +127,7 @@ def check_isolation():
} }
else: else:
return return False
resp = http.put( resp = http.put(
url='/config', url='/config',
@@ -137,23 +137,23 @@ def check_isolation():
) )
if 'success' not in resp['body']: if 'success' not in resp['body']:
return return False
userns = getns('user') userns = getns('user')
if not userns: if not userns:
return return False
available['features']['isolation'] = {'user': userns} isolation = {'user': userns}
unp_clone_path = '/proc/sys/kernel/unprivileged_userns_clone' unp_clone_path = '/proc/sys/kernel/unprivileged_userns_clone'
if os.path.exists(unp_clone_path): if os.path.exists(unp_clone_path):
with open(unp_clone_path, 'r') as f: with open(unp_clone_path, 'r') as f:
if str(f.read()).rstrip() == '1': if str(f.read()).rstrip() == '1':
available['features']['isolation'][ isolation['unprivileged_userns_clone'] = True
'unprivileged_userns_clone'
] = True
for ns in allns: for ns in allns:
ns_value = getns(ns) ns_value = getns(ns)
if ns_value: if ns_value:
available['features']['isolation'][ns] = ns_value isolation[ns] = ns_value
return isolation

View File

@@ -2,5 +2,4 @@ import re
def check_njs(output_version): def check_njs(output_version):
if re.search('--njs', output_version): return re.search('--njs', output_version)
return True

View File

@@ -1,10 +1,12 @@
import os import os
import subprocess import subprocess
from unit.option import option
def check_node(current_dir):
if not os.path.exists(f'{current_dir}/node/node_modules'): def check_node():
return None if not os.path.exists(f'{option.current_dir}/node/node_modules'):
return False
try: try:
v_bytes = subprocess.check_output(['/usr/bin/env', 'node', '-v']) v_bytes = subprocess.check_output(['/usr/bin/env', 'node', '-v'])
@@ -12,4 +14,4 @@ def check_node(current_dir):
return [str(v_bytes, 'utf-8').lstrip('v').rstrip()] return [str(v_bytes, 'utf-8').lstrip('v').rstrip()]
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
return None return False

View File

@@ -2,7 +2,4 @@ import re
def check_regex(output_version): def check_regex(output_version):
if re.search('--no-regex', output_version): return not re.search('--no-regex', output_version)
return False
return True

View File

@@ -6,7 +6,6 @@ def check_openssl(output_version):
try: try:
subprocess.check_output(['which', 'openssl']) subprocess.check_output(['which', 'openssl'])
except subprocess.CalledProcessError: except subprocess.CalledProcessError:
return None return False
if re.search('--openssl', output_version): return re.search('--openssl', output_version)
return True

View File

@@ -7,19 +7,19 @@ http = TestHTTP()
def check_unix_abstract(): def check_unix_abstract():
available = option.available return (
'success'
resp = http.put( in http.put(
url='/config', url='/config',
sock_type='unix', sock_type='unix',
addr=f'{option.temp_dir}/control.unit.sock', addr=f'{option.temp_dir}/control.unit.sock',
body=json.dumps( body=json.dumps(
{ {
"listeners": {"unix:@sock": {"pass": "routes"}}, "listeners": {
f'unix:@{option.temp_dir}/sock': {"pass": "routes"}
},
"routes": [], "routes": [],
} }
), ),
)['body']
) )
if 'success' in resp['body']:
available['features']['unix_abstract'] = True

View File

@@ -1,6 +1,7 @@
import os import os
import re import re
import sys import sys
import time
from unit.option import option from unit.option import option
@@ -25,7 +26,7 @@ class Log:
@print_log_on_assert @print_log_on_assert
def check_alerts(log=None): def check_alerts(log=None):
if log is None: if log is None:
log = Log.read(encoding='utf-8') log = Log.read()
found = False found = False
alerts = re.findall(r'.+\[alert\].+', log) alerts = re.findall(r'.+\[alert\].+', log)
@@ -51,12 +52,16 @@ class Log:
if found and option.detailed: if found and option.detailed:
print('skipped.') print('skipped.')
@staticmethod
def findall(pattern, name=UNIT_LOG, flags=re.M):
return re.findall(pattern, Log.read(name), flags)
@staticmethod @staticmethod
def get_path(name=UNIT_LOG): def get_path(name=UNIT_LOG):
return f'{option.temp_dir}/{name}' return f'{option.temp_dir}/{name}'
@staticmethod @staticmethod
def open(name=UNIT_LOG, encoding=None): def open(name=UNIT_LOG, encoding='utf-8'):
file = open(Log.get_path(name), 'r', encoding=encoding, errors='ignore') file = open(Log.get_path(name), 'r', encoding=encoding, errors='ignore')
file.seek(Log.pos.get(name, 0)) file.seek(Log.pos.get(name, 0))
@@ -71,7 +76,7 @@ class Log:
sys.stdout.flush() sys.stdout.flush()
if log is None: if log is None:
log = Log.read(encoding='utf-8') log = Log.read()
sys.stdout.write(log) sys.stdout.write(log)
@@ -93,3 +98,16 @@ class Log:
pos = Log.pos.get(UNIT_LOG, 0) pos = Log.pos.get(UNIT_LOG, 0)
Log.pos[UNIT_LOG] = Log.pos.get(name, 0) Log.pos[UNIT_LOG] = Log.pos.get(name, 0)
Log.pos[name] = pos Log.pos[name] = pos
@staticmethod
def wait_for_record(pattern, name=UNIT_LOG, wait=150, flags=re.M):
with Log.open(name) as file:
for _ in range(wait):
found = re.search(pattern, file.read(), flags)
if found is not None:
break
time.sleep(0.1)
return found

View File

@@ -4,6 +4,7 @@ import platform
class Options: class Options:
_options = { _options = {
'architecture': platform.architecture()[0], 'architecture': platform.architecture()[0],
'available': {'modules': {}, 'features': {}},
'is_privileged': os.geteuid() == 0, 'is_privileged': os.geteuid() == 0,
'skip_alerts': [], 'skip_alerts': [],
'skip_sanitizer': False, 'skip_sanitizer': False,