Tests: added tests for TLS session tickets.
This commit is contained in:
195
test/test_tls_tickets.py
Normal file
195
test/test_tls_tickets.py
Normal file
@@ -0,0 +1,195 @@
|
||||
import socket
|
||||
|
||||
import pytest
|
||||
from OpenSSL.SSL import (
|
||||
TLSv1_2_METHOD,
|
||||
Context,
|
||||
Connection,
|
||||
Session,
|
||||
_lib,
|
||||
)
|
||||
from unit.applications.tls import TestApplicationTLS
|
||||
from unit.option import option
|
||||
|
||||
|
||||
class TestTLSTicket(TestApplicationTLS):
|
||||
prerequisites = {'modules': {'openssl': 'any'}}
|
||||
|
||||
ticket = 'U1oDTh11mMxODuw12gS0EXX1E/PkZG13cJNQ6m5+6BGlfPTjNlIEw7PSVU3X1gTE'
|
||||
ticket2 = (
|
||||
'5AV0DSYIYbZWZQB7fCnTHZmMxtotb/aXjam+n2XS79lTvX3Tq9xGqpC8XKNEF2lt'
|
||||
)
|
||||
ticket80 = '6Pfil8lv/k8zf8MndPpfXaO5EAV6dhME6zs6CfUyq2yziynQwSywtKQMqHGnJ2HR\
|
||||
49TZXi/Y4/8RSIO7QPsU51/HLR1gWIMhVM2m9yh93Bw='
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_method_fixture(self, request):
|
||||
self.certificate()
|
||||
|
||||
listener_conf = {
|
||||
"pass": "routes",
|
||||
"tls": {
|
||||
"certificate": "default",
|
||||
"session": {"cache_size": 0, "tickets": True},
|
||||
},
|
||||
}
|
||||
|
||||
assert 'success' in self.conf(
|
||||
{
|
||||
"listeners": {
|
||||
"*:7080": listener_conf,
|
||||
"*:7081": listener_conf,
|
||||
"*:7082": listener_conf,
|
||||
},
|
||||
"routes": [{"action": {"return": 200}}],
|
||||
"applications": {},
|
||||
}
|
||||
), 'load application configuration'
|
||||
|
||||
def set_tickets(self, tickets=True, port=7080):
|
||||
assert 'success' in self.conf(
|
||||
{"cache_size": 0, "tickets": tickets},
|
||||
'listeners/*:' + str(port) + '/tls/session',
|
||||
)
|
||||
|
||||
def connect(self, ctx=None, session=None, port=7080):
|
||||
sock = socket.create_connection(('127.0.0.1', port))
|
||||
|
||||
if ctx is None:
|
||||
ctx = Context(TLSv1_2_METHOD)
|
||||
|
||||
client = Connection(ctx, sock)
|
||||
client.set_connect_state()
|
||||
|
||||
if session is not None:
|
||||
client.set_session(session)
|
||||
|
||||
client.do_handshake()
|
||||
client.shutdown()
|
||||
|
||||
return (
|
||||
client.get_session(),
|
||||
ctx,
|
||||
_lib.SSL_session_reused(client._ssl),
|
||||
)
|
||||
|
||||
def has_ticket(self, sess):
|
||||
return _lib.SSL_SESSION_has_ticket(sess._session)
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not hasattr(_lib, 'SSL_SESSION_has_ticket'),
|
||||
reason='ticket check is not supported',
|
||||
)
|
||||
def test_tls_ticket(self):
|
||||
sess, ctx, reused = self.connect()
|
||||
assert self.has_ticket(sess), 'tickets True'
|
||||
assert not reused, 'tickets True not reused'
|
||||
|
||||
sess, ctx, reused = self.connect(ctx, sess)
|
||||
assert self.has_ticket(sess), 'tickets True reconnect'
|
||||
assert reused, 'tickets True reused'
|
||||
|
||||
self.set_tickets(tickets=False)
|
||||
|
||||
sess, _, _ = self.connect()
|
||||
assert not self.has_ticket(sess), 'tickets False'
|
||||
|
||||
assert 'success' in self.conf_delete(
|
||||
'listeners/*:7080/tls/session/tickets'
|
||||
), 'tickets default configure'
|
||||
|
||||
sess, _, _ = self.connect()
|
||||
assert not self.has_ticket(sess), 'tickets default (false)'
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not hasattr(_lib, 'SSL_SESSION_has_ticket'),
|
||||
reason='ticket check is not supported',
|
||||
)
|
||||
def test_tls_ticket_string(self):
|
||||
self.set_tickets(self.ticket)
|
||||
sess, ctx, _ = self.connect()
|
||||
assert self.has_ticket(sess), 'tickets string'
|
||||
|
||||
sess2, _, reused = self.connect(ctx, sess)
|
||||
assert self.has_ticket(sess2), 'tickets string reconnect'
|
||||
assert reused, 'tickets string reused'
|
||||
|
||||
sess2, _, reused = self.connect(ctx, sess, port=7081)
|
||||
assert self.has_ticket(sess2), 'connect True'
|
||||
assert not reused, 'connect True not reused'
|
||||
|
||||
self.set_tickets(self.ticket2, port=7081)
|
||||
|
||||
sess2, _, reused = self.connect(ctx, sess, port=7081)
|
||||
assert self.has_ticket(sess2), 'wrong ticket'
|
||||
assert not reused, 'wrong ticket not reused'
|
||||
|
||||
self.set_tickets(self.ticket80)
|
||||
|
||||
sess, ctx, _ = self.connect()
|
||||
assert self.has_ticket(sess), 'tickets string 80'
|
||||
|
||||
sess2, _, reused = self.connect(ctx, sess)
|
||||
assert self.has_ticket(sess2), 'tickets string 80 reconnect'
|
||||
assert reused, 'tickets string 80 reused'
|
||||
|
||||
sess2, _, reused = self.connect(ctx, sess, port=7081)
|
||||
assert self.has_ticket(sess2), 'wrong ticket 80'
|
||||
assert not reused, 'wrong ticket 80 not reused'
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not hasattr(_lib, 'SSL_SESSION_has_ticket'),
|
||||
reason='ticket check is not supported',
|
||||
)
|
||||
def test_tls_ticket_array(self):
|
||||
self.set_tickets([])
|
||||
|
||||
sess, ctx, _ = self.connect()
|
||||
assert not self.has_ticket(sess), 'tickets array empty'
|
||||
|
||||
self.set_tickets([self.ticket, self.ticket2])
|
||||
self.set_tickets(self.ticket, port=7081)
|
||||
self.set_tickets(self.ticket2, port=7082)
|
||||
|
||||
sess, ctx, _ = self.connect()
|
||||
_, _, reused = self.connect(ctx, sess, port=7081)
|
||||
assert not reused, 'not last ticket'
|
||||
_, _, reused = self.connect(ctx, sess, port=7082)
|
||||
assert reused, 'last ticket'
|
||||
|
||||
sess, ctx, _ = self.connect(port=7081)
|
||||
_, _, reused = self.connect(ctx, sess)
|
||||
assert reused, 'first ticket'
|
||||
|
||||
sess, ctx, _ = self.connect(port=7082)
|
||||
_, _, reused = self.connect(ctx, sess)
|
||||
assert reused, 'second ticket'
|
||||
|
||||
assert 'success' in self.conf_delete(
|
||||
'listeners/*:7080/tls/session/tickets/0'
|
||||
), 'removed first ticket'
|
||||
assert 'success' in self.conf_post(
|
||||
'"' + self.ticket + '"', 'listeners/*:7080/tls/session/tickets'
|
||||
), 'add new ticket to the end of array'
|
||||
|
||||
sess, ctx, _ = self.connect()
|
||||
_, _, reused = self.connect(ctx, sess, port=7082)
|
||||
assert not reused, 'not last ticket 2'
|
||||
_, _, reused = self.connect(ctx, sess, port=7081)
|
||||
assert reused, 'last ticket 2'
|
||||
|
||||
def test_tls_ticket_invalid(self):
|
||||
def check_tickets(tickets):
|
||||
assert 'error' in self.conf(
|
||||
{"tickets": tickets}, 'listeners/*:7080/tls/session',
|
||||
)
|
||||
|
||||
check_tickets({})
|
||||
check_tickets('!?&^' * 16)
|
||||
check_tickets(self.ticket[:-2] + '!' + self.ticket[3:])
|
||||
check_tickets(self.ticket[:-1])
|
||||
check_tickets(self.ticket + 'b')
|
||||
check_tickets(self.ticket + 'blah')
|
||||
check_tickets([True, self.ticket, self.ticket2])
|
||||
check_tickets([self.ticket, 'blah', self.ticket2])
|
||||
check_tickets([self.ticket, self.ticket2, []])
|
||||
Reference in New Issue
Block a user