Tests: assertion related fixes.

This commit is contained in:
Andrei Zeliankou
2023-05-25 16:56:14 +01:00
parent 0132e552d0
commit b034bf6703
11 changed files with 90 additions and 95 deletions

View File

@@ -34,10 +34,10 @@ class TestASGIWebsockets(TestApplicationPython):
self.check_close(sock) self.check_close(sock)
def check_close(self, sock, code=1000, no_close=False, frame=None): def check_close(self, sock, code=1000, no_close=False, frame=None):
if frame == None: if frame is None:
frame = self.ws.frame_read(sock) frame = self.ws.frame_read(sock)
assert frame['fin'] == True, 'close fin' assert frame['fin'], 'close fin'
assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode' assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode'
assert frame['code'] == code, 'close code' assert frame['code'] == code, 'close code'

View File

@@ -289,10 +289,10 @@ class TestGoIsolation(TestApplicationGo):
obj = self.getjson(url='/?file=/go/app')['body'] obj = self.getjson(url='/?file=/go/app')['body']
assert obj['FileExists'] == True, 'app relative to rootfs' assert obj['FileExists'], 'app relative to rootfs'
obj = self.getjson(url='/?file=/bin/sh')['body'] obj = self.getjson(url='/?file=/bin/sh')['body']
assert obj['FileExists'] == False, 'file should not exists' assert not obj['FileExists'], 'file should not exists'
def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir): def test_go_isolation_rootfs_container_priv(self, is_su, temp_dir):
if not is_su: if not is_su:
@@ -310,10 +310,10 @@ class TestGoIsolation(TestApplicationGo):
obj = self.getjson(url='/?file=/go/app')['body'] obj = self.getjson(url='/?file=/go/app')['body']
assert obj['FileExists'] == True, 'app relative to rootfs' assert obj['FileExists'], 'app relative to rootfs'
obj = self.getjson(url='/?file=/bin/sh')['body'] obj = self.getjson(url='/?file=/bin/sh')['body']
assert obj['FileExists'] == False, 'file should not exists' assert not obj['FileExists'], 'file should not exists'
def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir): def test_go_isolation_rootfs_automount_tmpfs(self, is_su, temp_dir):
try: try:

View File

@@ -26,7 +26,7 @@ class TestGoIsolationRootfs(TestApplicationGo):
obj = self.getjson(url='/?file=/go/app')['body'] obj = self.getjson(url='/?file=/go/app')['body']
assert obj['FileExists'] == True, 'app relative to rootfs' assert obj['FileExists'], 'app relative to rootfs'
obj = self.getjson(url='/?file=/bin/sh')['body'] obj = self.getjson(url='/?file=/bin/sh')['body']
assert obj['FileExists'] == False, 'file should not exists' assert not obj['FileExists'], 'file should not exists'

View File

