Tests: rearranging functions in main.py.
This commit is contained in:
@@ -152,47 +152,6 @@ class TestUnit(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self._run()
|
||||
|
||||
def tearDown(self):
|
||||
self.stop()
|
||||
|
||||
# detect errors and failures for current test
|
||||
|
||||
def list2reason(exc_list):
|
||||
if exc_list and exc_list[-1][0] is self:
|
||||
return exc_list[-1][1]
|
||||
|
||||
if hasattr(self, '_outcome'):
|
||||
result = self.defaultTestResult()
|
||||
self._feedErrorsToResult(result, self._outcome.errors)
|
||||
else:
|
||||
result = getattr(
|
||||
self, '_outcomeForDoCleanups', self._resultForDoCleanups
|
||||
)
|
||||
|
||||
success = not list2reason(result.errors) and not list2reason(
|
||||
result.failures
|
||||
)
|
||||
|
||||
# check unit.log for alerts
|
||||
|
||||
unit_log = self.testdir + '/unit.log'
|
||||
|
||||
with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f:
|
||||
self._check_alerts(f.read())
|
||||
|
||||
# remove unit.log
|
||||
|
||||
if not TestUnit.save_log and success:
|
||||
shutil.rmtree(self.testdir)
|
||||
|
||||
else:
|
||||
self._print_log()
|
||||
|
||||
def stop(self):
|
||||
self._stop()
|
||||
self.stop_processes()
|
||||
atexit.unregister(self.stop)
|
||||
|
||||
def _run(self):
|
||||
build_dir = self.pardir + '/build'
|
||||
self.unitd = build_dir + '/unitd'
|
||||
@@ -237,6 +196,47 @@ class TestUnit(unittest.TestCase):
|
||||
]
|
||||
self.skip_sanitizer = False
|
||||
|
||||
def tearDown(self):
|
||||
self.stop()
|
||||
|
||||
# detect errors and failures for current test
|
||||
|
||||
def list2reason(exc_list):
|
||||
if exc_list and exc_list[-1][0] is self:
|
||||
return exc_list[-1][1]
|
||||
|
||||
if hasattr(self, '_outcome'):
|
||||
result = self.defaultTestResult()
|
||||
self._feedErrorsToResult(result, self._outcome.errors)
|
||||
else:
|
||||
result = getattr(
|
||||
self, '_outcomeForDoCleanups', self._resultForDoCleanups
|
||||
)
|
||||
|
||||
success = not list2reason(result.errors) and not list2reason(
|
||||
result.failures
|
||||
)
|
||||
|
||||
# check unit.log for alerts
|
||||
|
||||
unit_log = self.testdir + '/unit.log'
|
||||
|
||||
with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f:
|
||||
self._check_alerts(f.read())
|
||||
|
||||
# remove unit.log
|
||||
|
||||
if not TestUnit.save_log and success:
|
||||
shutil.rmtree(self.testdir)
|
||||
|
||||
else:
|
||||
self._print_log()
|
||||
|
||||
def stop(self):
|
||||
self._stop()
|
||||
self.stop_processes()
|
||||
atexit.unregister(self.stop)
|
||||
|
||||
def _stop(self):
|
||||
if self._p.poll() is not None:
|
||||
return
|
||||
@@ -254,34 +254,6 @@ class TestUnit(unittest.TestCase):
|
||||
p.kill()
|
||||
self.fail("Could not terminate unit")
|
||||
|
||||
def _check_alerts(self, log):
|
||||
found = False
|
||||
|
||||
alerts = re.findall('.+\[alert\].+', log)
|
||||
|
||||
if alerts:
|
||||
print('All alerts/sanitizer errors found in log:')
|
||||
[print(alert) for alert in alerts]
|
||||
found = True
|
||||
|
||||
if self.skip_alerts:
|
||||
for skip in self.skip_alerts:
|
||||
alerts = [al for al in alerts if re.search(skip, al) is None]
|
||||
|
||||
if alerts:
|
||||
self._print_log(log)
|
||||
self.assertFalse(alerts, 'alert(s)')
|
||||
|
||||
if not self.skip_sanitizer:
|
||||
sanitizer_errors = re.findall('.+Sanitizer.+', log)
|
||||
|
||||
if sanitizer_errors:
|
||||
self._print_log(log)
|
||||
self.assertFalse(sanitizer_errors, 'sanitizer error(s)')
|
||||
|
||||
if found:
|
||||
print('skipped.')
|
||||
|
||||
def run_process(self, target, *args):
|
||||
if not hasattr(self, '_processes'):
|
||||
self._processes = []
|
||||
@@ -331,6 +303,34 @@ class TestUnit(unittest.TestCase):
|
||||
for f in files:
|
||||
os.chmod(os.path.join(root, f), 0o777)
|
||||
|
||||
def _check_alerts(self, log):
|
||||
found = False
|
||||
|
||||
alerts = re.findall('.+\[alert\].+', log)
|
||||
|
||||
if alerts:
|
||||
print('All alerts/sanitizer errors found in log:')
|
||||
[print(alert) for alert in alerts]
|
||||
found = True
|
||||
|
||||
if self.skip_alerts:
|
||||
for skip in self.skip_alerts:
|
||||
alerts = [al for al in alerts if re.search(skip, al) is None]
|
||||
|
||||
if alerts:
|
||||
self._print_log(log)
|
||||
self.assertFalse(alerts, 'alert(s)')
|
||||
|
||||
if not self.skip_sanitizer:
|
||||
sanitizer_errors = re.findall('.+Sanitizer.+', log)
|
||||
|
||||
if sanitizer_errors:
|
||||
self._print_log(log)
|
||||
self.assertFalse(sanitizer_errors, 'sanitizer error(s)')
|
||||
|
||||
if found:
|
||||
print('skipped.')
|
||||
|
||||
@staticmethod
|
||||
def _parse_args():
|
||||
parser = argparse.ArgumentParser(add_help=False)
|
||||
|
||||
Reference in New Issue
Block a user