Tests: refactored prerequisites model.
This commit is contained in:
@@ -4,6 +4,19 @@ from unit.applications.proto import TestApplicationProto
|
||||
|
||||
|
||||
class TestApplicationGo(TestApplicationProto):
|
||||
@classmethod
|
||||
def setUpClass(cls, complete_check=True):
|
||||
unit = super().setUpClass(complete_check=False)
|
||||
|
||||
# check go module
|
||||
|
||||
go_app = TestApplicationGo()
|
||||
go_app.testdir = unit.testdir
|
||||
if go_app.prepare_env('empty', 'app').returncode == 0:
|
||||
cls.available['modules']['go'] = []
|
||||
|
||||
return unit if not complete_check else unit.complete()
|
||||
|
||||
def prepare_env(self, script, name):
|
||||
if not os.path.exists(self.testdir + '/go'):
|
||||
os.mkdir(self.testdir + '/go')
|
||||
|
||||
@@ -4,6 +4,17 @@ from unit.applications.proto import TestApplicationProto
|
||||
|
||||
|
||||
class TestApplicationNode(TestApplicationProto):
|
||||
@classmethod
|
||||
def setUpClass(cls, complete_check=True):
|
||||
unit = super().setUpClass(complete_check=False)
|
||||
|
||||
# check node module
|
||||
|
||||
if os.path.exists(unit.pardir + '/node/node_modules'):
|
||||
cls.available['modules']['node'] = []
|
||||
|
||||
return unit if not complete_check else unit.complete()
|
||||
|
||||
def load(self, script, name='app.js'):
|
||||
# copy application
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import os
|
||||
import re
|
||||
import ssl
|
||||
import subprocess
|
||||
from unit.applications.proto import TestApplicationProto
|
||||
@@ -12,6 +13,27 @@ class TestApplicationTLS(TestApplicationProto):
|
||||
self.context.check_hostname = False
|
||||
self.context.verify_mode = ssl.CERT_NONE
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, complete_check=True):
|
||||
unit = super().setUpClass(complete_check=False)
|
||||
|
||||
# check tls module
|
||||
|
||||
try:
|
||||
subprocess.check_output(['which', 'openssl'])
|
||||
|
||||
output = subprocess.check_output(
|
||||
[unit.unitd, '--version'], stderr=subprocess.STDOUT
|
||||
)
|
||||
|
||||
if re.search('--openssl', output.decode()):
|
||||
cls.available['modules']['openssl'] = []
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
return unit if not complete_check else unit.complete()
|
||||
|
||||
def certificate(self, name='default', load=True):
|
||||
self.openssl_conf()
|
||||
|
||||
|
||||
@@ -12,8 +12,6 @@ import subprocess
|
||||
from multiprocessing import Process
|
||||
|
||||
|
||||
available_modules = {}
|
||||
|
||||
class TestUnit(unittest.TestCase):
|
||||
|
||||
current_dir = os.path.abspath(
|
||||
@@ -45,9 +43,9 @@ class TestUnit(unittest.TestCase):
|
||||
# rerun test for each available module version
|
||||
|
||||
type = self.application_type
|
||||
for prerequisite in self.prerequisites:
|
||||
if prerequisite in available_modules:
|
||||
for version in available_modules[prerequisite]:
|
||||
for module in self.prerequisites['modules']:
|
||||
if module in self.available['modules']:
|
||||
for version in self.available['modules'][module]:
|
||||
self.application_type = type + ' ' + version
|
||||
super().run(result)
|
||||
|
||||
@@ -66,8 +64,83 @@ class TestUnit(unittest.TestCase):
|
||||
unittest.main()
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
TestUnit().check_modules(*cls.prerequisites)
|
||||
def setUpClass(cls, complete_check=True):
|
||||
cls.available = {'modules': {}, 'features': {}}
|
||||
unit = TestUnit()
|
||||
|
||||
unit._run()
|
||||
|
||||
# read unit.log
|
||||
|
||||
for i in range(50):
|
||||
with open(unit.testdir + '/unit.log', 'r') as f:
|
||||
log = f.read()
|
||||
m = re.search('controller started', log)
|
||||
|
||||
if m is None:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
break
|
||||
|
||||
if m is None:
|
||||
unit.stop()
|
||||
exit("Unit is writing log too long")
|
||||
|
||||
# discover available modules from unit.log
|
||||
|
||||
for module in re.findall(r'module: ([a-zA-Z]+) (.*) ".*"$', log, re.M):
|
||||
if module[0] not in cls.available['modules']:
|
||||
cls.available['modules'][module[0]] = [module[1]]
|
||||
else:
|
||||
cls.available['modules'][module[0]].append(module[1])
|
||||
|
||||
def check(available, prerequisites):
|
||||
missed = []
|
||||
|
||||
# check modules
|
||||
|
||||
if 'modules' in prerequisites:
|
||||
available_modules = list(available['modules'].keys())
|
||||
|
||||
for module in prerequisites['modules']:
|
||||
if module in available_modules:
|
||||
continue
|
||||
|
||||
missed.append(module)
|
||||
|
||||
if missed:
|
||||
print('Unit has no ' + ', '.join(missed) + ' module(s)')
|
||||
raise unittest.SkipTest()
|
||||
|
||||
# check features
|
||||
|
||||
if 'features' in prerequisites:
|
||||
available_features = list(available['modules'].keys())
|
||||
|
||||
for feature in prerequisites['features']:
|
||||
if feature in available_features:
|
||||
continue
|
||||
|
||||
missed.append(feature)
|
||||
|
||||
if missed:
|
||||
print(', '.join(missed) + ' feature(s) not supported')
|
||||
raise unittest.SkipTest()
|
||||
|
||||
def destroy():
|
||||
unit.stop()
|
||||
unit._check_alerts(log)
|
||||
shutil.rmtree(unit.testdir)
|
||||
|
||||
def complete():
|
||||
destroy()
|
||||
check(cls.available, cls.prerequisites)
|
||||
|
||||
if complete_check:
|
||||
complete()
|
||||
else:
|
||||
unit.complete = complete
|
||||
return unit
|
||||
|
||||
def setUp(self):
|
||||
self._run()
|
||||
@@ -108,92 +181,6 @@ class TestUnit(unittest.TestCase):
|
||||
else:
|
||||
self._print_path_to_log()
|
||||
|
||||
def check_modules(self, *modules):
|
||||
self._run()
|
||||
|
||||
for i in range(50):
|
||||
with open(self.testdir + '/unit.log', 'r') as f:
|
||||
log = f.read()
|
||||
m = re.search('controller started', log)
|
||||
|
||||
if m is None:
|
||||
time.sleep(0.1)
|
||||
else:
|
||||
break
|
||||
|
||||
if m is None:
|
||||
self.stop()
|
||||
exit("Unit is writing log too long")
|
||||
|
||||
# discover all available modules
|
||||
|
||||
global available_modules
|
||||
available_modules = {}
|
||||
for module in re.findall(r'module: ([a-zA-Z]+) (.*) ".*"$', log, re.M):
|
||||
if module[0] not in available_modules:
|
||||
available_modules[module[0]] = [module[1]]
|
||||
else:
|
||||
available_modules[module[0]].append(module[1])
|
||||
|
||||
missed_module = ''
|
||||
for module in modules:
|
||||
if module == 'go':
|
||||
env = os.environ.copy()
|
||||
env['GOPATH'] = self.pardir + '/go'
|
||||
|
||||
try:
|
||||
process = subprocess.Popen(
|
||||
[
|
||||
'go',
|
||||
'build',
|
||||
'-o',
|
||||
self.testdir + '/go/check_module',
|
||||
self.current_dir + '/go/empty/app.go',
|
||||
],
|
||||
env=env,
|
||||
)
|
||||
process.communicate()
|
||||
|
||||
m = module if process.returncode == 0 else None
|
||||
|
||||
except:
|
||||
m = None
|
||||
|
||||
elif module == 'node':
|
||||
if os.path.isdir(self.pardir + '/node/node_modules'):
|
||||
m = module
|
||||
else:
|
||||
m = None
|
||||
|
||||
elif module == 'openssl':
|
||||
try:
|
||||
subprocess.check_output(['which', 'openssl'])
|
||||
|
||||
output = subprocess.check_output(
|
||||
[self.unitd, '--version'],
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
|
||||
m = re.search('--openssl', output.decode())
|
||||
|
||||
except:
|
||||
m = None
|
||||
|
||||
else:
|
||||
if module not in available_modules:
|
||||
m = None
|
||||
|
||||
if m is None:
|
||||
missed_module = module
|
||||
break
|
||||
|
||||
self.stop()
|
||||
self._check_alerts(log)
|
||||
shutil.rmtree(self.testdir)
|
||||
|
||||
if missed_module:
|
||||
raise unittest.SkipTest('Unit has no ' + missed_module + ' module')
|
||||
|
||||
def stop(self):
|
||||
if self._started:
|
||||
self._stop()
|
||||
|
||||
Reference in New Issue
Block a user