Class usage came from the unittest framework and it was always redundant after migration to the pytest. This commit removes classes from files containing tests to make them more readable and understandable.
203 lines
5.4 KiB
Python
203 lines
5.4 KiB
Python
import socket
|
|
|
|
import pytest
|
|
|
|
pytest.importorskip('OpenSSL.SSL')
|
|
from OpenSSL.SSL import (
|
|
TLSv1_2_METHOD,
|
|
Context,
|
|
Connection,
|
|
_lib,
|
|
)
|
|
from unit.applications.tls import ApplicationTLS
|
|
|
|
prerequisites = {'modules': {'openssl': 'any'}}
|
|
|
|
client = ApplicationTLS()
|
|
|
|
TICKET = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE'
|
|
TICKET2 = '5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt'
|
|
TICKET80 = '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR\
|
|
49TZXi/Y4/8RSIO7QPsU51/HLR1gWIMhVM2m9yh93Bw='
|
|
|
|
|
|
@pytest.fixture(autouse=True)
|
|
def setup_method_fixture():
|
|
client.certificate()
|
|
|
|
listener_conf = {
|
|
"pass": "routes",
|
|
"tls": {
|
|
"certificate": "default",
|
|
"session": {"cache_size": 0, "tickets": True},
|
|
},
|
|
}
|
|
|
|
assert 'success' in client.conf(
|
|
{
|
|
"listeners": {
|
|
"*:7080": listener_conf,
|
|
"*:7081": listener_conf,
|
|
"*:7082": listener_conf,
|
|
},
|
|
"routes": [{"action": {"return": 200}}],
|
|
"applications": {},
|
|
}
|
|
), 'load application configuration'
|
|
|
|
|
|
def connect(ctx=None, session=None, port=7080):
|
|
sock = socket.create_connection(('127.0.0.1', port))
|
|
|
|
if ctx is None:
|
|
ctx = Context(TLSv1_2_METHOD)
|
|
|
|
conn = Connection(ctx, sock)
|
|
conn.set_connect_state()
|
|
|
|
if session is not None:
|
|
conn.set_session(session)
|
|
|
|
conn.do_handshake()
|
|
conn.shutdown()
|
|
|
|
return (
|
|
conn.get_session(),
|
|
ctx,
|
|
_lib.SSL_session_reused(conn._ssl),
|
|
)
|
|
|
|
|
|
def has_ticket(sess):
|
|
return _lib.SSL_SESSION_has_ticket(sess._session)
|
|
|
|
|
|
def set_tickets(tickets=True, port=7080):
|
|
assert 'success' in client.conf(
|
|
{"cache_size": 0, "tickets": tickets},
|
|
f'listeners/*:{port}/tls/session',
|
|
)
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
not hasattr(_lib, 'SSL_SESSION_has_ticket'),
|
|
reason='ticket check is not supported',
|
|
)
|
|
def test_tls_ticket():
|
|
sess, ctx, reused = connect()
|
|
assert has_ticket(sess), 'tickets True'
|
|
assert not reused, 'tickets True not reused'
|
|
|
|
sess, ctx, reused = connect(ctx, sess)
|
|
assert has_ticket(sess), 'tickets True reconnect'
|
|
assert reused, 'tickets True reused'
|
|
|
|
set_tickets(tickets=False)
|
|
|
|
sess, _, _ = connect()
|
|
assert not has_ticket(sess), 'tickets False'
|
|
|
|
assert 'success' in client.conf_delete(
|
|
'listeners/*:7080/tls/session/tickets'
|
|
), 'tickets default configure'
|
|
|
|
sess, _, _ = connect()
|
|
assert not has_ticket(sess), 'tickets default (false)'
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
not hasattr(_lib, 'SSL_SESSION_has_ticket'),
|
|
reason='ticket check is not supported',
|
|
)
|
|
def test_tls_ticket_string():
|
|
set_tickets(TICKET)
|
|
sess, ctx, _ = connect()
|
|
assert has_ticket(sess), 'tickets string'
|
|
|
|
sess2, _, reused = connect(ctx, sess)
|
|
assert has_ticket(sess2), 'tickets string reconnect'
|
|
assert reused, 'tickets string reused'
|
|
|
|
sess2, _, reused = connect(ctx, sess, port=7081)
|
|
assert has_ticket(sess2), 'connect True'
|
|
assert not reused, 'connect True not reused'
|
|
|
|
set_tickets(TICKET2, port=7081)
|
|
|
|
sess2, _, reused = connect(ctx, sess, port=7081)
|
|
assert has_ticket(sess2), 'wrong ticket'
|
|
assert not reused, 'wrong ticket not reused'
|
|
|
|
set_tickets(TICKET80)
|
|
|
|
sess, ctx, _ = connect()
|
|
assert has_ticket(sess), 'tickets string 80'
|
|
|
|
sess2, _, reused = connect(ctx, sess)
|
|
assert has_ticket(sess2), 'tickets string 80 reconnect'
|
|
assert reused, 'tickets string 80 reused'
|
|
|
|
sess2, _, reused = connect(ctx, sess, port=7081)
|
|
assert has_ticket(sess2), 'wrong ticket 80'
|
|
assert not reused, 'wrong ticket 80 not reused'
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
not hasattr(_lib, 'SSL_SESSION_has_ticket'),
|
|
reason='ticket check is not supported',
|
|
)
|
|
def test_tls_ticket_array():
|
|
set_tickets([])
|
|
|
|
sess, ctx, _ = connect()
|
|
assert not has_ticket(sess), 'tickets array empty'
|
|
|
|
set_tickets([TICKET, TICKET2])
|
|
set_tickets(TICKET, port=7081)
|
|
set_tickets(TICKET2, port=7082)
|
|
|
|
sess, ctx, _ = connect()
|
|
_, _, reused = connect(ctx, sess, port=7081)
|
|
assert not reused, 'not last ticket'
|
|
_, _, reused = connect(ctx, sess, port=7082)
|
|
assert reused, 'last ticket'
|
|
|
|
sess, ctx, _ = connect(port=7081)
|
|
_, _, reused = connect(ctx, sess)
|
|
assert reused, 'first ticket'
|
|
|
|
sess, ctx, _ = connect(port=7082)
|
|
_, _, reused = connect(ctx, sess)
|
|
assert reused, 'second ticket'
|
|
|
|
assert 'success' in client.conf_delete(
|
|
'listeners/*:7080/tls/session/tickets/0'
|
|
), 'removed first ticket'
|
|
assert 'success' in client.conf_post(
|
|
f'"{TICKET}"', 'listeners/*:7080/tls/session/tickets'
|
|
), 'add new ticket to the end of array'
|
|
|
|
sess, ctx, _ = connect()
|
|
_, _, reused = connect(ctx, sess, port=7082)
|
|
assert not reused, 'not last ticket 2'
|
|
_, _, reused = connect(ctx, sess, port=7081)
|
|
assert reused, 'last ticket 2'
|
|
|
|
|
|
def test_tls_ticket_invalid():
|
|
def check_tickets(tickets):
|
|
assert 'error' in client.conf(
|
|
{"tickets": tickets},
|
|
'listeners/*:7080/tls/session',
|
|
)
|
|
|
|
check_tickets({})
|
|
check_tickets('!?&^' * 16)
|
|
check_tickets(f'{TICKET[:-2]}!{TICKET[3:]}')
|
|
check_tickets(TICKET[:-1])
|
|
check_tickets(f'{TICKET}b')
|
|
check_tickets(f'{TICKET}blah')
|
|
check_tickets([True, TICKET, TICKET2])
|
|
check_tickets([TICKET, 'blah', TICKET2])
|
|
check_tickets([TICKET, TICKET2, []])
|