Tests: Added rootfs tests.

This commit is contained in:
Tiago Natel de Moura
2020-05-28 14:59:52 +01:00
parent e2b53e16c6
commit 08b765ae42
12 changed files with 542 additions and 34 deletions

View File

@@ -21,10 +21,11 @@ type (
} }
Output struct { Output struct {
PID int PID int
UID int UID int
GID int GID int
NS NS NS NS
FileExists bool
} }
) )
@@ -64,6 +65,18 @@ func handler(w http.ResponseWriter, r *http.Request) {
CGROUP: getns("cgroup"), CGROUP: getns("cgroup"),
}, },
} }
err := r.ParseForm()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
if fname := r.Form.Get("file"); fname != "" {
_, err = os.Stat(fname);
out.FileExists = err == nil
}
data, err := json.Marshal(out) data, err := json.Marshal(out)
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)

View File

@@ -0,0 +1,31 @@
import json
import os
try:
# Python 3
from urllib.parse import parse_qs
except ImportError:
# Python 2
from urlparse import parse_qs
def application(environ, start_response):
ret = {
'FileExists': False,
}
d = parse_qs(environ['QUERY_STRING'])
ret['FileExists'] = os.path.exists(d.get('path')[0])
out = json.dumps(ret)
start_response(
'200',
[
('Content-Type', 'application/json'),
('Content-Length', str(len(out))),
],
)
return out.encode('utf-8')

View File

@@ -281,6 +281,52 @@ class TestGoIsolation(TestApplicationGo):
'%s match' % ns, '%s match' % ns,
) )
def test_go_isolation_rootfs_container(self):
if not self.isolation_key('unprivileged_userns_clone'):
print('unprivileged clone is not available')
raise unittest.SkipTest()
if not self.isolation_key('mnt'):
print('mnt namespace is not supported')
raise unittest.SkipTest()
isolation = {
'namespaces': {'mount': True, 'credential': True},
'rootfs': self.testdir,
}
self.load('ns_inspect', isolation=isolation)
obj = self.getjson(url='/?file=/go/app')['body']
self.assertEqual(obj['FileExists'], True, 'app relative to rootfs')
obj = self.getjson(url='/?file=/bin/sh')['body']
self.assertEqual(obj['FileExists'], False, 'file should not exists')
def test_go_isolation_rootfs_container_priv(self):
if not self.is_su:
print("requires root")
raise unittest.SkipTest()
if not self.isolation_key('mnt'):
print('mnt namespace is not supported')
raise unittest.SkipTest()
isolation = {
'namespaces': {'mount': True},
'rootfs': self.testdir,
}
self.load('ns_inspect', isolation=isolation)
obj = self.getjson(url='/?file=/go/app')['body']
self.assertEqual(obj['FileExists'], True, 'app relative to rootfs')
obj = self.getjson(url='/?file=/bin/sh')['body']
self.assertEqual(obj['FileExists'], False, 'file should not exists')
if __name__ == '__main__': if __name__ == '__main__':
TestGoIsolation.main() TestGoIsolation.main()

View File

@@ -0,0 +1,34 @@
import os
import unittest
from unit.applications.lang.go import TestApplicationGo
class TestGoIsolationRootfs(TestApplicationGo):
prerequisites = {'modules': {'go': 'all'}}
def test_go_isolation_rootfs_chroot(self):
if not self.is_su:
print("requires root")
raise unittest.SkipTest()
if os.uname().sysname == 'Darwin':
print('chroot tests not supported on OSX')
raise unittest.SkipTest()
isolation = {
'rootfs': self.testdir,
}
self.load('ns_inspect', isolation=isolation)
obj = self.getjson(url='/?file=/go/app')['body']
self.assertEqual(obj['FileExists'], True, 'app relative to rootfs')
obj = self.getjson(url='/?file=/bin/sh')['body']
self.assertEqual(obj['FileExists'], False, 'file should not exists')
if __name__ == '__main__':
TestGoIsolationRootfs.main()

View File

@@ -0,0 +1,85 @@
import os
import subprocess
import unittest
from unit.applications.lang.java import TestApplicationJava
class TestJavaIsolationRootfs(TestApplicationJava):
prerequisites = {'modules': {'java': 'all'}}
def setUp(self):
if not self.is_su:
return
super().setUp()
os.makedirs(self.testdir + '/jars')
os.makedirs(self.testdir + '/tmp')
os.chmod(self.testdir + '/tmp', 0o777)
try:
process = subprocess.Popen(
[
"mount",
"--bind",
self.pardir + "/build",
self.testdir + "/jars",
],
stderr=subprocess.STDOUT,
)
process.communicate()
except:
self.fail('Cann\'t run mount process.')
def tearDown(self):
if not self.is_su:
return
try:
process = subprocess.Popen(
["umount", "--lazy", self.testdir + "/jars"],
stderr=subprocess.STDOUT,
)
process.communicate()
except:
self.fail('Cann\'t run mount process.')
# super teardown must happen after unmount to avoid deletion of /build
super().tearDown()
def test_java_isolation_rootfs_chroot_war(self):
if not self.is_su:
print('require root')
raise unittest.SkipTest()
isolation = {
'rootfs': self.testdir,
}
self.load('empty_war', isolation=isolation)
self.assertIn(
'success',
self.conf(
'"/"', '/config/applications/empty_war/working_directory',
),
)
self.assertIn(
'success', self.conf('"/jars"', 'applications/empty_war/unit_jars')
)
self.assertIn(
'success',
self.conf('"/java/empty.war"', 'applications/empty_war/webapp'),
)
self.assertEqual(self.get()['status'], 200, 'war')
if __name__ == '__main__':
TestJavaIsolationRootfs.main()

View File

@@ -0,0 +1,57 @@
import unittest
from unit.applications.lang.php import TestApplicationPHP
from unit.feature.isolation import TestFeatureIsolation
class TestPHPIsolation(TestApplicationPHP):
prerequisites = {'modules': {'php': 'any'}, 'features': ['isolation']}
isolation = TestFeatureIsolation()
@classmethod
def setUpClass(cls, complete_check=True):
unit = super().setUpClass(complete_check=False)
TestFeatureIsolation().check(cls.available, unit.testdir)
return unit if not complete_check else unit.complete()
def test_php_isolation_rootfs(self):
isolation_features = self.available['features']['isolation'].keys()
if 'mnt' not in isolation_features:
print('requires mnt ns')
raise unittest.SkipTest()
if not self.is_su:
if 'user' not in isolation_features:
print('requires unprivileged userns or root')
raise unittest.SkipTest()
if not 'unprivileged_userns_clone' in isolation_features:
print('requires unprivileged userns or root')
raise unittest.SkipTest()
isolation = {
'namespaces': {'credential': not self.is_su, 'mount': True},
'rootfs': self.current_dir,
}
self.load('phpinfo', isolation=isolation)
self.assertIn(
'success', self.conf('"/php/phpinfo"', 'applications/phpinfo/root')
)
self.assertIn(
'success',
self.conf(
'"/php/phpinfo"', 'applications/phpinfo/working_directory'
),
)
self.assertEqual(self.get()['status'], 200, 'empty rootfs')
if __name__ == '__main__':
TestPHPIsolation.main()

View File

@@ -0,0 +1,79 @@
import unittest
from unit.applications.lang.python import TestApplicationPython
from unit.feature.isolation import TestFeatureIsolation
class TestPythonIsolation(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}, 'features': ['isolation']}
isolation = TestFeatureIsolation()
@classmethod
def setUpClass(cls, complete_check=True):
unit = super().setUpClass(complete_check=False)
TestFeatureIsolation().check(cls.available, unit.testdir)
return unit if not complete_check else unit.complete()
def test_python_isolation_rootfs(self):
isolation_features = self.available['features']['isolation'].keys()
if 'mnt' not in isolation_features:
print('requires mnt ns')
raise unittest.SkipTest()
if not self.is_su:
if 'user' not in isolation_features:
print('requires unprivileged userns or root')
raise unittest.SkipTest()
if not 'unprivileged_userns_clone' in isolation_features:
print('requires unprivileged userns or root')
raise unittest.SkipTest()
isolation = {
'namespaces': {'credential': not self.is_su, 'mount': True},
'rootfs': self.testdir,
}
self.load('empty', isolation=isolation)
self.assertEqual(self.get()['status'], 200, 'python rootfs')
self.load('ns_inspect', isolation=isolation)
self.assertEqual(
self.getjson(url='/?path=' + self.testdir)['body']['FileExists'],
False,
'testdir does not exists in rootfs',
)
self.assertEqual(
self.getjson(url='/?path=/proc/self')['body']['FileExists'],
False,
'no /proc/self',
)
self.assertEqual(
self.getjson(url='/?path=/dev/pts')['body']['FileExists'],
False,
'no /dev/pts',
)
self.assertEqual(
self.getjson(url='/?path=/sys/kernel')['body']['FileExists'],
False,
'no /sys/kernel',
)
ret = self.getjson(url='/?path=/app/python/ns_inspect')
self.assertEqual(
ret['body']['FileExists'], True, 'application exists in rootfs',
)
if __name__ == '__main__':
TestPythonIsolation.main()

View File

@@ -0,0 +1,57 @@
import unittest
from unit.applications.lang.python import TestApplicationPython
from unit.feature.isolation import TestFeatureIsolation
class TestPythonIsolation(TestApplicationPython):
prerequisites = {'modules': {'python': 'any'}}
def test_python_isolation_chroot(self):
if not self.is_su:
print('requires root')
raise unittest.SkipTest()
isolation = {
'rootfs': self.testdir,
}
self.load('empty', isolation=isolation)
self.assertEqual(self.get()['status'], 200, 'python chroot')
self.load('ns_inspect', isolation=isolation)
self.assertEqual(
self.getjson(url='/?path=' + self.testdir)['body']['FileExists'],
False,
'testdir does not exists in rootfs',
)
self.assertEqual(
self.getjson(url='/?path=/proc/self')['body']['FileExists'],
False,
'no /proc/self',
)
self.assertEqual(
self.getjson(url='/?path=/dev/pts')['body']['FileExists'],
False,
'no /dev/pts',
)
self.assertEqual(
self.getjson(url='/?path=/sys/kernel')['body']['FileExists'],
False,
'no /sys/kernel',
)
ret = self.getjson(url='/?path=/app/python/ns_inspect')
self.assertEqual(
ret['body']['FileExists'], True, 'application exists in rootfs',
)
if __name__ == '__main__':
TestPythonIsolation.main()

View File

@@ -0,0 +1,71 @@
import os
import shutil
import unittest
from unit.applications.lang.ruby import TestApplicationRuby
from unit.feature.isolation import TestFeatureIsolation
class TestRubyIsolation(TestApplicationRuby):
prerequisites = {'modules': {'ruby': 'any'}, 'features': ['isolation']}
isolation = TestFeatureIsolation()
@classmethod
def setUpClass(cls, complete_check=True):
unit = super().setUpClass(complete_check=False)
TestFeatureIsolation().check(cls.available, unit.testdir)
return unit if not complete_check else unit.complete()
def test_ruby_isolation_rootfs(self):
isolation_features = self.available['features']['isolation'].keys()
if 'mnt' not in isolation_features:
print('requires mnt ns')
raise unittest.SkipTest()
if not self.is_su:
if 'user' not in isolation_features:
print('requires unprivileged userns or root')
raise unittest.SkipTest()
if not 'unprivileged_userns_clone' in isolation_features:
print('requires unprivileged userns or root')
raise unittest.SkipTest()
os.mkdir(self.testdir + '/ruby')
shutil.copytree(
self.current_dir + '/ruby/status_int',
self.testdir + '/ruby/status_int',
)
isolation = {
'namespaces': {'credential': not self.is_su, 'mount': True},
'rootfs': self.testdir,
}
self.load('status_int', isolation=isolation)
self.assertIn(
'success',
self.conf(
'"/ruby/status_int/config.ru"',
'applications/status_int/script',
),
)
self.assertIn(
'success',
self.conf(
'"/ruby/status_int"',
'applications/status_int/working_directory',
),
)
self.assertEqual(self.get()['status'], 200, 'status int')
if __name__ == '__main__':
TestRubyIsolation.main()

View File

@@ -19,26 +19,36 @@ class TestApplicationGo(TestApplicationProto):
return unit if not complete_check else unit.complete() return unit if not complete_check else unit.complete()
def prepare_env(self, script, name): def prepare_env(self, script, name, static=False):
if not os.path.exists(self.testdir + '/go'): if not os.path.exists(self.testdir + '/go'):
os.mkdir(self.testdir + '/go') os.mkdir(self.testdir + '/go')
env = os.environ.copy() env = os.environ.copy()
env['GOPATH'] = self.pardir + '/build/go' env['GOPATH'] = self.pardir + '/build/go'
try: if static:
process = subprocess.Popen( args = [
[ 'go',
'go', 'build',
'build', '-tags',
'-o', 'netgo',
self.testdir + '/go/' + name, '-ldflags',
self.current_dir + '/go/' + script + '/' + name + '.go', '-extldflags "-static"',
], '-o',
env=env, self.testdir + '/go/' + name,
stderr=subprocess.STDOUT, self.current_dir + '/go/' + script + '/' + name + '.go',
) ]
else:
args = [
'go',
'build',
'-o',
self.testdir + '/go/' + name,
self.current_dir + '/go/' + script + '/' + name + '.go',
]
try:
process = subprocess.Popen(args, env=env)
process.communicate() process.communicate()
except: except:
@@ -47,21 +57,28 @@ class TestApplicationGo(TestApplicationProto):
return process return process
def load(self, script, name='app', **kwargs): def load(self, script, name='app', **kwargs):
self.prepare_env(script, name) static_build = False
self._load_conf( wdir = self.current_dir + "/go/" + script
{ executable = self.testdir + "/go/" + name
"listeners": {"*:7080": {"pass": "applications/" + script}},
"applications": { if 'isolation' in kwargs and 'rootfs' in kwargs['isolation']:
script: { wdir = "/go/"
"type": "external", executable = "/go/" + name
"processes": {"spare": 0}, static_build = True
"working_directory": self.current_dir
+ "/go/" self.prepare_env(script, name, static=static_build)
+ script,
"executable": self.testdir + "/go/" + name, conf = {
} "listeners": {"*:7080": {"pass": "applications/" + script}},
"applications": {
script: {
"type": "external",
"processes": {"spare": 0},
"working_directory": wdir,
"executable": executable,
}, },
}, },
**kwargs }
)
self._load_conf(conf, **kwargs)

View File

@@ -1,3 +1,6 @@
import shutil
import os
from unit.applications.proto import TestApplicationProto from unit.applications.proto import TestApplicationProto
@@ -8,7 +11,21 @@ class TestApplicationPython(TestApplicationProto):
if name is None: if name is None:
name = script name = script
script_path = self.current_dir + '/python/' + script if script[0] == '/':
script_path = script
else:
script_path = self.current_dir + '/python/' + script
if kwargs.get('isolation') and kwargs['isolation'].get('rootfs'):
rootfs = kwargs['isolation']['rootfs']
if not os.path.exists(rootfs + '/app/python/'):
os.makedirs(rootfs + '/app/python/')
if not os.path.exists(rootfs + '/app/python/' + name):
shutil.copytree(script_path, rootfs + '/app/python/' + name)
script_path = '/app/python/' + name
self._load_conf( self._load_conf(
{ {

View File

@@ -58,6 +58,7 @@ class TestUnit(unittest.TestCase):
if prereq_version == 'all': if prereq_version == 'all':
for version in available_versions: for version in available_versions:
self.application_type = type + ' ' + version self.application_type = type + ' ' + version
self.application_version = version
super().run(result) super().run(result)
elif prereq_version == 'any': elif prereq_version == 'any':
self.application_type = type + ' ' + available_versions[0] self.application_type = type + ' ' + available_versions[0]
@@ -165,7 +166,7 @@ class TestUnit(unittest.TestCase):
self._run() self._run()
def _run(self): def _run(self):
build_dir = self.pardir + '/build' build_dir = os.path.join(self.pardir, 'build')
self.unitd = build_dir + '/unitd' self.unitd = build_dir + '/unitd'
if not os.path.isfile(self.unitd): if not os.path.isfile(self.unitd):