Tests: Log reworked.

All log-related code moved to the log.py.
This commit is contained in:
Andrei Zeliankou
2023-05-29 14:23:52 +01:00
parent b034bf6703
commit f55818059c
3 changed files with 90 additions and 80 deletions

View File

@@ -24,6 +24,7 @@ from unit.check.tls import check_openssl
from unit.check.unix_abstract import check_unix_abstract from unit.check.unix_abstract import check_unix_abstract
from unit.http import TestHTTP from unit.http import TestHTTP
from unit.log import Log from unit.log import Log
from unit.log import print_log_on_assert
from unit.option import option from unit.option import option
from unit.status import Status from unit.status import Status
from unit.utils import check_findmnt from unit.utils import check_findmnt
@@ -120,17 +121,6 @@ def pytest_configure(config):
fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0) fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0)
def print_log_on_assert(func):
def inner_function(*args, **kwargs):
try:
func(*args, **kwargs)
except AssertionError as e:
_print_log(kwargs.get('log', None))
raise e
return inner_function
def pytest_generate_tests(metafunc): def pytest_generate_tests(metafunc):
cls = metafunc.cls cls = metafunc.cls
if ( if (
@@ -196,7 +186,7 @@ def pytest_sessionstart():
break break
if m is None: if m is None:
_print_log(log) Log.print_log(log)
exit("Unit is writing log too long") exit("Unit is writing log too long")
# discover available modules from unit.log # discover available modules from unit.log
@@ -228,7 +218,7 @@ def pytest_sessionstart():
unit_stop() unit_stop()
_check_alerts() Log.check_alerts()
if option.restart: if option.restart:
shutil.rmtree(unit_instance['temp_dir']) shutil.rmtree(unit_instance['temp_dir'])
@@ -331,17 +321,17 @@ def run(request):
# print unit.log in case of error # print unit.log in case of error
if hasattr(request.node, 'rep_call') and request.node.rep_call.failed: if hasattr(request.node, 'rep_call') and request.node.rep_call.failed:
_print_log(log) Log.print_log(log)
if error_stop_unit or error_stop_processes: if error_stop_unit or error_stop_processes:
_print_log(log) Log.print_log(log)
# check unit.log for errors # check unit.log for errors
assert error_stop_unit is None, 'stop unit' assert error_stop_unit is None, 'stop unit'
assert error_stop_processes is None, 'stop processes' assert error_stop_processes is None, 'stop processes'
_check_alerts(log=log) Log.check_alerts(log=log)
def unit_run(state_dir=None): def unit_run(state_dir=None):
@@ -360,6 +350,7 @@ def unit_run(state_dir=None):
exit('Could not find unit') exit('Could not find unit')
temp_dir = tempfile.mkdtemp(prefix='unit-test-') temp_dir = tempfile.mkdtemp(prefix='unit-test-')
option.temp_dir = temp_dir
public_dir(temp_dir) public_dir(temp_dir)
if oct(stat.S_IMODE(os.stat(builddir).st_mode)) != '0o777': if oct(stat.S_IMODE(os.stat(builddir).st_mode)) != '0o777':
@@ -394,18 +385,14 @@ def unit_run(state_dir=None):
with open(f'{temp_dir}/unit.log', 'w') as log: with open(f'{temp_dir}/unit.log', 'w') as log:
unit_instance['process'] = subprocess.Popen(unitd_args, stderr=log) unit_instance['process'] = subprocess.Popen(unitd_args, stderr=log)
Log.temp_dir = temp_dir
if not waitforfiles(control_sock): if not waitforfiles(control_sock):
_print_log() Log.print_log()
exit('Could not start unit') exit('Could not start unit')
unit_instance['temp_dir'] = temp_dir unit_instance['temp_dir'] = temp_dir
unit_instance['control_sock'] = control_sock unit_instance['control_sock'] = control_sock
unit_instance['unitd'] = unitd unit_instance['unitd'] = unitd
option.temp_dir = temp_dir
with open(f'{temp_dir}/unit.pid', 'r') as f: with open(f'{temp_dir}/unit.pid', 'r') as f:
unit_instance['pid'] = f.read().rstrip() unit_instance['pid'] = f.read().rstrip()
@@ -465,53 +452,6 @@ def unit_stop():
return 'Could not terminate unit' return 'Could not terminate unit'
@print_log_on_assert
def _check_alerts(*, log=None):
if log is None:
with Log.open(encoding='utf-8') as f:
log = f.read()
found = False
alerts = re.findall(r'.+\[alert\].+', log)
if alerts:
found = True
if option.detailed:
print('\nAll alerts/sanitizer errors found in log:')
[print(alert) for alert in alerts]
if option.skip_alerts:
for skip in option.skip_alerts:
alerts = [al for al in alerts if re.search(skip, al) is None]
assert not alerts, 'alert(s)'
if not option.skip_sanitizer:
sanitizer_errors = re.findall('.+Sanitizer.+', log)
assert not sanitizer_errors, 'sanitizer error(s)'
if found and option.detailed:
print('skipped.')
def _print_log(log=None):
path = Log.get_path()
print(f'Path to unit.log:\n{path}\n')
if option.print_log:
os.set_blocking(sys.stdout.fileno(), True)
sys.stdout.flush()
if log is None:
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
shutil.copyfileobj(f, sys.stdout)
else:
sys.stdout.write(log)
@print_log_on_assert @print_log_on_assert
def _clear_conf(sock, *, log=None): def _clear_conf(sock, *, log=None):
resp = http.put( resp = http.put(
@@ -769,7 +709,7 @@ def unit_pid():
def pytest_sessionfinish(): def pytest_sessionfinish():
if not option.restart and option.save_log: if not option.restart and option.save_log:
print(f'Path to unit.log:\n{Log.get_path()}\n') Log.print_path()
option.restart = True option.restart = True

View File

@@ -17,12 +17,10 @@ class TestApplicationProto(TestControl):
return time.mktime(time.strptime(date, template)) return time.mktime(time.strptime(date, template))
def findall(self, pattern, name='unit.log', flags=re.M): def findall(self, pattern, name='unit.log', flags=re.M):
with Log.open(name) as f: return re.findall(pattern, Log.read(name), flags)
return re.findall(pattern, f.read(), flags)
def search_in_log(self, pattern, name='unit.log', flags=re.M): def search_in_log(self, pattern, name='unit.log', flags=re.M):
with Log.open(name) as f: return re.search(pattern, Log.read(name), flags)
return re.search(pattern, f.read(), flags)
def wait_for_record(self, pattern, name='unit.log', wait=150, flags=re.M): def wait_for_record(self, pattern, name='unit.log', wait=150, flags=re.M):
with Log.open(name) as f: with Log.open(name) as f:

View File

@@ -1,23 +1,95 @@
import os
import re
import sys
from unit.option import option
UNIT_LOG = 'unit.log' UNIT_LOG = 'unit.log'
def print_log_on_assert(func):
def inner_function(*args, **kwargs):
try:
func(*args, **kwargs)
except AssertionError as exception:
Log.print_log(*args, **kwargs)
raise exception
return inner_function
class Log: class Log:
temp_dir = None
pos = {} pos = {}
@staticmethod
@print_log_on_assert
def check_alerts(log=None):
if log is None:
log = Log.read(encoding='utf-8')
found = False
alerts = re.findall(r'.+\[alert\].+', log)
if alerts:
found = True
if option.detailed:
print('\nAll alerts/sanitizer errors found in log:')
_ = [print(alert) for alert in alerts]
if option.skip_alerts:
for skip in option.skip_alerts:
alerts = [al for al in alerts if re.search(skip, al) is None]
assert not alerts, 'alert(s)'
if not option.skip_sanitizer:
sanitizer_errors = re.findall('.+Sanitizer.+', log)
assert not sanitizer_errors, 'sanitizer error(s)'
if found and option.detailed:
print('skipped.')
@staticmethod
def get_path(name=UNIT_LOG):
return f'{option.temp_dir}/{name}'
@staticmethod
def open(name=UNIT_LOG, encoding=None): def open(name=UNIT_LOG, encoding=None):
f = open(Log.get_path(name), 'r', encoding=encoding, errors='ignore') file = open(Log.get_path(name), 'r', encoding=encoding, errors='ignore')
f.seek(Log.pos.get(name, 0)) file.seek(Log.pos.get(name, 0))
return f return file
@staticmethod
def print_log(log=None):
Log.print_path()
if option.print_log:
os.set_blocking(sys.stdout.fileno(), True)
sys.stdout.flush()
if log is None:
log = Log.read(encoding='utf-8')
sys.stdout.write(log)
@staticmethod
def print_path():
print(f'Path to {UNIT_LOG}:\n{Log.get_path()}\n')
@staticmethod
def read(*args, **kwargs):
with Log.open(*args, **kwargs) as file:
return file.read()
@staticmethod
def set_pos(pos, name=UNIT_LOG): def set_pos(pos, name=UNIT_LOG):
Log.pos[name] = pos Log.pos[name] = pos
@staticmethod
def swap(name): def swap(name):
pos = Log.pos.get(UNIT_LOG, 0) pos = Log.pos.get(UNIT_LOG, 0)
Log.pos[UNIT_LOG] = Log.pos.get(name, 0) Log.pos[UNIT_LOG] = Log.pos.get(name, 0)
Log.pos[name] = pos Log.pos[name] = pos
def get_path(name=UNIT_LOG):
return f'{Log.temp_dir}/{name}'