Tests: more fixtures.

Common methods from applications/proto.py converted to the fixtures.
sysctl check moved to the specific file where it is using.
Some options moved to the constructor to have early access.
This commit is contained in:
Andrei Zeliankou
2023-05-29 16:45:49 +01:00
parent f55818059c
commit 31ff94add9
24 changed files with 340 additions and 341 deletions

View File

@@ -2,7 +2,6 @@ import fcntl
import inspect
import json
import os
import platform
import re
import shutil
import signal
@@ -109,8 +108,6 @@ def pytest_configure(config):
os.path.join(os.path.dirname(__file__), os.pardir)
)
option.test_dir = f'{option.current_dir}/test'
option.architecture = platform.architecture()[0]
option.system = platform.system()
option.cache_dir = tempfile.mkdtemp(prefix='unit-test-cache-')
public_dir(option.cache_dir)
@@ -173,25 +170,15 @@ def pytest_sessionstart():
[unit['unitd'], '--version'], stderr=subprocess.STDOUT
).decode()
# read unit.log
for _ in range(50):
with open(Log.get_path(), 'r') as f:
log = f.read()
m = re.search('controller started', log)
if m is None:
time.sleep(0.1)
else:
break
if m is None:
Log.print_log(log)
if not _wait_for_record(r'controller started'):
Log.print_log()
exit("Unit is writing log too long")
# discover available modules from unit.log
for module in re.findall(r'module: ([a-zA-Z]+) (.*) ".*"$', log, re.M):
for module in re.findall(
r'module: ([a-zA-Z]+) (.*) ".*"$', Log.read(), re.M
):
versions = option.available['modules'].setdefault(module[0], [])
if module[1] not in versions:
versions.append(module[1])
@@ -489,6 +476,7 @@ def _clear_conf(sock, *, log=None):
for script in scripts:
assert 'success' in delete(f'/js_modules/{script}'), 'delete script'
def _clear_temp_dir():
temp_dir = unit_instance['temp_dir']
@@ -633,6 +621,19 @@ def _count_fds(pid):
return 0
def _wait_for_record(pattern, name='unit.log', wait=150, flags=re.M):
with Log.open(name) as file:
for _ in range(wait):
found = re.search(pattern, file.read(), flags)
if found is not None:
break
time.sleep(0.1)
return found
def run_process(target, *args):
global _processes
@@ -669,6 +670,61 @@ def find_proc(name, ps_output):
return re.findall(f'{unit_instance["pid"]}.*{name}', ps_output)
def pytest_sessionfinish():
if not option.restart and option.save_log:
Log.print_path()
option.restart = True
unit_stop()
public_dir(option.cache_dir)
shutil.rmtree(option.cache_dir)
if not option.save_log and os.path.isdir(option.temp_dir):
public_dir(option.temp_dir)
shutil.rmtree(option.temp_dir)
@pytest.fixture
def date_to_sec_epoch():
def _date_to_sec_epoch(date, template='%a, %d %b %Y %X %Z'):
return time.mktime(time.strptime(date, template))
return _date_to_sec_epoch
@pytest.fixture
def findall():
def _findall(pattern, name='unit.log', flags=re.M):
return re.findall(pattern, Log.read(name), flags)
return _findall
@pytest.fixture
def is_su():
return option.is_privileged
@pytest.fixture
def is_unsafe(request):
return request.config.getoption("--unsafe")
@pytest.fixture
def search_in_file():
def _search_in_file(pattern, name='unit.log', flags=re.M):
return re.search(pattern, Log.read(name), flags)
return _search_in_file
@pytest.fixture
def sec_epoch():
return time.mktime(time.gmtime())
@pytest.fixture()
def skip_alert():
def _skip(*alerts):
@@ -687,37 +743,21 @@ def skip_fds_check():
return _skip
@pytest.fixture()
def system():
return option.system
@pytest.fixture
def temp_dir():
return unit_instance['temp_dir']
@pytest.fixture
def is_unsafe(request):
return request.config.getoption("--unsafe")
@pytest.fixture
def is_su():
return os.geteuid() == 0
@pytest.fixture
def unit_pid():
return unit_instance['process'].pid
def pytest_sessionfinish():
if not option.restart and option.save_log:
Log.print_path()
option.restart = True
unit_stop()
public_dir(option.cache_dir)
shutil.rmtree(option.cache_dir)
if not option.save_log and os.path.isdir(option.temp_dir):
public_dir(option.temp_dir)
shutil.rmtree(option.temp_dir)
@pytest.fixture
def wait_for_record():
return _wait_for_record

View File

