Tests: switched to using f-strings.
Previously, it was necessary to support older versions of Python for compatibility. F-strings were released in Python 3.6. Python 3.5 was marked as unsupported by the end of 2020, so now it's possible to start using f-strings safely for better readability and performance.
This commit is contained in:
@@ -17,19 +17,19 @@ class TestTLS(TestApplicationTLS):
|
||||
def add_tls(self, application='empty', cert='default', port=7080):
|
||||
assert 'success' in self.conf(
|
||||
{
|
||||
"pass": "applications/" + application,
|
||||
"pass": f"applications/{application}",
|
||||
"tls": {"certificate": cert},
|
||||
},
|
||||
'listeners/*:' + str(port),
|
||||
f'listeners/*:{port}',
|
||||
)
|
||||
|
||||
def remove_tls(self, application='empty', port=7080):
|
||||
assert 'success' in self.conf(
|
||||
{"pass": "applications/" + application}, 'listeners/*:' + str(port)
|
||||
{"pass": f"applications/{application}"}, f'listeners/*:{port}'
|
||||
)
|
||||
|
||||
def req(self, name='localhost', subject=None, x509=False):
|
||||
subj = subject if subject is not None else '/CN=' + name + '/'
|
||||
subj = subject if subject is not None else f'/CN={name}/'
|
||||
|
||||
subprocess.check_output(
|
||||
[
|
||||
@@ -39,27 +39,27 @@ class TestTLS(TestApplicationTLS):
|
||||
'-subj',
|
||||
subj,
|
||||
'-config',
|
||||
option.temp_dir + '/openssl.conf',
|
||||
f'{option.temp_dir}/openssl.conf',
|
||||
'-out',
|
||||
option.temp_dir + '/' + name + '.csr',
|
||||
f'{option.temp_dir}/{name}.csr',
|
||||
'-keyout',
|
||||
option.temp_dir + '/' + name + '.key',
|
||||
f'{option.temp_dir}/{name}.key',
|
||||
],
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
|
||||
def generate_ca_conf(self):
|
||||
with open(option.temp_dir + '/ca.conf', 'w') as f:
|
||||
with open(f'{option.temp_dir}/ca.conf', 'w') as f:
|
||||
f.write(
|
||||
"""[ ca ]
|
||||
f"""[ ca ]
|
||||
default_ca = myca
|
||||
|
||||
[ myca ]
|
||||
new_certs_dir = %(dir)s
|
||||
database = %(database)s
|
||||
new_certs_dir = {option.temp_dir}
|
||||
database = {option.temp_dir}/certindex
|
||||
default_md = sha256
|
||||
policy = myca_policy
|
||||
serial = %(certserial)s
|
||||
serial = {option.temp_dir}/certserial
|
||||
default_days = 1
|
||||
x509_extensions = myca_extensions
|
||||
copy_extensions = copy
|
||||
@@ -69,20 +69,15 @@ commonName = optional
|
||||
|
||||
[ myca_extensions ]
|
||||
basicConstraints = critical,CA:TRUE"""
|
||||
% {
|
||||
'dir': option.temp_dir,
|
||||
'database': option.temp_dir + '/certindex',
|
||||
'certserial': option.temp_dir + '/certserial',
|
||||
}
|
||||
)
|
||||
|
||||
with open(option.temp_dir + '/certserial', 'w') as f:
|
||||
with open(f'{option.temp_dir}/certserial', 'w') as f:
|
||||
f.write('1000')
|
||||
|
||||
with open(option.temp_dir + '/certindex', 'w') as f:
|
||||
with open(f'{option.temp_dir}/certindex', 'w') as f:
|
||||
f.write('')
|
||||
|
||||
with open(option.temp_dir + '/certindex.attr', 'w') as f:
|
||||
with open(f'{option.temp_dir}/certindex.attr', 'w') as f:
|
||||
f.write('')
|
||||
|
||||
def ca(self, cert='root', out='localhost'):
|
||||
@@ -92,15 +87,15 @@ basicConstraints = critical,CA:TRUE"""
|
||||
'ca',
|
||||
'-batch',
|
||||
'-config',
|
||||
option.temp_dir + '/ca.conf',
|
||||
f'{option.temp_dir}/ca.conf',
|
||||
'-keyfile',
|
||||
option.temp_dir + '/' + cert + '.key',
|
||||
f'{option.temp_dir}/{cert}.key',
|
||||
'-cert',
|
||||
option.temp_dir + '/' + cert + '.crt',
|
||||
f'{option.temp_dir}/{cert}.crt',
|
||||
'-in',
|
||||
option.temp_dir + '/' + out + '.csr',
|
||||
f'{option.temp_dir}/{out}.csr',
|
||||
'-out',
|
||||
option.temp_dir + '/' + out + '.crt',
|
||||
f'{option.temp_dir}/{out}.crt',
|
||||
],
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
@@ -109,9 +104,7 @@ basicConstraints = critical,CA:TRUE"""
|
||||
self.context = ssl.create_default_context()
|
||||
self.context.check_hostname = False
|
||||
self.context.verify_mode = ssl.CERT_REQUIRED
|
||||
self.context.load_verify_locations(
|
||||
option.temp_dir + '/' + cert + '.crt'
|
||||
)
|
||||
self.context.load_verify_locations(f'{option.temp_dir}/{cert}.crt')
|
||||
|
||||
def test_tls_listener_option_add(self):
|
||||
self.load('empty')
|
||||
@@ -230,7 +223,7 @@ basicConstraints = critical,CA:TRUE"""
|
||||
'-noout',
|
||||
'-genkey',
|
||||
'-out',
|
||||
temp_dir + '/ec.key',
|
||||
f'{temp_dir}/ec.key',
|
||||
'-name',
|
||||
'prime256v1',
|
||||
],
|
||||
@@ -246,11 +239,11 @@ basicConstraints = critical,CA:TRUE"""
|
||||
'-subj',
|
||||
'/CN=ec/',
|
||||
'-config',
|
||||
temp_dir + '/openssl.conf',
|
||||
f'{temp_dir}/openssl.conf',
|
||||
'-key',
|
||||
temp_dir + '/ec.key',
|
||||
f'{temp_dir}/ec.key',
|
||||
'-out',
|
||||
temp_dir + '/ec.crt',
|
||||
f'{temp_dir}/ec.crt',
|
||||
],
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
@@ -305,9 +298,9 @@ basicConstraints = critical,CA:TRUE"""
|
||||
self.ca(cert='root', out='int')
|
||||
self.ca(cert='int', out='end')
|
||||
|
||||
crt_path = temp_dir + '/end-int.crt'
|
||||
end_path = temp_dir + '/end.crt'
|
||||
int_path = temp_dir + '/int.crt'
|
||||
crt_path = f'{temp_dir}/end-int.crt'
|
||||
end_path = f'{temp_dir}/end.crt'
|
||||
int_path = f'{temp_dir}/int.crt'
|
||||
|
||||
with open(crt_path, 'wb') as crt, open(end_path, 'rb') as end, open(
|
||||
int_path, 'rb'
|
||||
@@ -400,22 +393,24 @@ basicConstraints = critical,CA:TRUE"""
|
||||
elif i == chain_length - 1:
|
||||
self.req('end')
|
||||
else:
|
||||
self.req('int{}'.format(i))
|
||||
self.req(f'int{i}')
|
||||
|
||||
for i in range(chain_length - 1):
|
||||
if i == 0:
|
||||
self.ca(cert='root', out='int1')
|
||||
elif i == chain_length - 2:
|
||||
self.ca(cert='int{}'.format(chain_length - 2), out='end')
|
||||
self.ca(cert=f'int{(chain_length - 2)}', out='end')
|
||||
else:
|
||||
self.ca(cert='int{}'.format(i), out='int{}'.format(i + 1))
|
||||
self.ca(cert=f'int{i}', out=f'int{(i + 1)}')
|
||||
|
||||
for i in range(chain_length - 1, 0, -1):
|
||||
path = temp_dir + (
|
||||
'/end.crt' if i == chain_length - 1 else '/int{}.crt'.format(i)
|
||||
path = (
|
||||
f'{temp_dir}/end.crt'
|
||||
if i == chain_length - 1
|
||||
else f'{temp_dir}/int{i}.crt'
|
||||
)
|
||||
|
||||
with open(temp_dir + '/all.crt', 'a') as chain, open(path) as cert:
|
||||
with open(f'{temp_dir}/all.crt', 'a') as chain, open(path) as cert:
|
||||
chain.write(cert.read())
|
||||
|
||||
self.set_certificate_req_context()
|
||||
@@ -611,10 +606,10 @@ basicConstraints = critical,CA:TRUE"""
|
||||
|
||||
subprocess.check_output(['kill', '-9', app_id])
|
||||
|
||||
skip_alert(r'process %s exited on signal 9' % app_id)
|
||||
skip_alert(fr'process {app_id} exited on signal 9')
|
||||
|
||||
self.wait_for_record(
|
||||
r' (?!' + app_id + r'#)(\d+)#\d+ "mirror" application started'
|
||||
fr' (?!{app_id}#)(\d+)#\d+ "mirror" application started'
|
||||
)
|
||||
|
||||
resp = self.post_ssl(sock=sock, body='0123456789')
|
||||
@@ -673,7 +668,7 @@ basicConstraints = critical,CA:TRUE"""
|
||||
}
|
||||
)
|
||||
assert res['status'] == 200, 'status ok'
|
||||
assert res['body'] == filename + data
|
||||
assert res['body'] == f'{filename}{data}'
|
||||
|
||||
def test_tls_multi_listener(self):
|
||||
self.load('empty')
|
||||
|
||||
Reference in New Issue
Block a user