Files
nginx-unit/test/unit/main.py
2019-04-22 18:52:46 +03:00

325 lines
8.4 KiB
Python

import os
import re
import sys
import time
import fcntl
import shutil
import argparse
import platform
import tempfile
import unittest
import subprocess
from multiprocessing import Process
class TestUnit(unittest.TestCase):
current_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir)
)
pardir = os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
architecture = platform.architecture()[0]
maxDiff = None
detailed = False
save_log = False
def __init__(self, methodName='runTest'):
super().__init__(methodName)
if re.match(r'.*\/run\.py$', sys.argv[0]):
args, rest = TestUnit._parse_args()
TestUnit._set_args(args)
@classmethod
def main(cls):
args, rest = TestUnit._parse_args()
for i, arg in enumerate(rest):
if arg[:5] == 'test_':
rest[i] = cls.__name__ + '.' + arg
sys.argv = sys.argv[:1] + rest
TestUnit._set_args(args)
unittest.main()
@classmethod
def setUpClass(cls):
TestUnit().check_modules(*cls.prerequisites)
def setUp(self):
self._run()
def tearDown(self):
self.stop()
# detect errors and failures for current test
def list2reason(exc_list):
if exc_list and exc_list[-1][0] is self:
return exc_list[-1][1]
if hasattr(self, '_outcome'):
result = self.defaultTestResult()
self._feedErrorsToResult(result, self._outcome.errors)
else:
result = getattr(
self, '_outcomeForDoCleanups', self._resultForDoCleanups
)
success = not list2reason(result.errors) and not list2reason(
result.failures
)
# check unit.log for alerts
unit_log = self.testdir + '/unit.log'
with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f:
self._check_alerts(f.read())
# remove unit.log
if not TestUnit.save_log and success:
shutil.rmtree(self.testdir)
else:
self._print_path_to_log()
def check_modules(self, *modules):
self._run()
for i in range(50):
with open(self.testdir + '/unit.log', 'r') as f:
log = f.read()
m = re.search('controller started', log)
if m is None:
time.sleep(0.1)
else:
break
if m is None:
self.stop()
exit("Unit is writing log too long")
missed_module = ''
for module in modules:
if module == 'go':
env = os.environ.copy()
env['GOPATH'] = self.pardir + '/go'
try:
process = subprocess.Popen(
[
'go',
'build',
'-o',
self.testdir + '/go/check_module',
self.current_dir + '/go/empty/app.go',
],
env=env,
)
process.communicate()
m = module if process.returncode == 0 else None
except:
m = None
elif module == 'node':
if os.path.isdir(self.pardir + '/node/node_modules'):
m = module
else:
m = None
elif module == 'openssl':
try:
subprocess.check_output(['which', 'openssl'])
output = subprocess.check_output(
[self.unitd, '--version'],
stderr=subprocess.STDOUT,
)
m = re.search('--openssl', output.decode())
except:
m = None
else:
m = re.search('module: ' + module, log)
if m is None:
missed_module = module
break
self.stop()
self._check_alerts(log)
shutil.rmtree(self.testdir)
if missed_module:
raise unittest.SkipTest('Unit has no ' + missed_module + ' module')
def stop(self):
if self._started:
self._stop()
def _run(self):
self.unitd = self.pardir + '/build/unitd'
if not os.path.isfile(self.unitd):
exit("Could not find unit")
self.testdir = tempfile.mkdtemp(prefix='unit-test-')
os.mkdir(self.testdir + '/state')
print()
def _run_unit():
subprocess.call(
[
self.unitd,
'--no-daemon',
'--modules', self.pardir + '/build',
'--state', self.testdir + '/state',
'--pid', self.testdir + '/unit.pid',
'--log', self.testdir + '/unit.log',
'--control', 'unix:' + self.testdir + '/control.unit.sock',
]
)
self._p = Process(target=_run_unit)
self._p.start()
if not self.waitforfiles(
self.testdir + '/unit.pid',
self.testdir + '/unit.log',
self.testdir + '/control.unit.sock',
):
exit("Could not start unit")
self._started = True
self.skip_alerts = [
r'read signalfd\(4\) failed',
r'sendmsg.+failed',
r'recvmsg.+failed',
]
self.skip_sanitizer = False
def _stop(self):
with open(self.testdir + '/unit.pid', 'r') as f:
pid = f.read().rstrip()
subprocess.call(['kill', '-s', 'QUIT', pid])
for i in range(150):
if not os.path.exists(self.testdir + '/unit.pid'):
break
time.sleep(0.1)
if os.path.exists(self.testdir + '/unit.pid'):
exit("Could not terminate unit")
self._started = False
self._p.join(timeout=1)
self._terminate_process(self._p)
def _terminate_process(self, process):
if process.is_alive():
process.terminate()
process.join(timeout=5)
if process.is_alive():
exit("Could not terminate process " + process.pid)
if process.exitcode:
exit("Child process terminated with code " + str(process.exitcode))
def _check_alerts(self, log):
found = False
alerts = re.findall('.+\[alert\].+', log)
if alerts:
print('All alerts/sanitizer errors found in log:')
[print(alert) for alert in alerts]
found = True
if self.skip_alerts:
for skip in self.skip_alerts:
alerts = [al for al in alerts if re.search(skip, al) is None]
if alerts:
self._print_path_to_log()
self.assertFalse(alerts, 'alert(s)')
if not self.skip_sanitizer:
sanitizer_errors = re.findall('.+Sanitizer.+', log)
if sanitizer_errors:
self._print_path_to_log()
self.assertFalse(sanitizer_errors, 'sanitizer error(s)')
if found:
print('skipped.')
def waitforfiles(self, *files):
for i in range(50):
wait = False
ret = False
for f in files:
if not os.path.exists(f):
wait = True
break
if wait:
time.sleep(0.1)
else:
ret = True
break
return ret
@staticmethod
def _parse_args():
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
'-d',
'--detailed',
dest='detailed',
action='store_true',
help='Detailed output for tests',
)
parser.add_argument(
'-l',
'--log',
dest='save_log',
action='store_true',
help='Save unit.log after the test execution',
)
return parser.parse_known_args()
@staticmethod
def _set_args(args):
TestUnit.detailed = args.detailed
TestUnit.save_log = args.save_log
if TestUnit.detailed:
fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0)
def _print_path_to_log(self):
print('Path to unit.log:\n' + self.testdir + '/unit.log')