Tests: pathlib used where appropriate
Also fixed various pylint errors and style issues.
This commit is contained in:
@@ -53,7 +53,7 @@ class ApplicationGo(ApplicationProto):
|
||||
|
||||
replace_path = f'{option.current_dir}/build/go/src/unit.nginx.org/go'
|
||||
|
||||
with open(f'{temp_dir}go.mod', 'w') as f:
|
||||
with open(f'{temp_dir}go.mod', 'w', encoding='utf-8') as f:
|
||||
f.write(
|
||||
f"""module test/app
|
||||
require unit.nginx.org/go v0.0.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
|
||||
from unit.applications.proto import ApplicationProto
|
||||
@@ -15,10 +15,9 @@ class ApplicationPHP(ApplicationProto):
|
||||
if kwargs.get('isolation') and kwargs['isolation'].get('rootfs'):
|
||||
rootfs = kwargs['isolation']['rootfs']
|
||||
|
||||
if not os.path.exists(f'{rootfs}/app/php/'):
|
||||
os.makedirs(f'{rootfs}/app/php/')
|
||||
Path(f'{rootfs}/app/php/').mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if not os.path.exists(f'{rootfs}/app/php/{script}'):
|
||||
if not Path(f'{rootfs}/app/php/{script}').exists():
|
||||
shutil.copytree(script_path, f'{rootfs}/app/php/{script}')
|
||||
|
||||
script_path = f'/app/php/{script}'
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import os
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from urllib.parse import quote
|
||||
|
||||
@@ -26,10 +26,9 @@ class ApplicationPython(ApplicationProto):
|
||||
if kwargs.get('isolation') and kwargs['isolation'].get('rootfs'):
|
||||
rootfs = kwargs['isolation']['rootfs']
|
||||
|
||||
if not os.path.exists(f'{rootfs}/app/python/'):
|
||||
os.makedirs(f'{rootfs}/app/python/')
|
||||
Path(f'{rootfs}/app/python/').mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if not os.path.exists(f'{rootfs}/app/python/{name}'):
|
||||
if not Path(f'{rootfs}/app/python/{name}').exists():
|
||||
shutil.copytree(script_path, f'{rootfs}/app/python/{name}')
|
||||
|
||||
script_path = f'/app/python/{name}'
|
||||
|
||||
@@ -79,7 +79,7 @@ subjectAltName = @alt_names
|
||||
|
||||
{a_names}'''
|
||||
|
||||
with open(conf_path, 'w') as f:
|
||||
with open(conf_path, 'w', encoding='utf-8') as f:
|
||||
f.write(
|
||||
f'''[ req ]
|
||||
default_bits = 2048
|
||||
|
||||
@@ -6,6 +6,7 @@ import select
|
||||
import struct
|
||||
|
||||
import pytest
|
||||
|
||||
from unit.applications.proto import ApplicationProto
|
||||
|
||||
GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
|
||||
@@ -69,7 +70,7 @@ class ApplicationWebsocket(ApplicationProto):
|
||||
return struct.pack('!H', code) + reason.encode('utf-8')
|
||||
|
||||
def frame_read(self, sock, read_timeout=60):
|
||||
def recv_bytes(sock, bytes):
|
||||
def recv_bytes(sock, bytes_len):
|
||||
data = b''
|
||||
while True:
|
||||
rlist = select.select([sock], [], [], read_timeout)[0]
|
||||
@@ -80,9 +81,9 @@ class ApplicationWebsocket(ApplicationProto):
|
||||
pytest.fail("Can't read response from server.")
|
||||
break
|
||||
|
||||
data += sock.recv(bytes - len(data))
|
||||
data += sock.recv(bytes_len - len(data))
|
||||
|
||||
if len(data) == bytes:
|
||||
if len(data) == bytes_len:
|
||||
break
|
||||
|
||||
return data
|
||||
@@ -206,18 +207,18 @@ class ApplicationWebsocket(ApplicationProto):
|
||||
end = frame_len
|
||||
pos = end
|
||||
|
||||
def message(self, sock, type, message, fragmention_size=None, **kwargs):
|
||||
def message(self, sock, mes_type, message, fragmention_size=None, **kwargs):
|
||||
message_len = len(message)
|
||||
|
||||
if fragmention_size is None:
|
||||
fragmention_size = message_len
|
||||
|
||||
if message_len <= fragmention_size:
|
||||
self.frame_write(sock, type, message, **kwargs)
|
||||
self.frame_write(sock, mes_type, message, **kwargs)
|
||||
return
|
||||
|
||||
pos = 0
|
||||
op_code = type
|
||||
op_code = mes_type
|
||||
while pos < message_len:
|
||||
end = min(pos + fragmention_size, message_len)
|
||||
fin = end == message_len
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import pytest
|
||||
|
||||
from unit.option import option
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import json
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
from unit.applications.lang.go import ApplicationGo
|
||||
from unit.applications.lang.java import ApplicationJava
|
||||
@@ -145,11 +145,12 @@ def check_isolation():
|
||||
|
||||
isolation = {'user': userns}
|
||||
|
||||
unp_clone_path = '/proc/sys/kernel/unprivileged_userns_clone'
|
||||
if os.path.exists(unp_clone_path):
|
||||
with open(unp_clone_path, 'r') as f:
|
||||
if str(f.read()).rstrip() == '1':
|
||||
isolation['unprivileged_userns_clone'] = True
|
||||
path_clone = Path('/proc/sys/kernel/unprivileged_userns_clone')
|
||||
if (
|
||||
path_clone.exists()
|
||||
and path_clone.read_text(encoding='utf-8').rstrip() == '1'
|
||||
):
|
||||
isolation['unprivileged_userns_clone'] = True
|
||||
|
||||
for ns in allns:
|
||||
ns_value = getns(ns)
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import os
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
|
||||
from unit.option import option
|
||||
|
||||
|
||||
def check_node():
|
||||
if not os.path.exists(f'{option.current_dir}/node/node_modules'):
|
||||
if not Path(f'{option.current_dir}/node/node_modules').exists():
|
||||
return False
|
||||
|
||||
try:
|
||||
|
||||
@@ -16,7 +16,7 @@ def args_handler(conf_func):
|
||||
elif argcount == 3:
|
||||
conf = args[0]
|
||||
|
||||
if isinstance(conf, dict) or isinstance(conf, list):
|
||||
if isinstance(conf, (dict, list)):
|
||||
conf = json.dumps(conf)
|
||||
|
||||
url = args[1] if len(args) == 2 else url_default
|
||||
|
||||
@@ -7,6 +7,7 @@ import select
|
||||
import socket
|
||||
|
||||
import pytest
|
||||
|
||||
from unit.option import option
|
||||
|
||||
|
||||
@@ -38,10 +39,7 @@ class HTTP1:
|
||||
if 'sock' not in kwargs:
|
||||
sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)
|
||||
|
||||
if (
|
||||
sock_type == sock_types['ipv4']
|
||||
or sock_type == sock_types['ipv6']
|
||||
):
|
||||
if sock_type in (sock_types['ipv4'], sock_types['ipv6']):
|
||||
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
|
||||
|
||||
if 'wrapper' in kwargs:
|
||||
@@ -202,7 +200,7 @@ class HTTP1:
|
||||
|
||||
data += part
|
||||
|
||||
if not len(part):
|
||||
if not part:
|
||||
break
|
||||
|
||||
return data
|
||||
@@ -263,7 +261,7 @@ class HTTP1:
|
||||
size = int(chunks.pop(0), 16)
|
||||
|
||||
except ValueError:
|
||||
pytest.fail(f'Invalid chunk size {size}')
|
||||
pytest.fail('Invalid chunk size')
|
||||
|
||||
if size == 0:
|
||||
assert len(chunks) == 1, 'last zero size'
|
||||
|
||||
@@ -30,16 +30,16 @@ class Status:
|
||||
for k in d1
|
||||
if k in d2
|
||||
}
|
||||
else:
|
||||
return d1 - d2
|
||||
|
||||
return d1 - d2
|
||||
|
||||
return find_diffs(Status.control.conf_get('/status'), Status._status)
|
||||
|
||||
def get(path='/'):
|
||||
path = path.split('/')[1:]
|
||||
path_lst = path.split('/')[1:]
|
||||
diff = Status.diff()
|
||||
|
||||
for p in path:
|
||||
diff = diff[p]
|
||||
for part in path_lst:
|
||||
diff = diff[part]
|
||||
|
||||
return diff
|
||||
|
||||
Reference in New Issue
Block a user