Tests: fixed test_respawn.py to act upon test processes.
Running `test_respawn_` test cases on a machine with Unit daemon in background would fail tests because `ps ax` was used without filtering out other unit instances. This patch also prevents from tests killing other Unit processes not related to tests.
This commit is contained in:
@@ -443,6 +443,10 @@ def is_unsafe(request):
|
|||||||
def is_su(request):
|
def is_su(request):
|
||||||
return os.geteuid() == 0
|
return os.geteuid() == 0
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def unit_pid(request):
|
||||||
|
return unit_instance['process'].pid
|
||||||
|
|
||||||
def pytest_sessionfinish(session):
|
def pytest_sessionfinish(session):
|
||||||
unit_stop()
|
unit_stop()
|
||||||
shutil.rmtree(option.cache_dir)
|
shutil.rmtree(option.cache_dir)
|
||||||
|
|||||||
@@ -21,17 +21,17 @@ class TestRespawn(TestApplicationPython):
|
|||||||
'1', 'applications/' + self.app_name + '/processes'
|
'1', 'applications/' + self.app_name + '/processes'
|
||||||
)
|
)
|
||||||
|
|
||||||
def pid_by_name(self, name):
|
def pid_by_name(self, name, ppid):
|
||||||
output = subprocess.check_output(['ps', 'ax']).decode()
|
output = subprocess.check_output(['ps', 'ax', '-O', 'ppid']).decode()
|
||||||
m = re.search(r'\s*(\d+).*' + name, output)
|
m = re.search(r'\s*(\d+)\s*' + str(ppid) + r'.*' + name, output)
|
||||||
return m if m is None else m.group(1)
|
return None if m is None else m.group(1)
|
||||||
|
|
||||||
def kill_pids(self, *pids):
|
def kill_pids(self, *pids):
|
||||||
subprocess.call(['kill', '-9'] + list(pids))
|
subprocess.call(['kill', '-9'] + list(pids))
|
||||||
|
|
||||||
def wait_for_process(self, process):
|
def wait_for_process(self, process, unit_pid):
|
||||||
for i in range(50):
|
for i in range(50):
|
||||||
found = self.pid_by_name(process)
|
found = self.pid_by_name(process, unit_pid)
|
||||||
|
|
||||||
if found is not None:
|
if found is not None:
|
||||||
break
|
break
|
||||||
@@ -40,7 +40,10 @@ class TestRespawn(TestApplicationPython):
|
|||||||
|
|
||||||
return found
|
return found
|
||||||
|
|
||||||
def smoke_test(self):
|
def find_proc(self, name, ppid, ps_output):
|
||||||
|
return re.findall(str(ppid) + r'.*' + name, ps_output)
|
||||||
|
|
||||||
|
def smoke_test(self, unit_pid):
|
||||||
for _ in range(5):
|
for _ in range(5):
|
||||||
assert 'success' in self.conf(
|
assert 'success' in self.conf(
|
||||||
'1', 'applications/' + self.app_name + '/processes'
|
'1', 'applications/' + self.app_name + '/processes'
|
||||||
@@ -50,39 +53,41 @@ class TestRespawn(TestApplicationPython):
|
|||||||
# Check if the only one router, controller,
|
# Check if the only one router, controller,
|
||||||
# and application processes running.
|
# and application processes running.
|
||||||
|
|
||||||
output = subprocess.check_output(['ps', 'ax']).decode()
|
out = subprocess.check_output(['ps', 'ax', '-O', 'ppid']).decode()
|
||||||
assert len(re.findall(self.PATTERN_ROUTER, output)) == 1
|
assert len(self.find_proc(self.PATTERN_ROUTER, unit_pid, out)) == 1
|
||||||
assert len(re.findall(self.PATTERN_CONTROLLER, output)) == 1
|
assert len(self.find_proc(self.PATTERN_CONTROLLER, unit_pid, out)) == 1
|
||||||
assert len(re.findall(self.app_name, output)) == 1
|
assert len(self.find_proc(self.app_name, unit_pid, out)) == 1
|
||||||
|
|
||||||
def test_respawn_router(self, skip_alert):
|
def test_respawn_router(self, skip_alert, unit_pid):
|
||||||
pid = self.pid_by_name(self.PATTERN_ROUTER)
|
pid = self.pid_by_name(self.PATTERN_ROUTER, unit_pid)
|
||||||
|
|
||||||
self.kill_pids(pid)
|
self.kill_pids(pid)
|
||||||
skip_alert(r'process %s exited on signal 9' % pid)
|
skip_alert(r'process %s exited on signal 9' % pid)
|
||||||
|
|
||||||
assert self.wait_for_process(self.PATTERN_ROUTER) is not None
|
assert self.wait_for_process(self.PATTERN_ROUTER, unit_pid) is not None
|
||||||
|
|
||||||
self.smoke_test()
|
self.smoke_test(unit_pid)
|
||||||
|
|
||||||
def test_respawn_controller(self, skip_alert):
|
def test_respawn_controller(self, skip_alert, unit_pid):
|
||||||
pid = self.pid_by_name(self.PATTERN_CONTROLLER)
|
pid = self.pid_by_name(self.PATTERN_CONTROLLER, unit_pid)
|
||||||
|
|
||||||
self.kill_pids(pid)
|
self.kill_pids(pid)
|
||||||
skip_alert(r'process %s exited on signal 9' % pid)
|
skip_alert(r'process %s exited on signal 9' % pid)
|
||||||
|
|
||||||
assert self.wait_for_process(self.PATTERN_CONTROLLER) is not None
|
assert self.wait_for_process(
|
||||||
|
self.PATTERN_CONTROLLER, unit_pid
|
||||||
|
) is not None
|
||||||
|
|
||||||
assert self.get()['status'] == 200
|
assert self.get()['status'] == 200
|
||||||
|
|
||||||
self.smoke_test()
|
self.smoke_test(unit_pid)
|
||||||
|
|
||||||
def test_respawn_application(self, skip_alert):
|
def test_respawn_application(self, skip_alert, unit_pid):
|
||||||
pid = self.pid_by_name(self.app_name)
|
pid = self.pid_by_name(self.app_name, unit_pid)
|
||||||
|
|
||||||
self.kill_pids(pid)
|
self.kill_pids(pid)
|
||||||
skip_alert(r'process %s exited on signal 9' % pid)
|
skip_alert(r'process %s exited on signal 9' % pid)
|
||||||
|
|
||||||
assert self.wait_for_process(self.app_name) is not None
|
assert self.wait_for_process(self.app_name, unit_pid) is not None
|
||||||
|
|
||||||
self.smoke_test()
|
self.smoke_test(unit_pid)
|
||||||
|
|||||||
Reference in New Issue
Block a user