@@ -374,7 +374,7 @@ class TestJavaApplication(TestApplicationJava):
assert headers['X-Add-Utf8-Value'] == '????', 'add Utf8 header value' assert headers['X-Add-Utf8-Value'] == '????', 'add Utf8 header value'
assert headers['X-Add-Utf8-Name-???'] == 'y', 'add Utf8 header name' assert headers['X-Add-Utf8-Name-???'] == 'y', 'add Utf8 header name'
assert headers['X-Add-Test'] == 'v1', 'add null header' assert headers['X-Add-Test'] == 'v1', 'add null header'
assert ('X-Set-Test1' in headers) == False, 'set null header' assert 'X-Set-Test1' not in headers, 'set null header'
assert headers['X-Set-Test2'] == '', 'set empty header' assert headers['X-Set-Test2'] == '', 'set empty header'
def test_java_application_content_type(self): def test_java_application_content_type(self):
@@ -440,10 +440,8 @@ class TestJavaApplication(TestApplicationJava):
headers = self.get(url='/6')['headers'] headers = self.get(url='/6')['headers']
assert ('Content-Type' in headers) == False, '#6 no Content-Type header' assert 'Content-Type' not in headers, '#6 no Content-Type header'
assert ( assert 'X-Content-Type' not in headers, '#6 no response Content-Type'
'X-Content-Type' in headers
) == False, '#6 no response Content-Type'
assert headers['X-Character-Encoding'] == 'utf-8', '#6 response charset' assert headers['X-Character-Encoding'] == 'utf-8', '#6 response charset'
headers = self.get(url='/7')['headers'] headers = self.get(url='/7')['headers']
@@ -477,7 +475,7 @@ class TestJavaApplication(TestApplicationJava):
resp = self.get(url='/dir1/') resp = self.get(url='/dir1/')
assert ('This is index.txt.' in resp['body']) == True, 'dir1 index body' assert 'This is index.txt.' in resp['body'], 'dir1 index body'
assert resp['headers']['X-TXT-Filter'] == '1', 'TXT Filter header' assert resp['headers']['X-TXT-Filter'] == '1', 'TXT Filter header'
headers = self.get(url='/dir2/')['headers'] headers = self.get(url='/dir2/')['headers']
@@ -494,8 +492,8 @@ class TestJavaApplication(TestApplicationJava):
headers = self.get(url='/dir4/')['headers'] headers = self.get(url='/dir4/')['headers']
assert ( assert (
'X-App-Servlet' in headers 'X-App-Servlet' not in headers
) == False, 'Static welcome file served first' ), 'Static welcome file served first'
headers = self.get(url='/dir5/')['headers'] headers = self.get(url='/dir5/')['headers']
@@ -617,14 +615,14 @@ class TestJavaApplication(TestApplicationJava):
), 'original request query' ), 'original request query'
assert ( assert (
'Before forwarding' in resp['body'] 'Before forwarding' not in resp['body']
) == False, 'discarded data added before forward() call' ), 'discarded data added before forward() call'
assert ( assert (
'X-After-Forwarding' in headers 'X-After-Forwarding' not in headers
) == False, 'cannot add headers after forward() call' ), 'cannot add headers after forward() call'
assert ( assert (
'After forwarding' in resp['body'] 'After forwarding' not in resp['body']
) == False, 'cannot add data after forward() call' ), 'cannot add data after forward() call'
def test_java_application_named_dispatcher_forward(self): def test_java_application_named_dispatcher_forward(self):
self.load('forward') self.load('forward')
@@ -668,14 +666,14 @@ class TestJavaApplication(TestApplicationJava):
), 'original request query' ), 'original request query'
assert ( assert (
'Before forwarding' in resp['body'] 'Before forwarding' not in resp['body']
) == False, 'discarded data added before forward() call' ), 'discarded data added before forward() call'
assert ( assert (
'X-After-Forwarding' in headers 'X-After-Forwarding' not in headers
) == False, 'cannot add headers after forward() call' ), 'cannot add headers after forward() call'
assert ( assert (
'After forwarding' in resp['body'] 'After forwarding' not in resp['body']
) == False, 'cannot add data after forward() call' ), 'cannot add data after forward() call'
def test_java_application_request_uri_include(self): def test_java_application_request_uri_include(self):
self.load('include') self.load('include')
@@ -690,34 +688,32 @@ class TestJavaApplication(TestApplicationJava):
assert headers['X-Include'] == '/data/test', 'including triggered' assert headers['X-Include'] == '/data/test', 'including triggered'
assert ( assert (
'X-INCLUDE-Id' in headers 'X-INCLUDE-Id' not in headers
) == False, 'unable to add headers in include request' ), 'unable to add headers in include request'
assert ( assert (
'javax.servlet.include.request_uri: /data/test' in body 'javax.servlet.include.request_uri: /data/test' in body
) == True, 'include request uri' ), 'include request uri'
# assert ( # assert (
# 'javax.servlet.include.context_path: ' in body # 'javax.servlet.include.context_path: ' in body
# ) == True, 'include request context path' # ) == True, 'include request context path'
assert ( assert (
'javax.servlet.include.servlet_path: /data' in body 'javax.servlet.include.servlet_path: /data' in body
) == True, 'include request servlet path' ), 'include request servlet path'
assert ( assert (
'javax.servlet.include.path_info: /test' in body 'javax.servlet.include.path_info: /test' in body
) == True, 'include request path info' ), 'include request path info'
assert ( assert (
'javax.servlet.include.query_string: null' in body 'javax.servlet.include.query_string: null' in body
) == True, 'include request query' ), 'include request query'
assert ( assert (
'Before include' in body 'Before include' in body
) == True, 'preserve data added before include() call' ), 'preserve data added before include() call'
assert ( assert (
headers['X-After-Include'] == 'you-should-see-this' headers['X-After-Include'] == 'you-should-see-this'
), 'add headers after include() call' ), 'add headers after include() call'
assert ( assert 'After include' in body, 'add data after include() call'
'After include' in body
) == True, 'add data after include() call'
def test_java_application_named_dispatcher_include(self): def test_java_application_named_dispatcher_include(self):
self.load('include') self.load('include')
@@ -732,34 +728,32 @@ class TestJavaApplication(TestApplicationJava):
assert headers['X-Include'] == 'data', 'including triggered' assert headers['X-Include'] == 'data', 'including triggered'
assert ( assert (
'X-INCLUDE-Id' in headers 'X-INCLUDE-Id' not in headers
) == False, 'unable to add headers in include request' ), 'unable to add headers in include request'
assert ( assert (
'javax.servlet.include.request_uri: null' in body 'javax.servlet.include.request_uri: null' in body
) == True, 'include request uri' ), 'include request uri'
# assert ( # assert (
# 'javax.servlet.include.context_path: null' in body # 'javax.servlet.include.context_path: null' in body
# ) == True, 'include request context path' # ) == True, 'include request context path'
assert ( assert (
'javax.servlet.include.servlet_path: null' in body 'javax.servlet.include.servlet_path: null' in body
) == True, 'include request servlet path' ), 'include request servlet path'
assert ( assert (
'javax.servlet.include.path_info: null' in body 'javax.servlet.include.path_info: null' in body
) == True, 'include request path info' ), 'include request path info'
assert ( assert (
'javax.servlet.include.query_string: null' in body 'javax.servlet.include.query_string: null' in body
) == True, 'include request query' ), 'include request query'
assert ( assert (
'Before include' in body 'Before include' in body
) == True, 'preserve data added before include() call' ), 'preserve data added before include() call'
assert ( assert (
headers['X-After-Include'] == 'you-should-see-this' headers['X-After-Include'] == 'you-should-see-this'
), 'add headers after include() call' ), 'add headers after include() call'
assert ( assert 'After include' in body, 'add data after include() call'
'After include' in body
) == True, 'add data after include() call'
def test_java_application_path_translation(self): def test_java_application_path_translation(self):
self.load('path_translation') self.load('path_translation')
@@ -772,9 +766,8 @@ class TestJavaApplication(TestApplicationJava):
headers['X-Path-Translated'] headers['X-Path-Translated']
== f"{headers['X-Real-Path']}{headers['X-Path-Info']}" == f"{headers['X-Real-Path']}{headers['X-Path-Info']}"
), 'translated path is the app root + path info' ), 'translated path is the app root + path info'
assert ( assert headers['X-Resource-Paths'].endswith(
headers['X-Resource-Paths'].endswith('/WEB-INF/, /index.html]') '/WEB-INF/, /index.html]'
== True
), 'app root directory content' ), 'app root directory content'
assert ( assert (
headers['X-Resource-As-Stream'] == 'null' headers['X-Resource-As-Stream'] == 'null'
@@ -789,9 +782,7 @@ class TestJavaApplication(TestApplicationJava):
assert ( assert (
headers['X-Path-Translated'] == 'null' headers['X-Path-Translated'] == 'null'
), 'translated path is null because path info is null' ), 'translated path is null because path info is null'
assert ( assert headers['X-Real-Path'].endswith('/none'), 'read path is not null'
headers['X-Real-Path'].endswith('/none') == True
), 'read path is not null'
assert headers['X-Resource-Paths'] == 'null', 'no resource found' assert headers['X-Resource-Paths'] == 'null', 'no resource found'
assert headers['X-Resource-As-Stream'] == 'null', 'no resource stream' assert headers['X-Resource-As-Stream'] == 'null', 'no resource stream'

View File

@@ -28,10 +28,10 @@ class TestJavaWebsockets(TestApplicationJava):
self.check_close(sock) self.check_close(sock)
def check_close(self, sock, code=1000, no_close=False, frame=None): def check_close(self, sock, code=1000, no_close=False, frame=None):
if frame == None: if frame is None:
frame = self.ws.frame_read(sock) frame = self.ws.frame_read(sock)
assert frame['fin'] == True, 'close fin' assert frame['fin'], 'close fin'
assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode' assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode'
assert frame['code'] == code, 'close code' assert frame['code'] == code, 'close code'

View File

@@ -28,10 +28,10 @@ class TestNodeWebsockets(TestApplicationNode):
self.check_close(sock) self.check_close(sock)
def check_close(self, sock, code=1000, no_close=False, frame=None): def check_close(self, sock, code=1000, no_close=False, frame=None):
if frame == None: if frame is None:
frame = self.ws.frame_read(sock) frame = self.ws.frame_read(sock)
assert frame['fin'] == True, 'close fin' assert frame['fin'], 'close fin'
assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode' assert frame['opcode'] == self.ws.OP_CLOSE, 'close opcode'
assert frame['code'] == code, 'close code' assert frame['code'] == code, 'close code'

View File

@@ -58,27 +58,25 @@ class TestPythonIsolation(TestApplicationPython):
self.load('ns_inspect', isolation=isolation) self.load('ns_inspect', isolation=isolation)
assert ( assert not (
self.getjson(url=f'/?path={temp_dir}')['body']['FileExists'] self.getjson(url=f'/?path={temp_dir}')['body']['FileExists']
== False
), 'temp_dir does not exists in rootfs' ), 'temp_dir does not exists in rootfs'
assert ( assert self.getjson(url='/?path=/proc/self')['body'][
self.getjson(url='/?path=/proc/self')['body']['FileExists'] == True 'FileExists'
), 'no /proc/self' ], 'no /proc/self'
assert ( assert not (
self.getjson(url='/?path=/dev/pts')['body']['FileExists'] == False self.getjson(url='/?path=/dev/pts')['body']['FileExists']
), 'no /dev/pts' ), 'no /dev/pts'
assert ( assert not (
self.getjson(url='/?path=/sys/kernel')['body']['FileExists'] self.getjson(url='/?path=/sys/kernel')['body']['FileExists']
== False
), 'no /sys/kernel' ), 'no /sys/kernel'
ret = self.getjson(url='/?path=/app/python/ns_inspect') ret = self.getjson(url='/?path=/app/python/ns_inspect')
assert ret['body']['FileExists'] == True, 'application exists in rootfs' assert ret['body']['FileExists'], 'application exists in rootfs'
def test_python_isolation_rootfs_no_language_deps(self, is_su, temp_dir): def test_python_isolation_rootfs_no_language_deps(self, is_su, temp_dir):
if not is_su: if not is_su:
@@ -113,17 +111,17 @@ class TestPythonIsolation(TestApplicationPython):
self.load('ns_inspect', isolation=isolation) self.load('ns_inspect', isolation=isolation)
assert ( assert not (
self.getjson(url='/?path=/proc/self')['body']['FileExists'] == False self.getjson(url='/?path=/proc/self')['body']['FileExists']
), 'no /proc/self' ), 'no /proc/self'
isolation['automount']['procfs'] = True isolation['automount']['procfs'] = True
self.load('ns_inspect', isolation=isolation) self.load('ns_inspect', isolation=isolation)
assert ( assert self.getjson(url='/?path=/proc/self')['body'][
self.getjson(url='/?path=/proc/self')['body']['FileExists'] == True 'FileExists'
), '/proc/self' ], '/proc/self'
def test_python_isolation_cgroup(self, is_su): def test_python_isolation_cgroup(self, is_su):
if not is_su: if not is_su:

View File

@@ -15,24 +15,22 @@ class TestPythonIsolation(TestApplicationPython):
self.load('ns_inspect', isolation=isolation) self.load('ns_inspect', isolation=isolation)
assert ( assert not (
self.getjson(url=f'/?path={temp_dir}')['body']['FileExists'] self.getjson(url=f'/?path={temp_dir}')['body']['FileExists']
== False
), 'temp_dir does not exists in rootfs' ), 'temp_dir does not exists in rootfs'
assert ( assert self.getjson(url='/?path=/proc/self')['body'][
self.getjson(url='/?path=/proc/self')['body']['FileExists'] == True 'FileExists'
), 'no /proc/self' ], 'no /proc/self'
assert ( assert not (
self.getjson(url='/?path=/dev/pts')['body']['FileExists'] == False self.getjson(url='/?path=/dev/pts')['body']['FileExists']
), 'no /dev/pts' ), 'no /dev/pts'
assert ( assert not (
self.getjson(url='/?path=/sys/kernel')['body']['FileExists'] self.getjson(url='/?path=/sys/kernel')['body']['FileExists']
== False
), 'no /sys/kernel' ), 'no /sys/kernel'
ret = self.getjson(url='/?path=/app/python/ns_inspect') ret = self.getjson(url='/?path=/app/python/ns_inspect')
assert ret['body']['FileExists'] == True, 'application exists in rootfs' assert ret['body']['FileExists'], 'application exists in rootfs'

View File

@@ -9,7 +9,7 @@ class TestStatus(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}} prerequisites = {'modules': {'python': 'any'}}
def check_connections(self, accepted, active, idle, closed): def check_connections(self, accepted, active, idle, closed):
Status.get('/connections') == { assert Status.get('/connections') == {
'accepted': accepted, 'accepted': accepted,
'active': active, 'active': active,
'idle': idle, 'idle': idle,
@@ -112,7 +112,12 @@ Connection: close
# idle # idle
sock = self.http(b'', raw=True, no_recv=True) (_, sock) = self.get(
headers={'Host': 'localhost', 'Connection': 'keep-alive'},
start=True,
read_timeout=1,
)
self.check_connections(2, 0, 1, 1) self.check_connections(2, 0, 1, 1)
self.get(sock=sock) self.get(sock=sock)
@@ -141,7 +146,7 @@ Connection: close
assert apps == expert.sort() assert apps == expert.sort()
def check_application(name, running, starting, idle, active): def check_application(name, running, starting, idle, active):
Status.get(f'/applications/{name}') == { assert Status.get(f'/applications/{name}') == {
'processes': { 'processes': {
'running': running, 'running': running,
'starting': starting, 'starting': starting,

View File

@@ -331,7 +331,7 @@ basicConstraints = critical,CA:TRUE"""
except ssl.SSLError: except ssl.SSLError:
resp = None resp = None
assert resp == None, 'certificate chain incomplete chain' assert resp is None, 'certificate chain incomplete chain'
# intermediate # intermediate
@@ -571,7 +571,7 @@ basicConstraints = critical,CA:TRUE"""
except: except:
resp = None resp = None
assert resp == None, 'keepalive remove certificate' assert resp is None, 'keepalive remove certificate'
@pytest.mark.skip('not yet') @pytest.mark.skip('not yet')
def test_tls_certificates_remove_all(self): def test_tls_certificates_remove_all(self):

View File

@@ -354,13 +354,16 @@ Connection: close
reg = r'^1$' reg = r'^1$'
assert self.search_in_log(reg) is None assert self.search_in_log(reg) is None
assert (
self.get( self.get(
headers={ headers={
'Host': 'localhost', 'Host': 'localhost',
'Cookie': 'foo_bar=1', 'Cookie': 'foo_bar=1',
'Connection': 'close', 'Connection': 'close',
}, },
)['status'] == 200 )['status']
== 200
)
assert self.wait_for_record(reg) is not None assert self.wait_for_record(reg) is not None
check_no_cookie('fOo_bar=0') check_no_cookie('fOo_bar=0')