Tests: migrated to the pytest.

This commit is contained in:
Andrei Zeliankou
2020-09-16 21:31:15 +01:00
parent 77ecb6ab49
commit d5e9159340
55 changed files with 4717 additions and 6262 deletions

View File

@@ -2,17 +2,18 @@ import os
import subprocess
from unit.applications.proto import TestApplicationProto
from conftest import option
class TestApplicationGo(TestApplicationProto):
@classmethod
def setUpClass(cls, complete_check=True):
unit = super().setUpClass(complete_check=False)
def setup_class(cls, complete_check=True):
unit = super().setup_class(complete_check=False)
# check go module
go_app = TestApplicationGo()
go_app.testdir = unit.testdir
go_app.temp_dir = unit.temp_dir
proc = go_app.prepare_env('empty', 'app')
if proc and proc.returncode == 0:
cls.available['modules']['go'] = []
@@ -20,8 +21,8 @@ class TestApplicationGo(TestApplicationProto):
return unit if not complete_check else unit.complete()
def prepare_env(self, script, name, static=False):
if not os.path.exists(self.testdir + '/go'):
os.mkdir(self.testdir + '/go')
if not os.path.exists(self.temp_dir + '/go'):
os.mkdir(self.temp_dir + '/go')
env = os.environ.copy()
env['GOPATH'] = self.pardir + '/build/go'
@@ -35,16 +36,16 @@ class TestApplicationGo(TestApplicationProto):
'-ldflags',
'-extldflags "-static"',
'-o',
self.testdir + '/go/' + name,
self.current_dir + '/go/' + script + '/' + name + '.go',
self.temp_dir + '/go/' + name,
option.test_dir + '/go/' + script + '/' + name + '.go',
]
else:
args = [
'go',
'build',
'-o',
self.testdir + '/go/' + name,
self.current_dir + '/go/' + script + '/' + name + '.go',
self.temp_dir + '/go/' + name,
option.test_dir + '/go/' + script + '/' + name + '.go',
]
try:
@@ -59,8 +60,8 @@ class TestApplicationGo(TestApplicationProto):
def load(self, script, name='app', **kwargs):
static_build = False
wdir = self.current_dir + "/go/" + script
executable = self.testdir + "/go/" + name
wdir = option.test_dir + "/go/" + script
executable = self.temp_dir + "/go/" + name
if 'isolation' in kwargs and 'rootfs' in kwargs['isolation']:
wdir = "/go/"

View File

