Initial applications isolation support using Linux namespaces.
This commit is contained in:
135
test/test_go_isolation.py
Normal file
135
test/test_go_isolation.py
Normal file
@@ -0,0 +1,135 @@
|
||||
import os
|
||||
import json
|
||||
import unittest
|
||||
from unit.applications.lang.go import TestApplicationGo
|
||||
from unit.feature.isolation import TestFeatureIsolation
|
||||
|
||||
|
||||
class TestGoIsolation(TestApplicationGo):
|
||||
prerequisites = {'modules': ['go'], 'features': ['isolation']}
|
||||
|
||||
isolation = TestFeatureIsolation()
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls, complete_check=True):
|
||||
unit = super().setUpClass(complete_check=False)
|
||||
|
||||
TestFeatureIsolation().check(cls.available, unit.testdir)
|
||||
|
||||
return unit if not complete_check else unit.complete()
|
||||
|
||||
def isolation_key(self, key):
|
||||
return key in self.available['features']['isolation'].keys()
|
||||
|
||||
def conf_isolation(self, isolation):
|
||||
self.assertIn(
|
||||
'success',
|
||||
self.conf(isolation, 'applications/ns_inspect/isolation'),
|
||||
'configure isolation',
|
||||
)
|
||||
|
||||
def test_isolation_values(self):
|
||||
self.load('ns_inspect')
|
||||
|
||||
obj = self.isolation.parsejson(self.get()['body'])
|
||||
|
||||
for ns, ns_value in self.available['features']['isolation'].items():
|
||||
if ns.upper() in obj['NS']:
|
||||
self.assertEqual(
|
||||
obj['NS'][ns.upper()], ns_value, '%s match' % ns
|
||||
)
|
||||
|
||||
def test_isolation_user(self):
|
||||
if not self.isolation_key('unprivileged_userns_clone'):
|
||||
print('unprivileged clone is not available')
|
||||
raise unittest.SkipTest()
|
||||
|
||||
self.load('ns_inspect')
|
||||
obj = self.isolation.parsejson(self.get()['body'])
|
||||
|
||||
self.assertTrue(obj['UID'] != 0, 'uid not zero')
|
||||
self.assertTrue(obj['GID'] != 0, 'gid not zero')
|
||||
self.assertEqual(obj['UID'], os.getuid(), 'uid match')
|
||||
self.assertEqual(obj['GID'], os.getgid(), 'gid match')
|
||||
|
||||
self.conf_isolation({"namespaces": {"credential": True}})
|
||||
|
||||
obj = self.isolation.parsejson(self.get()['body'])
|
||||
|
||||
# default uid and gid maps current user to nobody
|
||||
self.assertEqual(obj['UID'], 65534, 'uid nobody')
|
||||
self.assertEqual(obj['GID'], 65534, 'gid nobody')
|
||||
|
||||
self.conf_isolation(
|
||||
{
|
||||
"namespaces": {"credential": True},
|
||||
"uidmap": [
|
||||
{"container": 1000, "host": os.geteuid(), "size": 1}
|
||||
],
|
||||
"gidmap": [
|
||||
{"container": 1000, "host": os.getegid(), "size": 1}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
obj = self.isolation.parsejson(self.get()['body'])
|
||||
|
||||
# default uid and gid maps current user to root
|
||||
self.assertEqual(obj['UID'], 1000, 'uid root')
|
||||
self.assertEqual(obj['GID'], 1000, 'gid root')
|
||||
|
||||
def test_isolation_mnt(self):
|
||||
if not self.isolation_key('mnt'):
|
||||
print('mnt namespace is not supported')
|
||||
raise unittest.SkipTest()
|
||||
|
||||
if not self.isolation_key('unprivileged_userns_clone'):
|
||||
print('unprivileged clone is not available')
|
||||
raise unittest.SkipTest()
|
||||
|
||||
self.load('ns_inspect')
|
||||
self.conf_isolation(
|
||||
{"namespaces": {"mount": True, "credential": True}}
|
||||
)
|
||||
|
||||
obj = self.isolation.parsejson(self.get()['body'])
|
||||
|
||||
# all but user and mnt
|
||||
allns = list(self.available['features']['isolation'].keys())
|
||||
allns.remove('user')
|
||||
allns.remove('mnt')
|
||||
|
||||
for ns in allns:
|
||||
if ns.upper() in obj['NS']:
|
||||
self.assertEqual(
|
||||
obj['NS'][ns.upper()],
|
||||
self.available['features']['isolation'][ns],
|
||||
'%s match' % ns,
|
||||
)
|
||||
|
||||
self.assertNotEqual(
|
||||
obj['NS']['MNT'], self.isolation.getns('mnt'), 'mnt set'
|
||||
)
|
||||
self.assertNotEqual(
|
||||
obj['NS']['USER'], self.isolation.getns('user'), 'user set'
|
||||
)
|
||||
|
||||
def test_isolation_pid(self):
|
||||
if not self.isolation_key('pid'):
|
||||
print('pid namespace is not supported')
|
||||
raise unittest.SkipTest()
|
||||
|
||||
if not self.isolation_key('unprivileged_userns_clone'):
|
||||
print('unprivileged clone is not available')
|
||||
raise unittest.SkipTest()
|
||||
|
||||
self.load('ns_inspect')
|
||||
self.conf_isolation({"namespaces": {"pid": True, "credential": True}})
|
||||
|
||||
obj = self.isolation.parsejson(self.get()['body'])
|
||||
|
||||
self.assertEqual(obj['PID'], 1, 'pid of container is 1')
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
TestGoIsolation.main()
|
||||
Reference in New Issue
Block a user