Tests: added test for TLS with empty Subject field.

This commit is contained in:
Andrei Zeliankou
2021-05-07 17:42:48 +01:00
parent b9e8d8073c
commit a17f7e03d4

View File

@@ -6,6 +6,7 @@ import subprocess
import pytest import pytest
from unit.applications.tls import TestApplicationTLS from unit.applications.tls import TestApplicationTLS
from unit.option import option
class TestTLS(TestApplicationTLS): class TestTLS(TestApplicationTLS):
@@ -28,6 +29,91 @@ class TestTLS(TestApplicationTLS):
{"pass": "applications/" + application}, 'listeners/*:' + str(port) {"pass": "applications/" + application}, 'listeners/*:' + str(port)
) )
def req(self, name='localhost', subject=None, x509=False):
subj = subject if subject is not None else '/CN=' + name + '/'
subprocess.call(
[
'openssl',
'req',
'-new',
'-subj',
subj,
'-config',
option.temp_dir + '/openssl.conf',
'-out',
option.temp_dir + '/' + name + '.csr',
'-keyout',
option.temp_dir + '/' + name + '.key',
],
stderr=subprocess.STDOUT,
)
def generate_ca_conf(self):
with open(option.temp_dir + '/ca.conf', 'w') as f:
f.write(
"""[ ca ]
default_ca = myca
[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha256
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions
copy_extensions = copy
[ myca_policy ]
commonName = optional
[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
% {
'dir': option.temp_dir,
'database': option.temp_dir + '/certindex',
'certserial': option.temp_dir + '/certserial',
}
)
with open(option.temp_dir + '/certserial', 'w') as f:
f.write('1000')
with open(option.temp_dir + '/certindex', 'w') as f:
f.write('')
with open(option.temp_dir + '/certindex.attr', 'w') as f:
f.write('')
def ca(self, cert='root', out='localhost'):
subprocess.call(
[
'openssl',
'ca',
'-batch',
'-config',
option.temp_dir + '/ca.conf',
'-keyfile',
option.temp_dir + '/' + cert + '.key',
'-cert',
option.temp_dir + '/' + cert + '.crt',
'-in',
option.temp_dir + '/' + out + '.csr',
'-out',
option.temp_dir + '/' + out + '.crt',
],
stderr=subprocess.STDOUT,
)
def set_certificate_req_context(self, cert='root'):
self.context = ssl.create_default_context()
self.context.check_hostname = False
self.context.verify_mode = ssl.CERT_REQUIRED
self.context.load_verify_locations(
option.temp_dir + '/' + cert + '.crt'
)
def test_tls_listener_option_add(self): def test_tls_listener_option_add(self):
self.load('empty') self.load('empty')
@@ -208,113 +294,13 @@ class TestTLS(TestApplicationTLS):
self.certificate('root', False) self.certificate('root', False)
subprocess.call( self.req('int')
[ self.req('end')
'openssl',
'req',
'-new',
'-subj',
'/CN=int/',
'-config',
temp_dir + '/openssl.conf',
'-out',
temp_dir + '/int.csr',
'-keyout',
temp_dir + '/int.key',
],
stderr=subprocess.STDOUT,
)
subprocess.call( self.generate_ca_conf()
[
'openssl',
'req',
'-new',
'-subj',
'/CN=end/',
'-config',
temp_dir + '/openssl.conf',
'-out',
temp_dir + '/end.csr',
'-keyout',
temp_dir + '/end.key',
],
stderr=subprocess.STDOUT,
)
with open(temp_dir + '/ca.conf', 'w') as f: self.ca(cert='root', out='int')
f.write( self.ca(cert='int', out='end')
"""[ ca ]
default_ca = myca
[ myca ]
new_certs_dir = %(dir)s
database = %(database)s
default_md = sha256
policy = myca_policy
serial = %(certserial)s
default_days = 1
x509_extensions = myca_extensions
[ myca_policy ]
commonName = supplied
[ myca_extensions ]
basicConstraints = critical,CA:TRUE"""
% {
'dir': temp_dir,
'database': temp_dir + '/certindex',
'certserial': temp_dir + '/certserial',
}
)
with open(temp_dir + '/certserial', 'w') as f:
f.write('1000')
with open(temp_dir + '/certindex', 'w') as f:
f.write('')
subprocess.call(
[
'openssl',
'ca',
'-batch',
'-subj',
'/CN=int/',
'-config',
temp_dir + '/ca.conf',
'-keyfile',
temp_dir + '/root.key',
'-cert',
temp_dir + '/root.crt',
'-in',
temp_dir + '/int.csr',
'-out',
temp_dir + '/int.crt',
],
stderr=subprocess.STDOUT,
)
subprocess.call(
[
'openssl',
'ca',
'-batch',
'-subj',
'/CN=end/',
'-config',
temp_dir + '/ca.conf',
'-keyfile',
temp_dir + '/int.key',
'-cert',
temp_dir + '/int.crt',
'-in',
temp_dir + '/end.csr',
'-out',
temp_dir + '/end.crt',
],
stderr=subprocess.STDOUT,
)
crt_path = temp_dir + '/end-int.crt' crt_path = temp_dir + '/end-int.crt'
end_path = temp_dir + '/end.crt' end_path = temp_dir + '/end.crt'
@@ -325,10 +311,7 @@ basicConstraints = critical,CA:TRUE"""
) as int: ) as int:
crt.write(end.read() + int.read()) crt.write(end.read() + int.read())
self.context = ssl.create_default_context() self.set_certificate_req_context()
self.context.check_hostname = False
self.context.verify_mode = ssl.CERT_REQUIRED
self.context.load_verify_locations(temp_dir + '/root.crt')
# incomplete chain # incomplete chain
@@ -402,6 +385,44 @@ basicConstraints = critical,CA:TRUE"""
self.get_ssl()['status'] == 200 self.get_ssl()['status'] == 200
), 'certificate chain intermediate server' ), 'certificate chain intermediate server'
def test_tls_certificate_empty_cn(self, temp_dir):
self.certificate('root', False)
self.req(subject='/')
self.generate_ca_conf()
self.ca()
self.set_certificate_req_context()
assert 'success' in self.certificate_load('localhost', 'localhost')
cert = self.conf_get('/certificates/localhost')
assert cert['chain'][0]['subject'] == {}, 'empty subject'
assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
def test_tls_certificate_empty_cn_san(self, temp_dir):
self.certificate('root', False)
self.openssl_conf(
rewrite=True, alt_names=["example.com", "www.example.net"]
)
self.req(subject='/')
self.generate_ca_conf()
self.ca()
self.set_certificate_req_context()
assert 'success' in self.certificate_load('localhost', 'localhost')
cert = self.conf_get('/certificates/localhost')
assert cert['chain'][0]['subject'] == {
'alt_names': ['example.com', 'www.example.net']
}, 'subject alt_names'
assert cert['chain'][0]['issuer']['common_name'] == 'root', 'issuer'
@pytest.mark.skip('not yet') @pytest.mark.skip('not yet')
def test_tls_reconfigure(self): def test_tls_reconfigure(self):
self.load('empty') self.load('empty')