Tests: style.

This commit is contained in:
Andrei Zeliankou
2021-04-05 14:03:05 +01:00
parent 46d8567dd7
commit 6c97a1a069
67 changed files with 689 additions and 614 deletions

View File

@@ -21,10 +21,14 @@ class TestApplicationTLS(TestApplicationProto):
'req',
'-x509',
'-new',
'-subj', '/CN=' + name + '/',
'-config', option.temp_dir + '/openssl.conf',
'-out', option.temp_dir + '/' + name + '.crt',
'-keyout', option.temp_dir + '/' + name + '.key',
'-subj',
'/CN=' + name + '/',
'-config',
option.temp_dir + '/openssl.conf',
'-out',
option.temp_dir + '/' + name + '.crt',
'-keyout',
option.temp_dir + '/' + name + '.key',
],
stderr=subprocess.STDOUT,
)
@@ -75,12 +79,14 @@ class TestApplicationTLS(TestApplicationProto):
a_names += "DNS.%d = %s\n" % (i, k)
# Generates section for sign request extension
a_sec = """req_extensions = myca_req_extensions
a_sec = """req_extensions = myca_req_extensions
[ myca_req_extensions ]
subjectAltName = @alt_names
{a_names}""".format(a_names=a_names)
{a_names}""".format(
a_names=a_names
)
with open(conf_path, 'w') as f:
f.write(
@@ -90,7 +96,9 @@ encrypt_key = no
distinguished_name = req_distinguished_name
{a_sec}
[ req_distinguished_name ]""".format(a_sec=a_sec if alt_names else "")
[ req_distinguished_name ]""".format(
a_sec=a_sec if alt_names else ""
)
)
def load(self, script, name=None):

View File

@@ -43,11 +43,7 @@ class TestApplicationWebsocket(TestApplicationProto):
'Sec-WebSocket-Version': 13,
}
_, sock = self.get(
headers=headers,
no_recv=True,
start=True,
)
_, sock = self.get(headers=headers, no_recv=True, start=True,)
resp = ''
while True:
@@ -57,7 +53,7 @@ class TestApplicationWebsocket(TestApplicationProto):
resp += sock.recv(4096).decode()
if (resp.startswith('HTTP/') and '\r\n\r\n' in resp):
if resp.startswith('HTTP/') and '\r\n\r\n' in resp:
resp = self._resp_to_dict(resp)
break
@@ -90,8 +86,8 @@ class TestApplicationWebsocket(TestApplicationProto):
frame = {}
head1, = struct.unpack('!B', recv_bytes(sock, 1))
head2, = struct.unpack('!B', recv_bytes(sock, 1))
(head1,) = struct.unpack('!B', recv_bytes(sock, 1))
(head2,) = struct.unpack('!B', recv_bytes(sock, 1))
frame['fin'] = bool(head1 & 0b10000000)
frame['rsv1'] = bool(head1 & 0b01000000)
@@ -103,10 +99,10 @@ class TestApplicationWebsocket(TestApplicationProto):
length = head2 & 0b01111111
if length == 126:
data = recv_bytes(sock, 2)
length, = struct.unpack('!H', data)
(length,) = struct.unpack('!H', data)
elif length == 127:
data = recv_bytes(sock, 8)
length, = struct.unpack('!Q', data)
(length,) = struct.unpack('!Q', data)
if frame['mask']:
mask_bits = recv_bytes(sock, 4)
@@ -121,7 +117,7 @@ class TestApplicationWebsocket(TestApplicationProto):
if frame['opcode'] == self.OP_CLOSE:
if length >= 2:
code, = struct.unpack('!H', data[:2])
(code,) = struct.unpack('!H', data[:2])
reason = data[2:].decode('utf-8')
if not (code in self.CLOSE_CODES or 3000 <= code < 5000):
pytest.fail('Invalid status code')

View File

@@ -12,6 +12,7 @@ from unit.utils import getns
allns = ['pid', 'mnt', 'ipc', 'uts', 'cgroup', 'net']
http = TestHTTP()
def check_isolation():
test_conf = {"namespaces": {"credential": True}}
available = option.available
@@ -117,8 +118,7 @@ def check_isolation():
"body_empty": {
"type": "perl",
"processes": {"spare": 0},
"working_directory": option.test_dir
+ "/perl/body_empty",
"working_directory": option.test_dir + "/perl/body_empty",
"script": option.test_dir + "/perl/body_empty/psgi.pl",
"isolation": {"namespaces": {"credential": True}},
}

View File

@@ -10,15 +10,16 @@ import pytest
from unit.option import option
class TestHTTP():
class TestHTTP:
def http(self, start_str, **kwargs):
sock_type = kwargs.get('sock_type', 'ipv4')
port = kwargs.get('port', 7080)
url = kwargs.get('url', '/')
http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
headers = kwargs.get('headers',
{'Host': 'localhost', 'Connection': 'close'})
headers = kwargs.get(
'headers', {'Host': 'localhost', 'Connection': 'close'}
)
body = kwargs.get('body', b'')
crlf = '\r\n'
@@ -305,8 +306,9 @@ class TestHTTP():
return body, content_type
def form_url_encode(self, fields):
data = "&".join("%s=%s" % (name, value)
for name, value in fields.items()).encode()
data = "&".join(
"%s=%s" % (name, value) for name, value in fields.items()
).encode()
return data, 'application/x-www-form-urlencoded'
def multipart_encode(self, fields):
@@ -326,7 +328,9 @@ class TestHTTP():
datatype = value['type']
if not isinstance(value['data'], io.IOBase):
pytest.fail('multipart encoding of file requires a stream.')
pytest.fail(
'multipart encoding of file requires a stream.'
)
data = value['data'].read()
@@ -336,9 +340,10 @@ class TestHTTP():
else:
pytest.fail('multipart requires a string or stream data')
body += (
"--%s\r\nContent-Disposition: form-data; name=\"%s\""
) % (boundary, field)
body += ("--%s\r\nContent-Disposition: form-data; name=\"%s\"") % (
boundary,
field,
)
if filename != '':
body += "; filename=\"%s\"" % filename

View File

@@ -1,4 +1,4 @@
class Options():
class Options:
_options = {
'skip_alerts': [],
'skip_sanitizer': False,
@@ -13,4 +13,5 @@ class Options():
raise AttributeError
option = Options()