Tests: more tests for processes.

This commit is contained in:
Andrey Zelenkov
2018-02-06 20:31:48 +03:00
parent 993f8f2d5c
commit 5c35d30cc8

View File

@@ -57,6 +57,41 @@ def application(env, start_response):
} }
}) })
def test_python_processes_access(self):
self.conf('1', '/applications/' + self.app_name + '/processes')
self.assertIn('error', self.conf_get('/applications/' + self.app_name +
'/processes/max'), 'max no access')
self.assertIn('error', self.conf_get('/applications/' + self.app_name +
'/processes/spare'), 'spare no access')
self.assertIn('error', self.conf_get('/applications/' + self.app_name +
'/processes/idle_timeout'), 'idle_timeout no access')
def test_python_processes_spare_gt_max(self):
self.assertIn('error', self.conf({
"spare": 2,
"max": 1,
"idle_timeout": 1
}, '/applications/' + self.app_name + '/processes'),
'spare greater than max')
def test_python_processes_max_zero(self):
self.assertIn('error', self.conf({
"spare": 0,
"max": 0,
"idle_timeout": 1
}, '/applications/' + self.app_name + '/processes'), 'max 0')
def test_python_processes_idle_timeout_zero(self):
self.conf({
"spare": 0,
"max": 2,
"idle_timeout": 0
}, '/applications/' + self.app_name + '/processes')
self.get()
self.assertEqual(len(self.pids_for_process()), 0, 'idle timeout 0')
def test_python_prefork(self): def test_python_prefork(self):
self.conf('2', '/applications/' + self.app_name + '/processes') self.conf('2', '/applications/' + self.app_name + '/processes')
@@ -76,6 +111,18 @@ def application(env, start_response):
self.stop_all() self.stop_all()
@unittest.expectedFailure
def test_python_prefork_same_processes(self):
self.conf('2', '/applications/' + self.app_name + '/processes')
pids = self.pids_for_process()
self.conf('4', '/applications/' + self.app_name + '/processes')
pids_new = self.pids_for_process()
self.assertTrue(pids.issubset(pids_new), 'prefork same processes')
def test_python_ondemand(self): def test_python_ondemand(self):
self.conf({ self.conf({
"spare": 0, "spare": 0,
@@ -153,6 +200,52 @@ def application(env, start_response):
self.stop_all() self.stop_all()
def test_python_idle_timeout(self):
self.conf({
"spare": 0,
"max": 6,
"idle_timeout": 2
}, '/applications/' + self.app_name + '/processes')
self.get()
pids = self.pids_for_process()
self.assertEqual(len(pids), 1, 'idle timeout 1')
time.sleep(1)
self.get()
time.sleep(1)
pids_new = self.pids_for_process()
self.assertEqual(len(pids_new), 1, 'idle timeout still 1')
self.assertSetEqual(self.pids_for_process(), pids,
'idle timeout still 1 same pid')
time.sleep(1)
self.assertEqual(len(self.pids_for_process()), 0, 'idle timed out')
def test_python_processes_connection_keepalive(self):
self.conf({
"spare": 0,
"max": 6,
"idle_timeout": 2
}, '/applications/' + self.app_name + '/processes')
(resp, sock) = self.get(headers={
'Host': 'localhost',
'Connection': 'keep-alive'
}, start=True)
self.assertEqual(len(self.pids_for_process()), 1,
'keepalive connection 1')
time.sleep(2)
self.assertEqual(len(self.pids_for_process()), 0, 'keepalive connection 0')
sock.close()
def stop_all(self): def stop_all(self):
self.conf({ self.conf({
"listeners": {}, "listeners": {},