Tests: added Python threading tests.

This commit is contained in:
Max Romanov
2020-11-05 00:05:02 +03:00
parent 8e37b1cbf5
commit f27953af61
4 changed files with 124 additions and 0 deletions

View File

@@ -401,3 +401,44 @@ Connection: close
assert (
self.wait_for_record(r'\(5\) Thread: 100') is not None
), 'last thread finished'
def test_asgi_application_threads(self):
self.load('threads')
assert 'success' in self.conf(
'4', 'applications/threads/threads'
), 'configure 4 threads'
socks = []
for i in range(4):
(_, sock) = self.get(
headers={
'Host': 'localhost',
'X-Delay': '2',
'Connection': 'close',
},
no_recv=True,
start=True,
)
socks.append(sock)
time.sleep(0.25) # required to avoid greedy request reading
threads = set()
for sock in socks:
resp = self.recvall(sock).decode('utf-8')
self.log_in(resp)
resp = self._resp_to_dict(resp)
assert resp['status'] == 200, 'status'
threads.add(resp['headers']['x-thread'])
sock.close()
assert len(socks) == len(threads), 'threads differs'