Tests: Python exception tests.

This commit is contained in:
Andrei Zeliankou
2019-11-14 18:46:28 +03:00
parent c2976fb337
commit 1072c94829
2 changed files with 202 additions and 0 deletions

View File

@@ -0,0 +1,45 @@
class application:
def __init__(self, environ, start_response):
self.environ = environ
self.start = start_response
self.next = self.__next__
def __iter__(self):
self.__i = 0
self._skip_level = int(self.environ.get('HTTP_X_SKIP', 0))
self._not_skip_close = int(self.environ.get('HTTP_X_NOT_SKIP_CLOSE', 0))
self._is_chunked = self.environ.get('HTTP_X_CHUNKED')
headers = [(('Content-Length', '10'))]
if self._is_chunked is not None:
headers = []
if self._skip_level < 1:
raise Exception('first exception')
write = self.start('200', headers)
if self._skip_level < 2:
raise Exception('second exception')
write(b'XXXXX')
if self._skip_level < 3:
raise Exception('third exception')
return self
def __next__(self):
if self._skip_level < 4:
raise Exception('next exception')
self.__i += 1
if self.__i > 2:
raise StopIteration
return b'X'
def close(self):
if self._not_skip_close == 1:
raise Exception('close exception')

View File

@@ -1,3 +1,4 @@
import re
import time
import unittest
from unit.applications.lang.python import TestApplicationPython
@@ -6,6 +7,10 @@ from unit.applications.lang.python import TestApplicationPython
class TestPythonApplication(TestApplicationPython):
prerequisites = {'modules': ['python']}
def findall(self, pattern):
with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
return re.findall(pattern, f.read())
def test_python_application_variables(self):
self.load('variables')
@@ -521,5 +526,157 @@ Connection: close
self.wait_for_record(r'\(5\) Thread: 100'), 'last thread finished'
)
def test_python_application_iter_exception(self):
self.load('iter_exception')
# Default request doesn't lead to the exception.
resp = self.get(
headers={
'Host': 'localhost',
'X-Skip': '9',
'X-Chunked': '1',
'Connection': 'close',
}
)
self.assertEqual(resp['status'], 200, 'status')
self.assertEqual(resp['body'][-5:], '0\r\n\r\n', 'body')
# Exception before start_response().
self.assertEqual(self.get()['status'], 503, 'error')
self.assertIsNotNone(self.wait_for_record(r'Traceback'), 'traceback')
self.assertIsNotNone(
self.wait_for_record(r'raise Exception\(\'first exception\'\)'),
'first exception raise',
)
self.assertEqual(
len(self.findall(r'Traceback')), 1, 'traceback count 1'
)
# Exception after start_response(), before first write().
self.assertEqual(
self.get(
headers={
'Host': 'localhost',
'X-Skip': '1',
'Connection': 'close',
}
)['status'],
503,
'error 2',
)
self.assertIsNotNone(
self.wait_for_record(r'raise Exception\(\'second exception\'\)'),
'exception raise second',
)
self.assertEqual(
len(self.findall(r'Traceback')), 2, 'traceback count 2'
)
# Exception after first write(), before first __next__().
_, sock = self.get(
headers={
'Host': 'localhost',
'X-Skip': '2',
'Connection': 'keep-alive',
},
start=True,
)
self.assertIsNotNone(
self.wait_for_record(r'raise Exception\(\'third exception\'\)'),
'exception raise third',
)
self.assertEqual(
len(self.findall(r'Traceback')), 3, 'traceback count 3'
)
self.assertDictEqual(self.get(sock=sock), {}, 'closed connection')
# Exception after first write(), before first __next__(),
# chunked (incomplete body).
resp = self.get(
headers={
'Host': 'localhost',
'X-Skip': '2',
'X-Chunked': '1',
'Connection': 'close',
}
)
if 'body' in resp:
self.assertNotEqual(
resp['body'][-5:], '0\r\n\r\n', 'incomplete body'
)
self.assertEqual(
len(self.findall(r'Traceback')), 4, 'traceback count 4'
)
# Exception in __next__().
_, sock = self.get(
headers={
'Host': 'localhost',
'X-Skip': '3',
'Connection': 'keep-alive',
},
start=True,
)
self.assertIsNotNone(
self.wait_for_record(r'raise Exception\(\'next exception\'\)'),
'exception raise next',
)
self.assertEqual(
len(self.findall(r'Traceback')), 5, 'traceback count 5'
)
self.assertDictEqual(self.get(sock=sock), {}, 'closed connection 2')
# Exception in __next__(), chunked (incomplete body).
resp = self.get(
headers={
'Host': 'localhost',
'X-Skip': '3',
'X-Chunked': '1',
'Connection': 'close',
}
)
if 'body' in resp:
self.assertNotEqual(
resp['body'][-5:], '0\r\n\r\n', 'incomplete body 2'
)
self.assertEqual(
len(self.findall(r'Traceback')), 6, 'traceback count 6'
)
# Exception before start_response() and in close().
self.assertEqual(
self.get(
headers={
'Host': 'localhost',
'X-Not-Skip-Close': '1',
'Connection': 'close',
}
)['status'],
503,
'error',
)
self.assertIsNotNone(
self.wait_for_record(r'raise Exception\(\'close exception\'\)'),
'exception raise close',
)
self.assertEqual(
len(self.findall(r'Traceback')), 8, 'traceback count 8'
)
if __name__ == '__main__':
TestPythonApplication.main()