@@ -24,10 +24,7 @@ class TestAccessLog(TestApplicationPython):
'access_log',
), 'access_log format'
def wait_for_record(self, pattern, name='access.log'):
return super().wait_for_record(pattern, name)
def test_access_log_keepalive(self):
def test_access_log_keepalive(self, wait_for_record):
self.load('mirror')
assert self.get()['status'] == 200, 'init'
@@ -43,16 +40,18 @@ class TestAccessLog(TestApplicationPython):
)
assert (
self.wait_for_record(r'"POST / HTTP/1.1" 200 5') is not None
wait_for_record(r'"POST / HTTP/1.1" 200 5', 'access.log')
is not None
), 'keepalive 1'
_ = self.post(sock=sock, body='0123456789')
assert (
self.wait_for_record(r'"POST / HTTP/1.1" 200 10') is not None
wait_for_record(r'"POST / HTTP/1.1" 200 10', 'access.log')
is not None
), 'keepalive 2'
def test_access_log_pipeline(self):
def test_access_log_pipeline(self, wait_for_record):
self.load('empty')
self.http(
@@ -75,19 +74,25 @@ Connection: close
)
assert (
self.wait_for_record(r'"GET / HTTP/1.1" 200 0 "Referer-1" "-"')
wait_for_record(
r'"GET / HTTP/1.1" 200 0 "Referer-1" "-"', 'access.log'
)
is not None
), 'pipeline 1'
assert (
self.wait_for_record(r'"GET / HTTP/1.1" 200 0 "Referer-2" "-"')
wait_for_record(
r'"GET / HTTP/1.1" 200 0 "Referer-2" "-"', 'access.log'
)
is not None
), 'pipeline 2'
assert (
self.wait_for_record(r'"GET / HTTP/1.1" 200 0 "Referer-3" "-"')
wait_for_record(
r'"GET / HTTP/1.1" 200 0 "Referer-3" "-"', 'access.log'
)
is not None
), 'pipeline 3'
def test_access_log_ipv6(self):
def test_access_log_ipv6(self, wait_for_record):
self.load('empty')
assert 'success' in self.conf(
@@ -97,13 +102,13 @@ Connection: close
self.get(sock_type='ipv6')
assert (
self.wait_for_record(
r'::1 - - \[.+\] "GET / HTTP/1.1" 200 0 "-" "-"'
wait_for_record(
r'::1 - - \[.+\] "GET / HTTP/1.1" 200 0 "-" "-"', 'access.log'
)
is not None
), 'ipv6'
def test_access_log_unix(self, temp_dir):
def test_access_log_unix(self, temp_dir, wait_for_record):
self.load('empty')
addr = f'{temp_dir}/sock'
@@ -115,13 +120,13 @@ Connection: close
self.get(sock_type='unix', addr=addr)
assert (
self.wait_for_record(
r'unix: - - \[.+\] "GET / HTTP/1.1" 200 0 "-" "-"'
wait_for_record(
r'unix: - - \[.+\] "GET / HTTP/1.1" 200 0 "-" "-"', 'access.log'
)
is not None
), 'unix'
def test_access_log_referer(self):
def test_access_log_referer(self, wait_for_record):
self.load('empty')
self.get(
@@ -133,11 +138,13 @@ Connection: close
)
assert (
self.wait_for_record(r'"GET / HTTP/1.1" 200 0 "referer-value" "-"')
wait_for_record(
r'"GET / HTTP/1.1" 200 0 "referer-value" "-"', 'access.log'
)
is not None
), 'referer'
def test_access_log_user_agent(self):
def test_access_log_user_agent(self, wait_for_record):
self.load('empty')
self.get(
@@ -149,22 +156,23 @@ Connection: close
)
assert (
self.wait_for_record(
r'"GET / HTTP/1.1" 200 0 "-" "user-agent-value"'
wait_for_record(
r'"GET / HTTP/1.1" 200 0 "-" "user-agent-value"', 'access.log'
)
is not None
), 'user agent'
def test_access_log_http10(self):
def test_access_log_http10(self, wait_for_record):
self.load('empty')
self.get(http_10=True)
assert (
self.wait_for_record(r'"GET / HTTP/1.0" 200 0 "-" "-"') is not None
wait_for_record(r'"GET / HTTP/1.0" 200 0 "-" "-"', 'access.log')
is not None
), 'http 1.0'
def test_access_log_partial(self):
def test_access_log_partial(self, wait_for_record):
self.load('empty')
assert self.post()['status'] == 200, 'init'
@@ -174,10 +182,10 @@ Connection: close
time.sleep(1)
assert (
self.wait_for_record(r'"-" 400 0 "-" "-"') is not None
wait_for_record(r'"-" 400 0 "-" "-"', 'access.log') is not None
), 'partial'
def test_access_log_partial_2(self):
def test_access_log_partial_2(self, wait_for_record):
self.load('empty')
assert self.post()['status'] == 200, 'init'
@@ -185,10 +193,10 @@ Connection: close
self.http(b"""GET /\n""", raw=True)
assert (
self.wait_for_record(r'"-" 400 \d+ "-" "-"') is not None
wait_for_record(r'"-" 400 \d+ "-" "-"', 'access.log') is not None
), 'partial 2'
def test_access_log_partial_3(self):
def test_access_log_partial_3(self, wait_for_record):
self.load('empty')
assert self.post()['status'] == 200, 'init'
@@ -198,10 +206,10 @@ Connection: close
time.sleep(1)
assert (
self.wait_for_record(r'"-" 400 0 "-" "-"') is not None
wait_for_record(r'"-" 400 0 "-" "-"', 'access.log') is not None
), 'partial 3'
def test_access_log_partial_4(self):
def test_access_log_partial_4(self, wait_for_record):
self.load('empty')
assert self.post()['status'] == 200, 'init'
@@ -211,11 +219,11 @@ Connection: close
time.sleep(1)
assert (
self.wait_for_record(r'"-" 400 0 "-" "-"') is not None
wait_for_record(r'"-" 400 0 "-" "-"', 'access.log') is not None
), 'partial 4'
@pytest.mark.skip('not yet')
def test_access_log_partial_5(self):
def test_access_log_partial_5(self, wait_for_record):
self.load('empty')
assert self.post()['status'] == 200, 'init'
@@ -223,32 +231,32 @@ Connection: close
self.get(headers={'Connection': 'close'})
assert (
self.wait_for_record(r'"GET / HTTP/1.1" 400 \d+ "-" "-"')
wait_for_record(r'"GET / HTTP/1.1" 400 \d+ "-" "-"', 'access.log')
is not None
), 'partial 5'
def test_access_log_get_parameters(self):
def test_access_log_get_parameters(self, wait_for_record):
self.load('empty')
self.get(url='/?blah&var=val')
assert (
self.wait_for_record(
r'"GET /\?blah&var=val HTTP/1.1" 200 0 "-" "-"'
wait_for_record(
r'"GET /\?blah&var=val HTTP/1.1" 200 0 "-" "-"', 'access.log'
)
is not None
), 'get parameters'
def test_access_log_delete(self):
def test_access_log_delete(self, search_in_file):
self.load('empty')
assert 'success' in self.conf_delete('access_log')
self.get(url='/delete')
assert self.search_in_log(r'/delete', 'access.log') is None, 'delete'
assert search_in_file(r'/delete', 'access.log') is None, 'delete'
def test_access_log_change(self, temp_dir):
def test_access_log_change(self, temp_dir, wait_for_record):
self.load('empty')
self.get()
@@ -258,24 +266,24 @@ Connection: close
self.get()
assert (
self.wait_for_record(r'"GET / HTTP/1.1" 200 0 "-" "-"', 'new.log')
wait_for_record(r'"GET / HTTP/1.1" 200 0 "-" "-"', 'new.log')
is not None
), 'change'
def test_access_log_format(self):
def test_access_log_format(self, wait_for_record):
self.load('empty')
def check_format(format, expect, url='/'):
self.set_format(format)
assert self.get(url=url)['status'] == 200
assert self.wait_for_record(expect) is not None, 'found'
assert wait_for_record(expect, 'access.log') is not None, 'found'
format = 'BLAH\t0123456789'
check_format(format, format)
check_format('$uri $status $uri $status', '/ 200 / 200')
def test_access_log_variables(self):
def test_access_log_variables(self, wait_for_record):
self.load('mirror')
# $body_bytes_sent
@@ -284,7 +292,7 @@ Connection: close
body = '0123456789' * 50
self.post(url='/bbs', body=body, read_timeout=1)
assert (
self.wait_for_record(fr'^\/bbs {len(body)}$') is not None
wait_for_record(fr'^\/bbs {len(body)}$', 'access.log') is not None
), '$body_bytes_sent'
def test_access_log_incorrect(self, temp_dir, skip_alert):

View File

@@ -14,7 +14,7 @@ class TestASGIApplication(TestApplicationPython):
}
load_module = 'asgi'
def test_asgi_application_variables(self):
def test_asgi_application_variables(self, date_to_sec_epoch, sec_epoch):
self.load('variables')
body = 'Test body string.'
@@ -40,9 +40,7 @@ custom-header: BLAH
date = headers.pop('Date')
assert date[-4:] == ' GMT', 'date header timezone'
assert (
abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
), 'date header'
assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'
assert headers == {
'Connection': 'close',
@@ -382,7 +380,7 @@ Connection: close
assert self.get()['status'] == 503, 'loading error'
def test_asgi_application_threading(self):
def test_asgi_application_threading(self, wait_for_record):
"""wait_for_record() timeouts after 5s while every thread works at
least 3s. So without releasing GIL test should fail.
"""
@@ -393,7 +391,7 @@ Connection: close
self.get(no_recv=True)
assert (
self.wait_for_record(r'\(5\) Thread: 100', wait=50) is not None
wait_for_record(r'\(5\) Thread: 100', wait=50) is not None
), 'last thread finished'
def test_asgi_application_threads(self):

View File

@@ -98,27 +98,27 @@ class TestASGILifespan(TestApplicationPython):
self.assert_cookies('')
self.assert_cookies('app2_')
def test_asgi_lifespan_failed(self):
def test_asgi_lifespan_failed(self, wait_for_record):
self.load('lifespan/failed')
assert self.get()['status'] == 503
assert (
self.wait_for_record(r'\[error\].*Application startup failed')
wait_for_record(r'\[error\].*Application startup failed')
is not None
), 'error message'
assert self.wait_for_record(r'Exception blah') is not None, 'exception'
assert wait_for_record(r'Exception blah') is not None, 'exception'
def test_asgi_lifespan_error(self):
def test_asgi_lifespan_error(self, wait_for_record):
self.load('lifespan/error')
self.get()
assert self.wait_for_record(r'Exception blah') is not None, 'exception'
assert wait_for_record(r'Exception blah') is not None, 'exception'
def test_asgi_lifespan_error_auto(self):
def test_asgi_lifespan_error_auto(self, wait_for_record):
self.load('lifespan/error_auto')
self.get()
assert self.wait_for_record(r'AssertionError') is not None, 'assertion'
assert wait_for_record(r'AssertionError') is not None, 'assertion'

View File

@@ -5,7 +5,6 @@ import pytest
from packaging import version
from unit.applications.lang.python import TestApplicationPython
from unit.applications.websockets import TestApplicationWebsocket
from unit.option import option
class TestASGIWebsockets(TestApplicationPython):
@@ -1314,7 +1313,7 @@ class TestASGIWebsockets(TestApplicationPython):
self.ws.frame_write(sock, self.ws.OP_CLOSE, payload)
self.check_close(sock, 1002)
def test_asgi_websockets_9_1_1__9_6_6(self, is_unsafe):
def test_asgi_websockets_9_1_1__9_6_6(self, is_unsafe, system):
if not is_unsafe:
pytest.skip('unsafe, long run')
@@ -1371,7 +1370,7 @@ class TestASGIWebsockets(TestApplicationPython):
check_payload(op_binary, 8 * 2**20) # 9_2_5
check_payload(op_binary, 16 * 2**20) # 9_2_6
if option.system != 'Darwin' and option.system != 'FreeBSD':
if system not in ['Darwin', 'FreeBSD']:
check_message(op_text, 64) # 9_3_1
check_message(op_text, 256) # 9_3_2
check_message(op_text, 2**10) # 9_3_3

View File

@@ -2,7 +2,6 @@ import socket
import pytest
from unit.control import TestControl
from unit.option import option
class TestConfiguration(TestControl):
@@ -227,8 +226,8 @@ class TestConfiguration(TestControl):
{"*:7080": {"pass": "applications/app"}}, 'listeners'
), 'listeners no app'
def test_listeners_unix_abstract(self):
if option.system != 'Linux':
def test_listeners_unix_abstract(self, system):
if system != 'Linux':
assert 'error' in self.try_addr("unix:@sock"), 'abstract at'
pytest.skip('not yet')

View File

@@ -11,7 +11,7 @@ class TestGoApplication(TestApplicationGo):
def setup_method_fixture(self, skip_alert):
skip_alert(r'\[unit\] close\(\d+\) failed: Bad file descriptor')
def test_go_application_variables(self):
def test_go_application_variables(self, date_to_sec_epoch, sec_epoch):
self.load('variables')
body = 'Test body string.'
@@ -33,9 +33,7 @@ class TestGoApplication(TestApplicationGo):
date = headers.pop('Date')
assert date[-4:] == ' GMT', 'date header timezone'
assert (
abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
), 'date header'
assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'
assert headers == {
'Content-Length': str(len(body)),

View File

@@ -128,7 +128,9 @@ class TestJavaApplication(TestApplicationJava):
assert headers['X-Session-New'] == 'false', 'session resume'
assert session_id == headers['X-Session-Id'], 'session same id'
def test_java_application_session_active(self):
def test_java_application_session_active(
self, date_to_sec_epoch, sec_epoch
):
self.load('session_inactive')
resp = self.get(
@@ -144,10 +146,8 @@ class TestJavaApplication(TestApplicationJava):
assert resp['headers']['X-Session-Interval'] == '4', 'session interval'
assert (
abs(
self.date_to_sec_epoch(
resp['headers']['X-Session-Last-Access-Time']
)
- self.sec_epoch()
date_to_sec_epoch(resp['headers']['X-Session-Last-Access-Time'])
- sec_epoch
)
< 5
), 'session last access time'
@@ -943,7 +943,7 @@ class TestJavaApplication(TestApplicationJava):
), 'set date header'
assert headers['X-Get-Date'] == date, 'get date header'
def test_java_application_multipart(self, temp_dir):
def test_java_application_multipart(self, search_in_file, temp_dir):
self.load('multipart')
reldst = '/uploads'
@@ -979,7 +979,7 @@ class TestJavaApplication(TestApplicationJava):
assert resp['status'] == 200, 'multipart status'
assert re.search(r'sample\.txt created', resp['body']), 'multipart body'
assert (
self.search_in_log(
search_in_file(
r'^Data from sample file$', name=f'{reldst}/sample.txt'
)
is not None

View File

@@ -4,7 +4,6 @@ import time
import pytest
from unit.applications.lang.java import TestApplicationJava
from unit.applications.websockets import TestApplicationWebsocket
from unit.option import option
class TestJavaWebsockets(TestApplicationJava):
@@ -1241,7 +1240,7 @@ class TestJavaWebsockets(TestApplicationJava):
self.ws.frame_write(sock, self.ws.OP_CLOSE, payload)
self.check_close(sock, 1002)
def test_java_websockets_9_1_1__9_6_6(self, is_unsafe):
def test_java_websockets_9_1_1__9_6_6(self, is_unsafe, system):
if not is_unsafe:
pytest.skip('unsafe, long run')
@@ -1298,7 +1297,7 @@ class TestJavaWebsockets(TestApplicationJava):
check_payload(op_binary, 8 * 2**20) # 9_2_5
check_payload(op_binary, 16 * 2**20) # 9_2_6
if option.system != 'Darwin' and option.system != 'FreeBSD':
if system not in ['Darwin', 'FreeBSD']:
check_message(op_text, 64) # 9_3_1
check_message(op_text, 256) # 9_3_2
check_message(op_text, 2**10) # 9_3_3

View File

@@ -34,7 +34,7 @@ class TestNodeApplication(TestApplicationNode):
assert self.get()['status'] == 200, 'seq'
assert self.get()['status'] == 200, 'seq 2'
def test_node_application_variables(self):
def test_node_application_variables(self, date_to_sec_epoch, sec_epoch):
self.load('variables')
body = 'Test body string.'
@@ -56,9 +56,7 @@ class TestNodeApplication(TestApplicationNode):
date = headers.pop('Date')
assert date[-4:] == ' GMT', 'date header timezone'
assert (
abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
), 'date header'
assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'
raw_headers = headers.pop('Request-Raw-Headers')
assert re.search(

View File

@@ -4,7 +4,6 @@ import time
import pytest
from unit.applications.lang.node import TestApplicationNode
from unit.applications.websockets import TestApplicationWebsocket
from unit.option import option
class TestNodeWebsockets(TestApplicationNode):
@@ -1260,7 +1259,7 @@ class TestNodeWebsockets(TestApplicationNode):
self.ws.frame_write(sock, self.ws.OP_CLOSE, payload)
self.check_close(sock, 1002)
def test_node_websockets_9_1_1__9_6_6(self, is_unsafe):
def test_node_websockets_9_1_1__9_6_6(self, is_unsafe, system):
if not is_unsafe:
pytest.skip('unsafe, long run')
@@ -1317,7 +1316,7 @@ class TestNodeWebsockets(TestApplicationNode):
check_payload(op_binary, 8 * 2**20) # 9_2_5
check_payload(op_binary, 16 * 2**20) # 9_2_6
if option.system != 'Darwin' and option.system != 'FreeBSD':
if system not in ['Darwin', 'FreeBSD']:
check_message(op_text, 64) # 9_3_1
check_message(op_text, 256) # 9_3_2
check_message(op_text, 2**10) # 9_3_3

View File

@@ -7,7 +7,7 @@ from unit.applications.lang.perl import TestApplicationPerl
class TestPerlApplication(TestApplicationPerl):
prerequisites = {'modules': {'perl': 'all'}}
def test_perl_application(self):
def test_perl_application(self, date_to_sec_epoch, sec_epoch):
self.load('variables')
body = 'Test body string.'
@@ -32,9 +32,7 @@ class TestPerlApplication(TestApplicationPerl):
date = headers.pop('Date')
assert date[-4:] == ' GMT', 'date header timezone'
assert (
abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
), 'date header'
assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'
assert headers == {
'Connection': 'close',
@@ -128,13 +126,13 @@ class TestPerlApplication(TestApplicationPerl):
body = '0123456789'
assert self.post(body=body)['body'] == body, 'input copy'
def test_perl_application_errors_print(self):
def test_perl_application_errors_print(self, wait_for_record):
self.load('errors_print')
assert self.get()['body'] == '1', 'errors result'
assert (
self.wait_for_record(r'\[error\].+Error in application') is not None
wait_for_record(r'\[error\].+Error in application') is not None
), 'errors print'
def test_perl_application_header_equal_names(self):
@@ -223,19 +221,18 @@ class TestPerlApplication(TestApplicationPerl):
assert resp['body'] == body, 'keep-alive 2'
def test_perl_body_io_fake(self):
def test_perl_body_io_fake(self, wait_for_record):
self.load('body_io_fake')
assert self.get()['body'] == '21', 'body io fake'
assert (
self.wait_for_record(r'\[error\].+IOFake getline\(\) \$\/ is \d+')
wait_for_record(r'\[error\].+IOFake getline\(\) \$\/ is \d+')
is not None
), 'body io fake $/ value'
assert (
self.wait_for_record(r'\[error\].+IOFake close\(\) called')
is not None
wait_for_record(r'\[error\].+IOFake close\(\) called') is not None
), 'body io fake close'
def test_perl_delayed_response(self):

View File

@@ -53,7 +53,7 @@ opcache.preload_user = {option.user or getpass.getuser()}
'applications/opcache/options',
)
def test_php_application_variables(self):
def test_php_application_variables(self, date_to_sec_epoch, sec_epoch):
self.load('variables')
body = 'Test body string.'
@@ -79,9 +79,7 @@ opcache.preload_user = {option.user or getpass.getuser()}
date = headers.pop('Date')
assert date[-4:] == ' GMT', 'date header timezone'
assert (
abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
), 'date header'
assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'
if 'X-Powered-By' in headers:
headers.pop('X-Powered-By')
@@ -116,7 +114,7 @@ opcache.preload_user = {option.user or getpass.getuser()}
assert resp['status'] == 200, 'query string empty status'
assert resp['headers']['Query-String'] == '', 'query string empty'
def test_php_application_fastcgi_finish_request(self, unit_pid):
def test_php_application_fastcgi_finish_request(self, findall, unit_pid):
self.load('fastcgi_finish_request')
assert 'success' in self.conf(
@@ -128,11 +126,11 @@ opcache.preload_user = {option.user or getpass.getuser()}
os.kill(unit_pid, signal.SIGUSR1)
errs = self.findall(r'Error in fastcgi_finish_request')
errs = findall(r'Error in fastcgi_finish_request')
assert len(errs) == 0, 'no error'
def test_php_application_fastcgi_finish_request_2(self, unit_pid):
def test_php_application_fastcgi_finish_request_2(self, findall, unit_pid):
self.load('fastcgi_finish_request')
assert 'success' in self.conf(
@@ -146,7 +144,7 @@ opcache.preload_user = {option.user or getpass.getuser()}
os.kill(unit_pid, signal.SIGUSR1)
errs = self.findall(r'Error in fastcgi_finish_request')
errs = findall(r'Error in fastcgi_finish_request')
assert len(errs) == 0, 'no error'
@@ -556,7 +554,7 @@ opcache.preload_user = {option.user or getpass.getuser()}
r'012345', self.get()['body']
), 'disable_classes before'
def test_php_application_error_log(self):
def test_php_application_error_log(self, findall, wait_for_record):
self.load('error_log')
assert self.get()['status'] == 200, 'status'
@@ -567,9 +565,9 @@ opcache.preload_user = {option.user or getpass.getuser()}
pattern = r'\d{4}\/\d\d\/\d\d\s\d\d:.+\[notice\].+Error in application'
assert self.wait_for_record(pattern) is not None, 'errors print'
assert wait_for_record(pattern) is not None, 'errors print'
errs = self.findall(pattern)
errs = findall(pattern)
assert len(errs) == 2, 'error_log count'

View File

@@ -14,7 +14,7 @@ from unit.applications.lang.python import TestApplicationPython
class TestPythonApplication(TestApplicationPython):
prerequisites = {'modules': {'python': 'all'}}
def test_python_application_variables(self):
def test_python_application_variables(self, date_to_sec_epoch, sec_epoch):
self.load('variables')
body = 'Test body string.'
@@ -43,9 +43,7 @@ custom-header: BLAH
date = headers.pop('Date')
assert date[-4:] == ' GMT', 'date header timezone'
assert (
abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
), 'date header'
assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'
assert headers == {
'Connection': 'close',
@@ -175,7 +173,7 @@ custom-header: BLAH
'Transfer-Encoding' not in self.get()['headers']
), '204 header transfer encoding'
def test_python_application_ctx_iter_atexit(self):
def test_python_application_ctx_iter_atexit(self, wait_for_record):
self.load('ctx_iter_atexit')
resp = self.post(body='0123456789')
@@ -185,9 +183,7 @@ custom-header: BLAH
assert 'success' in self.conf({"listeners": {}, "applications": {}})
assert (
self.wait_for_record(r'RuntimeError') is not None
), 'ctx iter atexit'
assert wait_for_record(r'RuntimeError') is not None, 'ctx iter atexit'
def test_python_keepalive_body(self):
self.load('mirror')
@@ -297,14 +293,14 @@ custom-header: BLAH
assert resp == {}, 'reconfigure 2 keep-alive 3'
def test_python_atexit(self):
def test_python_atexit(self, wait_for_record):
self.load('atexit')
self.get()
assert 'success' in self.conf({"listeners": {}, "applications": {}})
assert self.wait_for_record(r'At exit called\.') is not None, 'atexit'
assert wait_for_record(r'At exit called\.') is not None, 'atexit'
def test_python_process_switch(self):
self.load('delayed', processes=2)
@@ -456,14 +452,13 @@ last line: 987654321
assert resp['body'] == body, 'input read length negative'
@pytest.mark.skip('not yet')
def test_python_application_errors_write(self):
def test_python_application_errors_write(self, wait_for_record):
self.load('errors_write')
self.get()
assert (
self.wait_for_record(r'\[error\].+Error in application\.')
is not None
wait_for_record(r'\[error\].+Error in application\.') is not None
), 'errors write'
def test_python_application_body_array(self):
@@ -495,29 +490,27 @@ last line: 987654321
assert self.get()['status'] == 503, 'loading error'
def test_python_application_close(self):
def test_python_application_close(self, wait_for_record):
self.load('close')
self.get()
assert self.wait_for_record(r'Close called\.') is not None, 'close'
assert wait_for_record(r'Close called\.') is not None, 'close'
def test_python_application_close_error(self):
def test_python_application_close_error(self, wait_for_record):
self.load('close_error')
self.get()
assert (
self.wait_for_record(r'Close called\.') is not None
), 'close error'
assert wait_for_record(r'Close called\.') is not None, 'close error'
def test_python_application_not_iterable(self):
def test_python_application_not_iterable(self, wait_for_record):
self.load('not_iterable')
self.get()
assert (
self.wait_for_record(
wait_for_record(
r'\[error\].+the application returned not an iterable object'
)
is not None
@@ -603,7 +596,7 @@ last line: 987654321
== 200
)
def test_python_application_threading(self):
def test_python_application_threading(self, wait_for_record):
"""wait_for_record() timeouts after 5s while every thread works at
least 3s. So without releasing GIL test should fail.
"""
@@ -614,10 +607,10 @@ last line: 987654321
self.get(no_recv=True)
assert (
self.wait_for_record(r'\(5\) Thread: 100', wait=50) is not None
wait_for_record(r'\(5\) Thread: 100', wait=50) is not None
), 'last thread finished'
def test_python_application_iter_exception(self):
def test_python_application_iter_exception(self, findall, wait_for_record):
self.load('iter_exception')
# Default request doesn't lead to the exception.
@@ -637,12 +630,11 @@ last line: 987654321
assert self.get()['status'] == 503, 'error'
assert self.wait_for_record(r'Traceback') is not None, 'traceback'
assert wait_for_record(r'Traceback') is not None, 'traceback'
assert (
self.wait_for_record(r"raise Exception\('first exception'\)")
is not None
wait_for_record(r"raise Exception\('first exception'\)") is not None
), 'first exception raise'
assert len(self.findall(r'Traceback')) == 1, 'traceback count 1'
assert len(findall(r'Traceback')) == 1, 'traceback count 1'
# Exception after start_response(), before first write().
@@ -658,10 +650,10 @@ last line: 987654321
), 'error 2'
assert (
self.wait_for_record(r"raise Exception\('second exception'\)")
wait_for_record(r"raise Exception\('second exception'\)")
is not None
), 'exception raise second'
assert len(self.findall(r'Traceback')) == 2, 'traceback count 2'
assert len(findall(r'Traceback')) == 2, 'traceback count 2'
# Exception after first write(), before first __next__().
@@ -675,10 +667,9 @@ last line: 987654321
)
assert (
self.wait_for_record(r"raise Exception\('third exception'\)")
is not None
wait_for_record(r"raise Exception\('third exception'\)") is not None
), 'exception raise third'
assert len(self.findall(r'Traceback')) == 3, 'traceback count 3'
assert len(findall(r'Traceback')) == 3, 'traceback count 3'
assert self.get(sock=sock) == {}, 'closed connection'
@@ -696,7 +687,7 @@ last line: 987654321
)
if resp:
assert resp[-5:] != '0\r\n\r\n', 'incomplete body'
assert len(self.findall(r'Traceback')) == 4, 'traceback count 4'
assert len(findall(r'Traceback')) == 4, 'traceback count 4'
# Exception in __next__().
@@ -710,10 +701,9 @@ last line: 987654321
)
assert (
self.wait_for_record(r"raise Exception\('next exception'\)")
is not None
wait_for_record(r"raise Exception\('next exception'\)") is not None
), 'exception raise next'
assert len(self.findall(r'Traceback')) == 5, 'traceback count 5'
assert len(findall(r'Traceback')) == 5, 'traceback count 5'
assert self.get(sock=sock) == {}, 'closed connection 2'
@@ -730,7 +720,7 @@ last line: 987654321
)
if resp:
assert resp[-5:] != '0\r\n\r\n', 'incomplete body 2'
assert len(self.findall(r'Traceback')) == 6, 'traceback count 6'
assert len(findall(r'Traceback')) == 6, 'traceback count 6'
# Exception before start_response() and in close().
@@ -746,10 +736,9 @@ last line: 987654321
), 'error'
assert (
self.wait_for_record(r"raise Exception\('close exception'\)")
is not None
wait_for_record(r"raise Exception\('close exception'\)") is not None
), 'exception raise close'
assert len(self.findall(r'Traceback')) == 8, 'traceback count 8'
assert len(findall(r'Traceback')) == 8, 'traceback count 8'
def test_python_user_group(self, is_su):
if not is_su:

View File

@@ -37,14 +37,11 @@ class TestRewrite(TestApplicationProto):
'routes',
)
def test_rewrite(self):
def test_rewrite(self, findall, wait_for_record):
assert self.get()['status'] == 200
assert (
self.wait_for_record(rf'\[notice\].*"routes/1" selected')
is not None
)
assert len(self.findall(rf'\[notice\].*URI rewritten to "/new"')) == 1
assert len(self.findall(rf'\[notice\].*URI rewritten')) == 1
assert wait_for_record(rf'\[notice\].*"routes/1" selected') is not None
assert len(findall(rf'\[notice\].*URI rewritten to "/new"')) == 1
assert len(findall(rf'\[notice\].*URI rewritten')) == 1
self.set_rewrite("", "")
assert self.get()['status'] == 200

View File

@@ -8,7 +8,7 @@ from unit.applications.lang.ruby import TestApplicationRuby
class TestRubyApplication(TestApplicationRuby):
prerequisites = {'modules': {'ruby': 'all'}}
def test_ruby_application(self):
def test_ruby_application(self, date_to_sec_epoch, sec_epoch):
self.load('variables')
body = 'Test body string.'
@@ -33,9 +33,7 @@ class TestRubyApplication(TestApplicationRuby):
date = headers.pop('Date')
assert date[-4:] == ' GMT', 'date header timezone'
assert (
abs(self.date_to_sec_epoch(date) - self.sec_epoch()) < 5
), 'date header'
assert abs(date_to_sec_epoch(date) - sec_epoch) < 5, 'date header'
assert headers == {
'Connection': 'close',
@@ -170,30 +168,30 @@ class TestRubyApplication(TestApplicationRuby):
assert self.get()['status'] == 500, 'syntax error'
def test_ruby_application_errors_puts(self):
def test_ruby_application_errors_puts(self, wait_for_record):
self.load('errors_puts')
assert self.get()['status'] == 200
assert (
self.wait_for_record(r'\[error\].+Error in application') is not None
wait_for_record(r'\[error\].+Error in application') is not None
), 'errors puts'
def test_ruby_application_errors_puts_int(self):
def test_ruby_application_errors_puts_int(self, wait_for_record):
self.load('errors_puts_int')
assert self.get()['status'] == 200
assert (
self.wait_for_record(r'\[error\].+1234567890') is not None
wait_for_record(r'\[error\].+1234567890') is not None
), 'errors puts int'
def test_ruby_application_errors_write(self):
def test_ruby_application_errors_write(self, wait_for_record):
self.load('errors_write')
assert self.get()['status'] == 200
assert (
self.wait_for_record(r'\[error\].+Error in application') is not None
wait_for_record(r'\[error\].+Error in application') is not None
), 'errors write'
def test_ruby_application_errors_write_to_s_custom(self):
@@ -201,15 +199,15 @@ class TestRubyApplication(TestApplicationRuby):
assert self.get()['status'] == 200, 'errors write to_s custom'
def test_ruby_application_errors_write_int(self):
def test_ruby_application_errors_write_int(self, wait_for_record):
self.load('errors_write_int')
assert self.get()['status'] == 200
assert (
self.wait_for_record(r'\[error\].+1234567890') is not None
wait_for_record(r'\[error\].+1234567890') is not None
), 'errors write int'
def test_ruby_application_at_exit(self):
def test_ruby_application_at_exit(self, wait_for_record):
self.load('at_exit')
assert self.get()['status'] == 200
@@ -217,7 +215,7 @@ class TestRubyApplication(TestApplicationRuby):
assert 'success' in self.conf({"listeners": {}, "applications": {}})
assert (
self.wait_for_record(r'\[error\].+At exit called\.') is not None
wait_for_record(r'\[error\].+At exit called\.') is not None
), 'at exit'
def test_ruby_application_encoding(self):
@@ -322,14 +320,13 @@ class TestRubyApplication(TestApplicationRuby):
assert self.post(body=body)['body'] == body, 'body large'
@pytest.mark.skip('not yet')
def test_ruby_application_body_each_error(self):
def test_ruby_application_body_each_error(self, wait_for_record):
self.load('body_each_error')
assert self.get()['status'] == 500, 'body each error status'
assert (
self.wait_for_record(r'\[error\].+Failed to run ruby script')
is not None
wait_for_record(r'\[error\].+Failed to run ruby script') is not None
), 'body each error'
def test_ruby_application_body_file(self):

View File

@@ -1,15 +1,25 @@
import re
import socket
import subprocess
import time
import pytest
from unit.applications.lang.python import TestApplicationPython
from unit.utils import sysctl
class TestSettings(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}}
def sysctl(self):
try:
out = subprocess.check_output(
['sysctl', '-a'], stderr=subprocess.STDOUT
).decode()
except FileNotFoundError:
pytest.skip('requires sysctl')
return out
def test_settings_large_header_buffer_size(self):
self.load('empty')
@@ -263,7 +273,7 @@ Connection: close
return data
sysctl_out = sysctl()
sysctl_out = self.sysctl()
values = re.findall(
r'net.core.[rw]mem_(?:max|default).*?(\d+)', sysctl_out
)
@@ -409,15 +419,15 @@ Connection: close
assert resp['status'] == 200, 'status 4'
assert resp['body'] == body, 'body 4'
def test_settings_log_route(self):
def test_settings_log_route(self, findall, search_in_file, wait_for_record):
def count_fallbacks():
return len(self.findall(r'"fallback" taken'))
return len(findall(r'"fallback" taken'))
def check_record(template):
assert self.search_in_log(template) is not None
assert search_in_file(template) is not None
def check_no_record(template):
assert self.search_in_log(template) is None
assert search_in_file(template) is None
def template_req_line(url):
return rf'\[notice\].*http request line "GET {url} HTTP/1\.1"'
@@ -430,8 +440,8 @@ Connection: close
def wait_for_request_log(status, uri, route):
assert self.get(url=uri)['status'] == status
assert self.wait_for_record(template_req_line(uri)) is not None
assert self.wait_for_record(template_selected(route)) is not None
assert wait_for_record(template_req_line(uri)) is not None
assert wait_for_record(template_selected(route)) is not None
# routes array
@@ -559,6 +569,6 @@ Connection: close
# total
assert len(self.findall(r'\[notice\].*http request line')) == 7
assert len(self.findall(r'\[notice\].*selected')) == 10
assert len(self.findall(r'\[info\].*discarded')) == 2
assert len(findall(r'\[notice\].*http request line')) == 7
assert len(findall(r'\[notice\].*selected')) == 10
assert len(findall(r'\[info\].*discarded')) == 2

View File

@@ -11,9 +11,6 @@ from unit.option import option
class TestTLS(TestApplicationTLS):
prerequisites = {'modules': {'python': 'any', 'openssl': 'any'}}
def openssl_date_to_sec_epoch(self, date):
return self.date_to_sec_epoch(date, '%b %d %X %Y %Z')
def add_tls(self, application='empty', cert='default', port=7080):
assert 'success' in self.conf(
{
@@ -254,8 +251,9 @@ basicConstraints = critical,CA:TRUE"""
self.conf_get('/certificates/ec/key') == 'ECDH'
), 'certificate key ec'
def test_tls_certificate_chain_options(self):
def test_tls_certificate_chain_options(self, date_to_sec_epoch, sec_epoch):
self.load('empty')
date_format = '%b %d %X %Y %Z'
self.certificate()
@@ -274,14 +272,14 @@ basicConstraints = critical,CA:TRUE"""
assert (
abs(
self.sec_epoch()
- self.openssl_date_to_sec_epoch(cert['validity']['since'])
sec_epoch
- date_to_sec_epoch(cert['validity']['since'], date_format)
)
< 60
), 'certificate validity since'
assert (
self.openssl_date_to_sec_epoch(cert['validity']['until'])
- self.openssl_date_to_sec_epoch(cert['validity']['since'])
date_to_sec_epoch(cert['validity']['until'], date_format)
- date_to_sec_epoch(cert['validity']['since'], date_format)
== 2592000
), 'certificate validity until'
@@ -583,7 +581,9 @@ basicConstraints = critical,CA:TRUE"""
'/certificates'
), 'remove all certificates'
def test_tls_application_respawn(self, skip_alert):
def test_tls_application_respawn(
self, findall, skip_alert, wait_for_record
):
self.load('mirror')
self.certificate()
@@ -602,13 +602,13 @@ basicConstraints = critical,CA:TRUE"""
read_timeout=1,
)
app_id = self.findall(r'(\d+)#\d+ "mirror" application started')[0]
app_id = findall(r'(\d+)#\d+ "mirror" application started')[0]
subprocess.check_output(['kill', '-9', app_id])
skip_alert(fr'process {app_id} exited on signal 9')
self.wait_for_record(
wait_for_record(
fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started'
)

View File

@@ -19,9 +19,6 @@ class TestTLSSNI(TestApplicationTLS):
}
)
def openssl_date_to_sec_epoch(self, date):
return self.date_to_sec_epoch(date, '%b %d %X %Y %Z')
def add_tls(self, cert='default'):
assert 'success' in self.conf(
{"pass": "routes", "tls": {"certificate": cert}},

View File

@@ -9,7 +9,9 @@ from unit.utils import waitforfiles
class TestUSR1(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}}
def test_usr1_access_log(self, temp_dir, unit_pid):
def test_usr1_access_log(
self, search_in_file, temp_dir, unit_pid, wait_for_record
):
self.load('empty')
log = 'access.log'
@@ -27,7 +29,7 @@ class TestUSR1(TestApplicationPython):
assert self.get()['status'] == 200
assert (
self.wait_for_record(r'"GET / HTTP/1.1" 200 0 "-" "-"', log_new)
wait_for_record(r'"GET / HTTP/1.1" 200 0 "-" "-"', log_new)
is not None
), 'rename new'
assert not os.path.isfile(log_path), 'rename old'
@@ -39,12 +41,14 @@ class TestUSR1(TestApplicationPython):
assert self.get(url='/usr1')['status'] == 200
assert (
self.wait_for_record(r'"GET /usr1 HTTP/1.1" 200 0 "-" "-"', log)
wait_for_record(r'"GET /usr1 HTTP/1.1" 200 0 "-" "-"', log)
is not None
), 'reopen 2'
assert self.search_in_log(r'/usr1', log_new) is None, 'rename new 2'
assert search_in_file(r'/usr1', log_new) is None, 'rename new 2'
def test_usr1_unit_log(self, temp_dir, unit_pid):
def test_usr1_unit_log(
self, search_in_file, temp_dir, unit_pid, wait_for_record
):
self.load('log_body')
log_new = 'new.log'
@@ -59,7 +63,7 @@ class TestUSR1(TestApplicationPython):
body = 'body_for_a_log_new\n'
assert self.post(body=body)['status'] == 200
assert self.wait_for_record(body, log_new) is not None, 'rename new'
assert wait_for_record(body, log_new) is not None, 'rename new'
assert not os.path.isfile(log_path), 'rename old'
os.kill(unit_pid, signal.SIGUSR1)
@@ -69,8 +73,8 @@ class TestUSR1(TestApplicationPython):
body = 'body_for_a_log_unit\n'
assert self.post(body=body)['status'] == 200
assert self.wait_for_record(body) is not None, 'rename new'
assert self.search_in_log(body, log_new) is None, 'rename new 2'
assert wait_for_record(body) is not None, 'rename new'
assert search_in_file(body, log_new) is None, 'rename new 2'
finally:
# merge two log files into unit.log to check alerts

View File

@@ -27,12 +27,6 @@ class TestVariables(TestApplicationProto):
'access_log',
), 'access_log format'
def wait_for_record(self, pattern, name='access.log'):
return super().wait_for_record(pattern, name)
def search_in_log(self, pattern, name='access.log'):
return super().search_in_log(pattern, name)
def test_variables_dollar(self):
assert 'success' in self.conf("301", 'routes/0/action/return')
@@ -49,7 +43,7 @@ class TestVariables(TestApplicationProto):
)
check_dollar('path$dollar${dollar}', 'path$$')
def test_variables_request_time(self):
def test_variables_request_time(self, wait_for_record):
self.set_format('$uri $request_time')
sock = self.http(b'', raw=True, no_recv=True)
@@ -57,7 +51,7 @@ class TestVariables(TestApplicationProto):
time.sleep(1)
assert self.get(url='/r_time_1', sock=sock)['status'] == 200
assert self.wait_for_record(r'\/r_time_1 0\.\d{3}') is not None
assert wait_for_record(r'\/r_time_1 0\.\d{3}', 'access.log') is not None
sock = self.http(
b"""G""",
@@ -76,67 +70,70 @@ Connection: close
sock=sock,
raw=True,
)
assert self.wait_for_record(r'\/r_time_2 [1-9]\.\d{3}') is not None
assert (
wait_for_record(r'\/r_time_2 [1-9]\.\d{3}', 'access.log')
is not None
)
def test_variables_method(self):
def test_variables_method(self, search_in_file, wait_for_record):
self.set_format('$method')
reg = r'^GET$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert self.get()['status'] == 200
assert self.wait_for_record(reg) is not None, 'method GET'
assert wait_for_record(reg, 'access.log') is not None, 'method GET'
reg = r'^POST$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert self.post()['status'] == 200
assert self.wait_for_record(reg) is not None, 'method POST'
assert wait_for_record(reg, 'access.log') is not None, 'method POST'
def test_variables_request_uri(self):
def test_variables_request_uri(self, search_in_file, wait_for_record):
self.set_format('$request_uri')
def check_request_uri(req_uri):
reg = fr'^{re.escape(req_uri)}$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert self.get(url=req_uri)['status'] == 200
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
check_request_uri('/3')
check_request_uri('/4*')
check_request_uri('/4%2A')
check_request_uri('/9?q#a')
def test_variables_uri(self):
def test_variables_uri(self, search_in_file, wait_for_record):
self.set_format('$uri')
def check_uri(uri, expect=None):
expect = uri if expect is None else expect
reg = fr'^{re.escape(expect)}$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert self.get(url=uri)['status'] == 200
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
check_uri('/3')
check_uri('/4*')
check_uri('/5%2A', '/5*')
check_uri('/9?q#a', '/9')
def test_variables_host(self):
def test_variables_host(self, search_in_file, wait_for_record):
self.set_format('$host')
def check_host(host, expect=None):
expect = host if expect is None else expect
reg = fr'^{re.escape(expect)}$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert (
self.get(headers={'Host': host, 'Connection': 'close'})[
'status'
]
== 200
)
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
check_host('localhost')
check_host('localhost1.', 'localhost1')
@@ -144,63 +141,67 @@ Connection: close
check_host('.localhost')
check_host('www.localhost')
def test_variables_remote_addr(self):
def test_variables_remote_addr(self, search_in_file, wait_for_record):
self.set_format('$remote_addr')
assert self.get()['status'] == 200
assert self.wait_for_record(r'^127\.0\.0\.1$') is not None
assert wait_for_record(r'^127\.0\.0\.1$', 'access.log') is not None
assert 'success' in self.conf(
{"[::1]:7080": {"pass": "routes"}}, 'listeners'
)
reg = r'^::1$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert self.get(sock_type='ipv6')['status'] == 200
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
def test_variables_time_local(self):
def test_variables_time_local(
self, date_to_sec_epoch, search_in_file, wait_for_record
):
self.set_format('$uri $time_local $uri')
assert self.search_in_log(r'/time_local') is None
assert search_in_file(r'/time_local', 'access.log') is None
assert self.get(url='/time_local')['status'] == 200
assert self.wait_for_record(r'/time_local') is not None, 'time log'
date = self.search_in_log(
assert (
wait_for_record(r'/time_local', 'access.log') is not None
), 'time log'
date = search_in_file(
r'^\/time_local (.*) \/time_local$', 'access.log'
)[1]
assert (
abs(
self.date_to_sec_epoch(date, '%d/%b/%Y:%X %z')
date_to_sec_epoch(date, '%d/%b/%Y:%X %z')
- time.mktime(time.localtime())
)
< 5
), '$time_local'
def test_variables_request_line(self):
def test_variables_request_line(self, search_in_file, wait_for_record):
self.set_format('$request_line')
reg = r'^GET \/r_line HTTP\/1\.1$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert self.get(url='/r_line')['status'] == 200
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
def test_variables_status(self):
def test_variables_status(self, search_in_file, wait_for_record):
self.set_format('$status')
assert 'success' in self.conf("418", 'routes/0/action/return')
reg = r'^418$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert self.get()['status'] == 418
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
def test_variables_header_referer(self):
def test_variables_header_referer(self, search_in_file, wait_for_record):
self.set_format('$method $header_referer')
def check_referer(referer):
reg = fr'^GET {re.escape(referer)}$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert (
self.get(
headers={
@@ -211,19 +212,19 @@ Connection: close
)['status']
== 200
)
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
check_referer('referer-value')
check_referer('')
check_referer('no')
def test_variables_header_user_agent(self):
def test_variables_header_user_agent(self, search_in_file, wait_for_record):
self.set_format('$method $header_user_agent')
def check_user_agent(user_agent):
reg = fr'^GET {re.escape(user_agent)}$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert (
self.get(
headers={
@@ -234,19 +235,19 @@ Connection: close
)['status']
== 200
)
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
check_user_agent('MSIE')
check_user_agent('')
check_user_agent('no')
def test_variables_many(self):
def test_variables_many(self, search_in_file, wait_for_record):
def check_vars(uri, expect):
reg = fr'^{re.escape(expect)}$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert self.get(url=uri)['status'] == 200
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
self.set_format('$uri$method')
check_vars('/1', '/1GET')
@@ -260,7 +261,7 @@ Connection: close
self.set_format('$method$method')
check_vars('/', 'GETGET')
def test_variables_dynamic(self):
def test_variables_dynamic(self, wait_for_record):
self.set_format('$header_foo$cookie_foo$arg_foo')
assert (
@@ -270,20 +271,20 @@ Connection: close
)['status']
== 200
)
assert self.wait_for_record(r'^blah$') is not None
assert wait_for_record(r'^blah$', 'access.log') is not None
def test_variables_dynamic_arguments(self):
def test_variables_dynamic_arguments(self, search_in_file, wait_for_record):
def check_arg(url, expect=None):
expect = url if expect is None else expect
reg = fr'^{re.escape(expect)}$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert self.get(url=url)['status'] == 200
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
def check_no_arg(url):
assert self.get(url=url)['status'] == 200
assert self.search_in_log(r'^0$') is None
assert search_in_file(r'^0$', 'access.log') is None
self.set_format('$arg_foo_bar')
check_arg('/?foo_bar=1', '1')
@@ -304,25 +305,25 @@ Connection: close
check_no_arg('/?f=0')
check_no_arg('/?f!~=0')
def test_variables_dynamic_headers(self):
def test_variables_dynamic_headers(self, search_in_file, wait_for_record):
def check_header(header, value):
reg = fr'^{value}$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert (
self.get(headers={header: value, 'Connection': 'close'})[
'status'
]
== 200
)
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
def check_no_header(header):
assert (
self.get(headers={header: '0', 'Connection': 'close'})['status']
== 200
)
assert self.search_in_log(r'^0$') is None
assert search_in_file(r'^0$', 'access.log') is None
self.set_format('$header_foo_bar')
check_header('foo-bar', '1')
@@ -336,7 +337,7 @@ Connection: close
check_no_header('foo_bar')
check_no_header('foobar')
def test_variables_dynamic_cookies(self):
def test_variables_dynamic_cookies(self, search_in_file, wait_for_record):
def check_no_cookie(cookie):
assert (
self.get(
@@ -348,12 +349,12 @@ Connection: close
)['status']
== 200
)
assert self.search_in_log(r'^0$') is None
assert search_in_file(r'^0$', 'access.log') is None
self.set_format('$cookie_foo_bar')
reg = r'^1$'
assert self.search_in_log(reg) is None
assert search_in_file(reg, 'access.log') is None
assert (
self.get(
headers={
@@ -364,7 +365,7 @@ Connection: close
)['status']
== 200
)
assert self.wait_for_record(reg) is not None
assert wait_for_record(reg, 'access.log') is not None
check_no_cookie('fOo_bar=0')
check_no_cookie('foo_bar=')

View File

@@ -10,30 +10,6 @@ from unit.option import option
class TestApplicationProto(TestControl):
application_type = None
def sec_epoch(self):
return time.mktime(time.gmtime())
def date_to_sec_epoch(self, date, template='%a, %d %b %Y %X %Z'):
return time.mktime(time.strptime(date, template))
def findall(self, pattern, name='unit.log', flags=re.M):
return re.findall(pattern, Log.read(name), flags)
def search_in_log(self, pattern, name='unit.log', flags=re.M):
return re.search(pattern, Log.read(name), flags)
def wait_for_record(self, pattern, name='unit.log', wait=150, flags=re.M):
with Log.open(name) as f:
for _ in range(wait):
found = re.search(pattern, f.read(), flags)
if found is not None:
break
time.sleep(0.1)
return found
def get_application_type(self):
current_test = (
os.environ.get('PYTEST_CURRENT_TEST').split(':')[-1].split(' ')[0]

View File

@@ -1,7 +1,13 @@
import os
import platform
class Options:
_options = {
'architecture': platform.architecture()[0],
'is_privileged': os.geteuid() == 0,
'skip_alerts': [],
'skip_sanitizer': False,
'system': platform.system()
}
def __setattr__(self, name, value):

View File

@@ -90,17 +90,6 @@ def findmnt():
return out
def sysctl():
try:
out = subprocess.check_output(
['sysctl', '-a'], stderr=subprocess.STDOUT
).decode()
except FileNotFoundError:
pytest.skip('requires sysctl')
return out
def waitformount(template, timeout=50):
for _ in range(timeout):
if findmnt().find(template) != -1: