Tests: TLS tests.

This commit is contained in:
Andrey Zelenkov
2018-09-20 16:34:34 +03:00
parent e4650b7412
commit d0e428aace
2 changed files with 505 additions and 4 deletions

View File

@@ -1,5 +1,6 @@
import os
import re
import ssl
import sys
import json
import time
@@ -67,6 +68,19 @@ class TestUnit(unittest.TestCase):
except:
m = None
elif module == 'openssl':
try:
subprocess.check_output(['which', 'openssl'])
output = subprocess.check_output([
self.pardir + '/build/unitd', '--version'],
stderr=subprocess.STDOUT)
m = re.search('--openssl', output.decode())
except:
m = None
else:
m = re.search('module: ' + module, log)
@@ -192,6 +206,7 @@ class TestUnitHTTP(TestUnit):
port = 7080 if 'port' not in kwargs else kwargs['port']
url = '/' if 'url' not in kwargs else kwargs['url']
http = 'HTTP/1.0' if 'http_10' in kwargs else 'HTTP/1.1'
blocking = False if 'blocking' not in kwargs else kwargs['blocking']
headers = ({
'Host': 'localhost',
@@ -215,6 +230,9 @@ class TestUnitHTTP(TestUnit):
if 'sock' not in kwargs:
sock = socket.socket(sock_types[sock_type], socket.SOCK_STREAM)
if 'wrapper' in kwargs:
sock = kwargs['wrapper'](sock)
connect_args = addr if sock_type == 'unix' else (addr, port)
try:
sock.connect(connect_args)
@@ -222,11 +240,11 @@ class TestUnitHTTP(TestUnit):
sock.close()
return None
sock.setblocking(blocking)
else:
sock = kwargs['sock']
sock.setblocking(False)
if 'raw' not in kwargs:
req = ' '.join([start_str, url, http]) + crlf
@@ -371,8 +389,8 @@ class TestUnitApplicationProto(TestUnitControl):
def sec_epoch(self):
return time.mktime(time.gmtime())
def date_to_sec_epoch(self, date):
return time.mktime(time.strptime(date, '%a, %d %b %Y %H:%M:%S %Z'))
def date_to_sec_epoch(self, date, template='%a, %d %b %Y %H:%M:%S %Z'):
return time.mktime(time.strptime(date, template))
def search_in_log(self, pattern):
with open(self.testdir + '/unit.log', 'r', errors='ignore') as f:
@@ -484,3 +502,69 @@ class TestUnitApplicationPerl(TestUnitApplicationProto):
}
}
})
class TestUnitApplicationTLS(TestUnitApplicationProto):
def __init__(self, test):
super().__init__(test)
self.context = ssl.create_default_context()
self.context.check_hostname = False
self.context.verify_mode = ssl.CERT_NONE
def certificate(self, name='default', load=True):
subprocess.call(['openssl', 'req', '-x509', '-new', '-config',
self.testdir + '/openssl.conf', '-subj', '/CN=' + name + '/',
'-out', self.testdir + '/' + name + '.crt',
'-keyout', self.testdir + '/' + name + '.key'])
if load:
self.certificate_load(name)
def certificate_load(self, crt, key=None):
if key is None:
key = crt
with open(self.testdir + '/' + key + '.key', 'rb') as k, \
open(self.testdir + '/' + crt + '.crt', 'rb') as c:
return self.conf(k.read() + c.read(), '/certificates/' + crt)
def get_ssl(self, **kwargs):
return self.get(blocking=True, wrapper=self.context.wrap_socket,
**kwargs)
def post_ssl(self, **kwargs):
return self.post(blocking=True, wrapper=self.context.wrap_socket,
**kwargs)
def get_server_certificate(self, addr=('127.0.0.1', 7080)):
return ssl.get_server_certificate(addr)
def load(self, script, name=None):
if name is None:
name = script
# create default openssl configuration
with open(self.testdir + '/openssl.conf', 'w') as f:
f.write("""[ req ]
default_bits = 1024
encrypt_key = no
distinguished_name = req_distinguished_name
[ req_distinguished_name ]""")
self.conf({
"listeners": {
"*:7080": {
"application": name
}
},
"applications": {
name: {
"type": "python",
"processes": { "spare": 0 },
"path": self.current_dir + '/python/' + script,
"working_directory": self.current_dir + '/python/' + script,
"module": "wsgi"
}
}
})