@@ -1,17 +1,19 @@
import glob
import os
import pytest
import shutil
import subprocess
from unit.applications.proto import TestApplicationProto
from conftest import option
class TestApplicationJava(TestApplicationProto):
def load(self, script, name='app', **kwargs):
app_path = self.testdir + '/java'
app_path = self.temp_dir + '/java'
web_inf_path = app_path + '/WEB-INF/'
classes_path = web_inf_path + 'classes/'
script_path = self.current_dir + '/java/' + script + '/'
script_path = option.test_dir + '/java/' + script + '/'
if not os.path.isdir(app_path):
os.makedirs(app_path)
@@ -54,7 +56,7 @@ class TestApplicationJava(TestApplicationProto):
)
if not ws_jars:
self.fail('websocket api jar not found.')
pytest.fail('websocket api jar not found.')
javac = [
'javac',
@@ -69,7 +71,7 @@ class TestApplicationJava(TestApplicationProto):
process.communicate()
except:
self.fail('Cann\'t run javac process.')
pytest.fail('Cann\'t run javac process.')
self._load_conf(
{

View File

@@ -3,12 +3,13 @@ import shutil
from urllib.parse import quote
from unit.applications.proto import TestApplicationProto
from conftest import option, public_dir
class TestApplicationNode(TestApplicationProto):
@classmethod
def setUpClass(cls, complete_check=True):
unit = super().setUpClass(complete_check=False)
def setup_class(cls, complete_check=True):
unit = super().setup_class(complete_check=False)
# check node module
@@ -21,17 +22,17 @@ class TestApplicationNode(TestApplicationProto):
# copy application
shutil.copytree(
self.current_dir + '/node/' + script, self.testdir + '/node'
option.test_dir + '/node/' + script, self.temp_dir + '/node'
)
# copy modules
shutil.copytree(
self.pardir + '/node/node_modules',
self.testdir + '/node/node_modules',
self.temp_dir + '/node/node_modules',
)
self.public_dir(self.testdir + '/node')
public_dir(self.temp_dir + '/node')
self._load_conf(
{
@@ -42,7 +43,7 @@ class TestApplicationNode(TestApplicationProto):
script: {
"type": "external",
"processes": {"spare": 0},
"working_directory": self.testdir + '/node',
"working_directory": self.temp_dir + '/node',
"executable": name,
}
},

View File

@@ -1,11 +1,12 @@
from unit.applications.proto import TestApplicationProto
from conftest import option
class TestApplicationPerl(TestApplicationProto):
application_type = "perl"
def load(self, script, name='psgi.pl', **kwargs):
script_path = self.current_dir + '/perl/' + script
script_path = option.test_dir + '/perl/' + script
self._load_conf(
{

View File

@@ -1,11 +1,12 @@
from unit.applications.proto import TestApplicationProto
from conftest import option
class TestApplicationPHP(TestApplicationProto):
application_type = "php"
def load(self, script, index='index.php', **kwargs):
script_path = self.current_dir + '/php/' + script
script_path = option.test_dir + '/php/' + script
self._load_conf(
{

View File

@@ -1,20 +1,23 @@
import os
import shutil
import pytest
from unit.applications.proto import TestApplicationProto
from conftest import option
class TestApplicationPython(TestApplicationProto):
application_type = "python"
def load(self, script, name=None, **kwargs):
print()
if name is None:
name = script
if script[0] == '/':
script_path = script
else:
script_path = self.current_dir + '/python/' + script
script_path = option.test_dir + '/python/' + script
if kwargs.get('isolation') and kwargs['isolation'].get('rootfs'):
rootfs = kwargs['isolation']['rootfs']
@@ -27,12 +30,17 @@ class TestApplicationPython(TestApplicationProto):
script_path = '/app/python/' + name
appication_type = self.get_appication_type()
if appication_type is None:
appication_type = self.application_type
self._load_conf(
{
"listeners": {"*:7080": {"pass": "applications/" + name}},
"applications": {
name: {
"type": self.application_type,
"type": appication_type,
"processes": {"spare": 0},
"path": script_path,
"working_directory": script_path,

View File

@@ -1,11 +1,12 @@
from unit.applications.proto import TestApplicationProto
from conftest import option
class TestApplicationRuby(TestApplicationProto):
application_type = "ruby"
def load(self, script, name='config.ru', **kwargs):
script_path = self.current_dir + '/ruby/' + script
script_path = option.test_dir + '/ruby/' + script
self._load_conf(
{

View File

@@ -1,7 +1,9 @@
import os
import re
import time
from unit.control import TestControl
from conftest import option
class TestApplicationProto(TestControl):
@@ -12,7 +14,7 @@ class TestApplicationProto(TestControl):
return time.mktime(time.strptime(date, template))
def search_in_log(self, pattern, name='unit.log'):
with open(self.testdir + '/' + name, 'r', errors='ignore') as f:
with open(self.temp_dir + '/' + name, 'r', errors='ignore') as f:
return re.search(pattern, f.read())
def wait_for_record(self, pattern, name='unit.log'):
@@ -26,6 +28,16 @@ class TestApplicationProto(TestControl):
return found
def get_appication_type(self):
current_test = (
os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]
)
if current_test in option.generated_tests:
return option.generated_tests[current_test]
return None
def _load_conf(self, conf, **kwargs):
if 'applications' in conf:
for app in conf['applications'].keys():
@@ -39,6 +51,4 @@ class TestApplicationProto(TestControl):
if 'isolation' in kwargs:
app_conf['isolation'] = kwargs['isolation']
self.assertIn(
'success', self.conf(conf), 'load application configuration'
)
assert 'success' in self.conf(conf), 'load application configuration'

View File

@@ -4,19 +4,20 @@ import ssl
import subprocess
from unit.applications.proto import TestApplicationProto
from conftest import option
class TestApplicationTLS(TestApplicationProto):
def __init__(self, test):
super().__init__(test)
def setup_method(self):
super().setup_method()
self.context = ssl.create_default_context()
self.context.check_hostname = False
self.context.verify_mode = ssl.CERT_NONE
@classmethod
def setUpClass(cls, complete_check=True):
unit = super().setUpClass(complete_check=False)
def setup_class(cls, complete_check=True):
unit = super().setup_class(complete_check=False)
# check tls module
@@ -45,9 +46,9 @@ class TestApplicationTLS(TestApplicationProto):
'-x509',
'-new',
'-subj', '/CN=' + name + '/',
'-config', self.testdir + '/openssl.conf',
'-out', self.testdir + '/' + name + '.crt',
'-keyout', self.testdir + '/' + name + '.key',
'-config', self.temp_dir + '/openssl.conf',
'-out', self.temp_dir + '/' + name + '.crt',
'-keyout', self.temp_dir + '/' + name + '.key',
],
stderr=subprocess.STDOUT,
)
@@ -59,8 +60,8 @@ class TestApplicationTLS(TestApplicationProto):
if key is None:
key = crt
key_path = self.testdir + '/' + key + '.key'
crt_path = self.testdir + '/' + crt + '.crt'
key_path = self.temp_dir + '/' + key + '.key'
crt_path = self.temp_dir + '/' + crt + '.crt'
with open(key_path, 'rb') as k, open(crt_path, 'rb') as c:
return self.conf(k.read() + c.read(), '/certificates/' + crt)
@@ -87,7 +88,7 @@ class TestApplicationTLS(TestApplicationProto):
return ssl.get_server_certificate(addr, ssl_version=ssl_version)
def openssl_conf(self):
conf_path = self.testdir + '/openssl.conf'
conf_path = self.temp_dir + '/openssl.conf'
if os.path.exists(conf_path):
return
@@ -105,7 +106,7 @@ distinguished_name = req_distinguished_name
if name is None:
name = script
script_path = self.current_dir + '/python/' + script
script_path = option.test_dir + '/python/' + script
self._load_conf(
{

View File

@@ -1,6 +1,7 @@
import base64
import hashlib
import itertools
import pytest
import random
import re
import select
@@ -21,9 +22,6 @@ class TestApplicationWebsocket(TestApplicationProto):
OP_PONG = 0x0A
CLOSE_CODES = [1000, 1001, 1002, 1003, 1007, 1008, 1009, 1010, 1011]
def __init__(self, preinit=False):
self.preinit = preinit
def key(self):
raw_key = bytes(random.getrandbits(8) for _ in range(16))
return base64.b64encode(raw_key).decode()
@@ -56,7 +54,7 @@ class TestApplicationWebsocket(TestApplicationProto):
while True:
rlist = select.select([sock], [], [], 60)[0]
if not rlist:
self.fail('Can\'t read response from server.')
pytest.fail('Can\'t read response from server.')
resp += sock.recv(4096).decode()
@@ -84,7 +82,7 @@ class TestApplicationWebsocket(TestApplicationProto):
# For all current cases if the "read_timeout" was changed
# than test do not expect to get a response from server.
if read_timeout == 60:
self.fail('Can\'t read response from server.')
pytest.fail('Can\'t read response from server.')
break
data += sock.recv(bytes - len(data))
@@ -130,19 +128,19 @@ class TestApplicationWebsocket(TestApplicationProto):
code, = struct.unpack('!H', data[:2])
reason = data[2:].decode('utf-8')
if not (code in self.CLOSE_CODES or 3000 <= code < 5000):
self.fail('Invalid status code')
pytest.fail('Invalid status code')
frame['code'] = code
frame['reason'] = reason
elif length == 0:
frame['code'] = 1005
frame['reason'] = ''
else:
self.fail('Close frame too short')
pytest.fail('Close frame too short')
frame['data'] = data
if frame['mask']:
self.fail('Received frame with mask')
pytest.fail('Received frame with mask')
return frame

View File

@@ -53,7 +53,7 @@ class TestControl(TestHTTP):
args = {
'url': url,
'sock_type': 'unix',
'addr': self.testdir + '/control.unit.sock',
'addr': self.temp_dir + '/control.unit.sock',
}
if conf is not None:

View File

@@ -13,7 +13,7 @@ from unit.applications.proto import TestApplicationProto
class TestFeatureIsolation(TestApplicationProto):
allns = ['pid', 'mnt', 'ipc', 'uts', 'cgroup', 'net']
def check(self, available, testdir):
def check(self, available, temp_dir):
test_conf = {"namespaces": {"credential": True}}
module = ''
@@ -45,7 +45,7 @@ class TestFeatureIsolation(TestApplicationProto):
if not module:
return
module.testdir = testdir
module.temp_dir = temp_dir
module.load(app)
resp = module.conf(test_conf, 'applications/' + app + '/isolation')

View File

@@ -2,12 +2,14 @@ import binascii
import io
import json
import os
import pytest
import re
import select
import socket
import time
from unit.main import TestUnit
from conftest import option
class TestHTTP(TestUnit):
@@ -56,7 +58,7 @@ class TestHTTP(TestUnit):
sock.connect(connect_args)
except ConnectionRefusedError:
sock.close()
self.fail('Client can\'t connect to the server.')
pytest.fail('Client can\'t connect to the server.')
else:
sock = kwargs['sock']
@@ -128,7 +130,7 @@ class TestHTTP(TestUnit):
return (resp, sock)
def log_out(self, log, encoding):
if TestUnit.detailed:
if option.detailed:
print('>>>')
log = self.log_truncate(log)
try:
@@ -137,7 +139,7 @@ class TestHTTP(TestUnit):
print(log)
def log_in(self, log):
if TestUnit.detailed:
if option.detailed:
print('<<<')
log = self.log_truncate(log)
try:
@@ -190,7 +192,7 @@ class TestHTTP(TestUnit):
# For all current cases if the "read_timeout" was changed
# than test do not expect to get a response from server.
if timeout == timeout_default:
self.fail('Can\'t read response from server.')
pytest.fail('Can\'t read response from server.')
break
try:
@@ -243,28 +245,28 @@ class TestHTTP(TestUnit):
chunks = raw_body.split(crlf)
if len(chunks) < 3:
self.fail('Invalid chunked body')
pytest.fail('Invalid chunked body')
if chunks.pop() != b'':
self.fail('No CRLF at the end of the body')
pytest.fail('No CRLF at the end of the body')
try:
last_size = int(chunks[-2], 16)
except:
self.fail('Invalid zero size chunk')
pytest.fail('Invalid zero size chunk')
if last_size != 0 or chunks[-1] != b'':
self.fail('Incomplete body')
pytest.fail('Incomplete body')
body = b''
while len(chunks) >= 2:
try:
size = int(chunks.pop(0), 16)
except:
self.fail('Invalid chunk size %s' % str(size))
pytest.fail('Invalid chunk size %s' % str(size))
if size == 0:
self.assertEqual(len(chunks), 1, 'last zero size')
assert len(chunks) == 1, 'last zero size'
break
temp_body = crlf.join(chunks)
@@ -280,8 +282,8 @@ class TestHTTP(TestUnit):
def _parse_json(self, resp):
headers = resp['headers']
self.assertIn('Content-Type', headers)
self.assertEqual(headers['Content-Type'], 'application/json')
assert 'Content-Type' in headers
assert headers['Content-Type'] == 'application/json'
resp['body'] = json.loads(resp['body'])
@@ -305,7 +307,7 @@ class TestHTTP(TestUnit):
sock.close()
self.assertTrue(ret, 'socket connected')
assert ret, 'socket connected'
def form_encode(self, fields):
is_multipart = False
@@ -345,7 +347,7 @@ class TestHTTP(TestUnit):
datatype = value['type']
if not isinstance(value['data'], io.IOBase):
self.fail('multipart encoding of file requires a stream.')
pytest.fail('multipart encoding of file requires a stream.')
data = value['data'].read()
@@ -353,7 +355,7 @@ class TestHTTP(TestUnit):
data = value
else:
self.fail('multipart requires a string or stream data')
pytest.fail('multipart requires a string or stream data')
body += (
"--%s\r\nContent-Disposition: form-data; name=\"%s\""

View File

@@ -1,8 +1,8 @@
import argparse
import atexit
import fcntl
import os
import platform
import pytest
import re
import shutil
import signal
@@ -11,80 +11,19 @@ import subprocess
import sys
import tempfile
import time
import unittest
from conftest import option, public_dir, waitforfiles, _check_alerts, _print_log
from multiprocessing import Process
class TestUnit(unittest.TestCase):
class TestUnit():
current_dir = os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir)
)
pardir = os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
is_su = os.geteuid() == 0
uid = os.geteuid()
gid = os.getegid()
architecture = platform.architecture()[0]
system = platform.system()
maxDiff = None
detailed = False
save_log = False
print_log = False
unsafe = False
def __init__(self, methodName='runTest'):
super().__init__(methodName)
if re.match(r'.*\/run\.py$', sys.argv[0]):
args, rest = TestUnit._parse_args()
TestUnit._set_args(args)
def run(self, result=None):
if not hasattr(self, 'application_type'):
return super().run(result)
# rerun test for each available module version
type = self.application_type
for module in self.prerequisites['modules']:
if module in self.available['modules']:
prereq_version = self.prerequisites['modules'][module]
available_versions = self.available['modules'][module]
if prereq_version == 'all':
for version in available_versions:
self.application_type = type + ' ' + version
super().run(result)
elif prereq_version == 'any':
self.application_type = type + ' ' + available_versions[0]
super().run(result)
else:
for version in available_versions:
if version.startswith(prereq_version):
self.application_type = type + ' ' + version
super().run(result)
@classmethod
def main(cls):
args, rest = TestUnit._parse_args()
for i, arg in enumerate(rest):
if arg[:5] == 'test_':
rest[i] = cls.__name__ + '.' + arg
sys.argv = sys.argv[:1] + rest
TestUnit._set_args(args)
unittest.main()
@classmethod
def setUpClass(cls, complete_check=True):
cls.available = {'modules': {}, 'features': {}}
def setup_class(cls, complete_check=True):
cls.available = option.available
unit = TestUnit()
unit._run()
@@ -92,7 +31,7 @@ class TestUnit(unittest.TestCase):
# read unit.log
for i in range(50):
with open(unit.testdir + '/unit.log', 'r') as f:
with open(unit.temp_dir + '/unit.log', 'r') as f:
log = f.read()
m = re.search('controller started', log)
@@ -102,7 +41,7 @@ class TestUnit(unittest.TestCase):
break
if m is None:
unit._print_log()
_print_log()
exit("Unit is writing log too long")
# discover available modules from unit.log
@@ -128,8 +67,7 @@ class TestUnit(unittest.TestCase):
missed.append(module)
if missed:
print('Unit has no ' + ', '.join(missed) + ' module(s)')
raise unittest.SkipTest()
pytest.skip('Unit has no ' + ', '.join(missed) + ' module(s)')
# check features
@@ -143,13 +81,12 @@ class TestUnit(unittest.TestCase):
missed.append(feature)
if missed:
print(', '.join(missed) + ' feature(s) not supported')
raise unittest.SkipTest()
pytest.skip(', '.join(missed) + ' feature(s) not supported')
def destroy():
unit.stop()
unit._check_alerts(log)
shutil.rmtree(unit.testdir)
_check_alerts(log)
shutil.rmtree(unit.temp_dir)
def complete():
destroy()
@@ -161,7 +98,7 @@ class TestUnit(unittest.TestCase):
unit.complete = complete
return unit
def setUp(self):
def setup_method(self):
self._run()
def _run(self):
@@ -171,82 +108,56 @@ class TestUnit(unittest.TestCase):
if not os.path.isfile(self.unitd):
exit("Could not find unit")
self.testdir = tempfile.mkdtemp(prefix='unit-test-')
self.temp_dir = tempfile.mkdtemp(prefix='unit-test-')
self.public_dir(self.testdir)
public_dir(self.temp_dir)
if oct(stat.S_IMODE(os.stat(build_dir).st_mode)) != '0o777':
self.public_dir(build_dir)
public_dir(build_dir)
os.mkdir(self.testdir + '/state')
os.mkdir(self.temp_dir + '/state')
with open(self.testdir + '/unit.log', 'w') as log:
with open(self.temp_dir + '/unit.log', 'w') as log:
self._p = subprocess.Popen(
[
self.unitd,
'--no-daemon',
'--modules', self.pardir + '/build',
'--state', self.testdir + '/state',
'--pid', self.testdir + '/unit.pid',
'--log', self.testdir + '/unit.log',
'--control', 'unix:' + self.testdir + '/control.unit.sock',
'--tmp', self.testdir,
'--state', self.temp_dir + '/state',
'--pid', self.temp_dir + '/unit.pid',
'--log', self.temp_dir + '/unit.log',
'--control', 'unix:' + self.temp_dir + '/control.unit.sock',
'--tmp', self.temp_dir,
],
stderr=log,
)
atexit.register(self.stop)
if not self.waitforfiles(self.testdir + '/control.unit.sock'):
self._print_log()
if not waitforfiles(self.temp_dir + '/control.unit.sock'):
_print_log()
exit("Could not start unit")
self._started = True
self.skip_alerts = [
r'read signalfd\(4\) failed',
r'sendmsg.+failed',
r'recvmsg.+failed',
]
self.skip_sanitizer = False
def tearDown(self):
def teardown_method(self):
self.stop()
# detect errors and failures for current test
def list2reason(exc_list):
if exc_list and exc_list[-1][0] is self:
return exc_list[-1][1]
if hasattr(self, '_outcome'):
result = self.defaultTestResult()
self._feedErrorsToResult(result, self._outcome.errors)
else:
result = getattr(
self, '_outcomeForDoCleanups', self._resultForDoCleanups
)
success = not list2reason(result.errors) and not list2reason(
result.failures
)
# check unit.log for alerts
unit_log = self.testdir + '/unit.log'
unit_log = self.temp_dir + '/unit.log'
with open(unit_log, 'r', encoding='utf-8', errors='ignore') as f:
self._check_alerts(f.read())
_check_alerts(f.read())
# remove unit.log
if not TestUnit.save_log and success:
shutil.rmtree(self.testdir)
if not option.save_log:
shutil.rmtree(self.temp_dir)
else:
self._print_log()
_print_log()
self.assertListEqual(self.stop_errors, [None, None], 'stop errors')
assert self.stop_errors == [None, None], 'stop errors'
def stop(self):
if not self._started:
@@ -301,121 +212,3 @@ class TestUnit(unittest.TestCase):
if fail:
return 'Fail to stop process'
def waitforfiles(self, *files):
for i in range(50):
wait = False
ret = False
for f in files:
if not os.path.exists(f):
wait = True
break
if wait:
time.sleep(0.1)
else:
ret = True
break
return ret
def public_dir(self, path):
os.chmod(path, 0o777)
for root, dirs, files in os.walk(path):
for d in dirs:
os.chmod(os.path.join(root, d), 0o777)
for f in files:
os.chmod(os.path.join(root, f), 0o777)
def _check_alerts(self, log):
found = False
alerts = re.findall('.+\[alert\].+', log)
if alerts:
print('All alerts/sanitizer errors found in log:')
[print(alert) for alert in alerts]
found = True
if self.skip_alerts:
for skip in self.skip_alerts:
alerts = [al for al in alerts if re.search(skip, al) is None]
if alerts:
self._print_log(log)
self.assertFalse(alerts, 'alert(s)')
if not self.skip_sanitizer:
sanitizer_errors = re.findall('.+Sanitizer.+', log)
if sanitizer_errors:
self._print_log(log)
self.assertFalse(sanitizer_errors, 'sanitizer error(s)')
if found:
print('skipped.')
@staticmethod
def _parse_args():
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument(
'-d',
'--detailed',
dest='detailed',
action='store_true',
help='Detailed output for tests',
)
parser.add_argument(
'-l',
'--log',
dest='save_log',
action='store_true',
help='Save unit.log after the test execution',
)
parser.add_argument(
'-r',
'--reprint_log',
dest='print_log',
action='store_true',
help='Print unit.log to stdout in case of errors',
)
parser.add_argument(
'-u',
'--unsafe',
dest='unsafe',
action='store_true',
help='Run unsafe tests',
)
return parser.parse_known_args()
@staticmethod
def _set_args(args):
TestUnit.detailed = args.detailed
TestUnit.save_log = args.save_log
TestUnit.print_log = args.print_log
TestUnit.unsafe = args.unsafe
# set stdout to non-blocking
if TestUnit.detailed or TestUnit.print_log:
fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, 0)
def _print_log(self, data=None):
path = self.testdir + '/unit.log'
print('Path to unit.log:\n' + path + '\n')
if TestUnit.print_log:
os.set_blocking(sys.stdout.fileno(), True)
sys.stdout.flush()
if data is None:
with open(path, 'r', encoding='utf-8', errors='ignore') as f:
shutil.copyfileobj(f, sys.stdout)
else:
sys.stdout.write(data)