# Author: Hubert Kario, (c) 2015
# Released under Gnu GPL v2.0, see LICENSE file for details
"""Objects for generating TLS messages to send."""
import random
import struct
from tlslite.messages import ClientHello, ClientKeyExchange, ChangeCipherSpec,\
Finished, Alert, ApplicationData, Message, Certificate, \
CertificateVerify, CertificateRequest, ClientMasterKey, \
ClientFinished, ServerKeyExchange, ServerHello, Heartbeat, \
KeyUpdate
from tlslite.constants import AlertLevel, AlertDescription, ContentType, \
ExtensionType, CertificateType, HashAlgorithm, \
SignatureAlgorithm, CipherSuite, SignatureScheme, TLS_1_3_HRR, \
HeartbeatMessageType
import tlslite.utils.tlshashlib as hashlib
from tlslite.extensions import TLSExtension, RenegotiationInfoExtension, \
ClientKeyShareExtension, StatusRequestExtension
from tlslite.messagesocket import MessageSocket
from tlslite.defragmenter import Defragmenter
from tlslite.mathtls import calc_key, \
calcExtendedMasterSecret
from tlslite.handshakehashes import HandshakeHashes
from tlslite.utils.codec import Writer
from tlslite.utils.cryptomath import getRandomBytes, numBytes, \
numberToByteArray, bytesToNumber, HKDF_expand_label, secureHMAC, \
derive_secret
from tlslite.keyexchange import KeyExchange
from tlslite.bufferedsocket import BufferedSocket
from tlslite.recordlayer import ConnectionState
from .helpers import key_share_gen, AutoEmptyExtension, ECDSA_SIG_ALL, \
RSA_PKCS1_ALL, RSA_PSS_PSS_ALL, RSA_PSS_RSAE_ALL, SIG_ALL, DSA_ALL
from .handshake_helpers import calc_pending_states, curve_name_to_hash_tls13
from .tree import TreeNode
import socket
from functools import partial
[docs]
class Command(TreeNode):
"""Command objects."""
def __init__(self):
"""Create object."""
super(Command, self).__init__()
[docs]
def is_command(self):
"""Define object as a command node."""
return True
[docs]
def is_expect(self):
"""Define object as a command node."""
return False
[docs]
def is_generator(self):
"""Define object as a command node."""
return False
[docs]
def process(self, state):
"""Change the state of the connection."""
raise NotImplementedError("Subclasses need to implement this!")
[docs]
class Connect(Command):
"""Object used to connect to a TCP server."""
def __init__(self, hostname, port, version=(3, 0), timeout=5):
"""
Provide minimal settings needed to connect to other peer.
:param str hostname: host name of the server to connect to
:param int port: :term:`TCP` port number to connect to
:param tuple(int,int) version: the protocol version used in the
record layer for the initial messages
:param float timeout: amount of time to wait while expecting a message
before aborting the connection, in seconds
"""
super(Connect, self).__init__()
self.hostname = hostname
self.port = port
self.version = version
self.timeout = timeout
[docs]
def process(self, state):
"""Connect to a server."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
sock.connect((self.hostname, self.port))
# disable Nagle - we handle buffering and flushing ourselves
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
# allow for later buffering of writes to the socket
sock = BufferedSocket(sock)
defragmenter = Defragmenter()
defragmenter.add_static_size(ContentType.alert, 2)
defragmenter.add_static_size(ContentType.change_cipher_spec, 1)
defragmenter.add_dynamic_size(ContentType.handshake, 1, 3)
state.msg_sock = MessageSocket(sock, defragmenter)
state.msg_sock.version = self.version
[docs]
class SetRecordVersion(Command):
"""Change the version used at record layer."""
def __init__(self, version):
super(SetRecordVersion, self).__init__()
self.version = version
[docs]
def process(self, state):
state.msg_sock.version = self.version
[docs]
class Close(Command):
"""Object used to close a TCP connection."""
def __init__(self):
"""Close connection object."""
super(Close, self).__init__()
[docs]
def process(self, state):
"""Close currently open connection."""
state.msg_sock.sock.close()
[docs]
class CloseRST(Command):
"""Object used to close a TCP connection with a RST packet."""
def __init__(self):
"""CloseRST connection object."""
super(CloseRST, self).__init__()
[docs]
def process(self, state):
"""Close currently open connection by sending a RST packet."""
l_onoff = 1
l_linger = 0
state.msg_sock.sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', l_onoff, l_linger))
state.msg_sock.sock.close()
[docs]
class ResetHandshakeHashes(Command):
"""
Object used to reset current state of handshake hashes to zero.
Used for session renegotiation or resumption.
Also prepares for negotiation (or dropping) of record_size_limit extension.
"""
def __init__(self):
"""Object for resetting handshake hashes of session."""
super(ResetHandshakeHashes, self).__init__()
[docs]
def process(self, state):
"""Reset current running handshake protocol hashes."""
state.handshake_hashes = HandshakeHashes()
# remove the keys that can interfere with key calculation
# don't remove all as we need things like "resumption master secret"
# to actually resume next session
state.key.pop('PSK secret', None)
state.key.pop('DH shared secret', None)
# while not part of handshake hashes, it needs to be done every time
# renegotiation or resumption is performed as the extension needs to
# be negotiated every time
state._our_record_size_limit = 2**14
state._peer_record_size_limit = 2**14
[docs]
class ResetRenegotiationInfo(Command):
"""Object used to reset state of data needed for secure renegotiation."""
def __init__(self, client=None, server=None):
"""Reset the stored rengotiation info to provided values."""
super(ResetRenegotiationInfo, self).__init__()
self.client_verify_data = client
self.server_verify_data = server
[docs]
def process(self, state):
"""Reset current Finished message values."""
if self.client_verify_data is None:
self.client_verify_data = bytearray(0)
if self.server_verify_data is None:
self.server_verify_data = bytearray(0)
state.key['client_verify_data'] = self.client_verify_data
state.key['server_verify_data'] = self.server_verify_data
[docs]
class SetMaxRecordSize(Command):
"""Change the Record Layer to send records of non standard size."""
def __init__(self, max_size=None):
"""Set the maximum record layer message size, no option for default."""
super(SetMaxRecordSize, self).__init__()
self.max_size = max_size
[docs]
def process(self, state):
"""Change the size of messages in record layer."""
if self.max_size is None:
# the maximum for SSLv3, TLS1.0, TLS1.1 and TLS1.2
state.msg_sock.recordSize = 2**14
else:
state.msg_sock.recordSize = self.max_size
[docs]
class SetPaddingCallback(Command):
"""
Set the padding callback which returns the length of the padding to be
added to the message in the record layer.
"""
def __init__(self, cb=None):
"""Set the padding callback"""
super(SetPaddingCallback, self).__init__()
self.padding_cb = cb
[docs]
@staticmethod
def fixed_length_cb(size):
"""
Returns a callback function which returns a fixed number as the
padding size
"""
def _fixed_len_cb(length, contenttype, max_padding, zeroes=size):
"""
Simple callback which returns a fixed number as the padding size
to be added to the message
"""
if zeroes > (max_padding - length):
raise ValueError("requested padding size is too large")
return zeroes
return _fixed_len_cb
[docs]
@staticmethod
def fill_padding_cb(length, contenttype, max_padding):
"""
Simple callback which returns the maximum padding size as
the size of the padding to be added to the message
"""
return max_padding - length
[docs]
@staticmethod
def add_fixed_padding_cb(size):
"""
Returns a callback function which returns a fixed number as the
padding size
"""
def _add_fixed_passing_cb(length, contenttype, max_padding,
zeros=size):
"""
Simple callback which returns a fixed number as the padding size
to be added to the message.
This function does not check a correct size of padding, can cause
buffer overflow alert.
"""
return zeros
return _add_fixed_passing_cb
[docs]
def process(self, state):
"""
Set the callback which returns the length of the padding in record
layer.
"""
state.msg_sock.padding_cb = self.padding_cb
[docs]
class TCPBufferingEnable(Command):
"""
Start buffering all writes on the TCP level of connection.
You will need to call an explicit flush to send the messages.
"""
[docs]
def process(self, state):
"""Enable TCP buffering."""
state.msg_sock.sock.buffer_writes = True
[docs]
class TCPBufferingDisable(Command):
"""
Stop buffering all writes on the TCP level.
All messages will be now passed directly to the TCP socket
"""
[docs]
def process(self, state):
"""Disable TCP buffering."""
state.msg_sock.sock.buffer_writes = False
[docs]
class TCPBufferingFlush(Command):
"""
Send all messages in the buffer.
Does not change the state of buffering
"""
[docs]
def process(self, state):
"""Flush all messages to TCP socket."""
state.msg_sock.sock.flush()
[docs]
class ResetWriteConnectionState(Command):
"""
Reset _writeState configuration to default values
All sent messages will be unencrypted now
"""
[docs]
def process(self, state):
state.msg_sock._writeState = ConnectionState()
[docs]
class CollectNonces(Command):
"""
Start collecting nonces being sent by the server in the provided array.
Works only for ciphers like AES-GCM which use explicit nonces. Ciphers
like Chacha20 use implicit nonce constructed from PRF output and sequence
number.
Needs to be run after the cipher was negotiated and switched to (after
CCS), will collect nonces only till next renegotiation.
"""
def __init__(self, nonces):
"""Link the list for storing nonces with the object."""
super(CollectNonces, self).__init__()
self.nonces = nonces
[docs]
def process(self, state):
"""Monkey patch the seal() method."""
seal_mthd = state.msg_sock._writeState.encContext.seal
def collector(nonce, buf, authData, old_seal=seal_mthd,
nonces=self.nonces):
"""Collect used nonces for encryption."""
nonces.append(nonce)
return old_seal(nonce, buf, authData)
state.msg_sock._writeState.encContext.seal = collector
[docs]
class CopyVariables(Command):
"""
Copy current random values of connection to provided arrays.
Available keys are either ``ClientHello.random``, ``ServerHello.random``,
``ServerHello.session_id`` or
one of the values in ``key`` in
:py:class:`~tlsfuzzer.runner.ConnectionState`
(``premaster_secret``,
``master_secret``, ``ServerHello.extensions.key_share.key_exchange``,
``server handshake traffic secret``, ``exporter master secret``,
``ServerKeyExchange.key_share``, ``ServerKeyExchange.dh_p``,
``ClientKeyExchange.dh_Yc``, ``ClientKeyExchange.ecdh_Yc``,
``DH shared secret``, ``PSK secret``, ``client_verify_data``,
``server_verify_data``, ``client application traffic secret``,
``server application traffic secret``,
``resumption master secret``, ``early secret``, or ``handshake secret``)
The log should be a dict (where keys have the above specified names)
and values should be arrays (the values will be appended there).
This node needs to be put right after a node that calculate or use the
specific values to guarantee correct collection (i.e. if the conversation
performs a renegotiation, it needs to be placed after both
:py:class:`~tlsfuzzer.expect.ExpectServerHello` nodes to collect both
``ServerHello.random`` values).
:param dict(str,list) log: dictionary with names of values to collect
"""
def __init__(self, log):
"""Link the randoms to log with session."""
super(CopyVariables, self).__init__()
self.log = log
[docs]
def process(self, state):
"""Copy current variables to log arrays."""
for name, val in self.log.items():
if name == 'ClientHello.random':
val.append(state.client_random)
elif name == 'ServerHello.random':
val.append(state.server_random)
elif name == 'ServerHello.session_id':
val.append(state.session_id)
else:
if name not in state.key:
raise ValueError("'{0}' variable is not defined yet or "
"invalid for ConnectionState.key lookups."
.format(name))
val.append(state.key[name])
[docs]
class RawSocketWriteGenerator(Command):
"""
Send a plaintext data irrespective of encryption state.
Does not update handshake hashes, record layer state, does not fragment,
etc.
:ivar bytearray ~.data: data to send
:ivar str ~.description: identifier to print when processing of the node
fails
"""
def __init__(self, data, description=None):
"""Set the record layer type and payload to send."""
super(RawSocketWriteGenerator, self).__init__()
self.data = data
self.description = description
def __repr__(self):
"""Return human readable representation of the object."""
return self._repr(["data", "description"])
[docs]
def process(self, state):
"""Send the message over the socket."""
state.msg_sock._recordSocket.sock.send(self.data)
[docs]
class PlaintextMessageGenerator(Command):
"""
Send a plaintext data record irrespective of encryption state.
Does not update handshake hashes, record layer state, does not fragment,
etc.
:ivar int content_type: content type of message, used in record layer
header. See :py:class:`~tlslite.constants.ContentType` for well-known
values
:ivar bytearray ~.data: payload for the record
:ivar str ~.description: identifier to print when processing of the node
fails
"""
def __init__(self, content_type, data, description=None):
"""Set the record layer type and payload to send."""
super(PlaintextMessageGenerator, self).__init__()
self.content_type = content_type
self.data = data
self.description = description
def __repr__(self):
"""Return human readable representation of the object."""
vals = []
vals.append(('content_type', self.content_type))
vals.append(('data', repr(self.data)))
if self.description:
vals.append(('description', repr(self.description)))
return "PlaintextMessageGenerator({0})".format(
", ".join("{0}={1}".format(i[0], i[1]) for i in vals))
[docs]
def process(self, state):
"""Send the message over the socket."""
msg = Message(self.content_type, self.data)
for _ in state.msg_sock._recordSocket.send(msg):
pass
[docs]
class MessageGenerator(TreeNode):
"""Message generator objects."""
def __init__(self):
"""Initialize the object."""
super(MessageGenerator, self).__init__()
self.msg = None
self.queue = False
[docs]
def is_command(self):
"""Define object as a generator node."""
return False
[docs]
def is_expect(self):
"""Define object as a generator node."""
return False
[docs]
def is_generator(self):
"""Define object as a generator node."""
return True
[docs]
def generate(self, state):
"""Return a message ready to write to socket."""
raise NotImplementedError("Subclasses need to implement this!")
[docs]
def post_send(self, state):
"""Modify the state after sending the message."""
# since most messages don't require any post-send modifications
# create a no-op default action
pass
[docs]
class RawMessageGenerator(MessageGenerator):
"""
Generator for arbitrary record layer messages.
Can generate message with any content_type and any payload. Will
be encrypted if encryption is negotiated in the connection.
:ivar int content_type: content type of message, used in record layer
header. See :py:class:`~tlslite.constants.ContentType` for well-known
values
:ivar bytearray ~.data: payload for the record
:ivar str ~.description: identifier to print when processing of the node
fails
"""
def __init__(self, content_type, data, description=None):
"""Set the record layer type and payload to send."""
super(RawMessageGenerator, self).__init__()
self.content_type = content_type
self.data = data
self.description = description
[docs]
def generate(self, state):
"""Create a tlslite-ng message that can be send."""
message = Message(self.content_type, self.data)
return message
def __repr__(self):
"""Return human readable representation of the object."""
if self.description is None:
return "RawMessageGenerator(content_type={0!s}, data={1!r})".\
format(self.content_type, self.data)
else:
return "RawMessageGenerator(content_type={0!s}, data={1!r}, " \
"description={2!r})".format(self.content_type, self.data,
self.description)
[docs]
class HandshakeProtocolMessageGenerator(MessageGenerator):
"""Message generator for TLS Handshake protocol messages."""
[docs]
def post_send(self, state):
"""Update handshake hashes after sending."""
super(HandshakeProtocolMessageGenerator, self).post_send(state)
state.handshake_hashes.update(self.msg.write())
state.handshake_messages.append(self.msg)
[docs]
def ch_cookie_handler(state):
"""Client Hello cookie extension handler.
Copies the cookie extension from last HRR message.
"""
hrr = state.get_last_message_of_type(ServerHello)
if not hrr or hrr.random != TLS_1_3_HRR:
# as the second CH should never be used without ExpectHelloRetryRequest
# before it, using this helper when there is no HRR in current
# handshake messages in the state is a user error, not server error
raise ValueError("No HRR received")
cookie = hrr.getExtension(ExtensionType.cookie)
return cookie
[docs]
def ch_key_share_handler(state):
"""Client Hello key_share extension handler.
Generates the key share for the group selected by server in the last
HRR message.
"""
hrr = state.get_last_message_of_type(ServerHello)
if not hrr or hrr.random != TLS_1_3_HRR:
# as the second CH should never be used without ExpectHelloRetryRequest
# before it, using this helper when there is no HRR in current
# handshake messages in the state is a user error, not server error
raise ValueError("No HRR received")
hrr_key_share = hrr.getExtension(ExtensionType.key_share)
# the check if the group selected in HRR was advertised in the first
# ClientHello happens in the hrr_ext_handler_key_share()
group_id = hrr_key_share.selected_group
key_shares = [key_share_gen(group_id)]
key_share = ClientKeyShareExtension().create(key_shares)
return key_share
[docs]
class ClientHelloGenerator(HandshakeProtocolMessageGenerator):
"""Generator for TLS handshake protocol Client Hello messages."""
def __init__(self, ciphers=None, extensions=None, version=None,
session_id=None, random=None, compression=None, ssl2=False,
modifiers=None):
"""Set up the object for generation of Client Hello messages."""
super(ClientHelloGenerator, self).__init__()
if ciphers is None:
ciphers = []
if compression is None:
compression = [0]
self.version = version
self.ciphers = ciphers
self.extensions = extensions
self.session_id = session_id
self.random = random
self.compression = compression
self.ssl2 = ssl2
self.modifiers = modifiers
def __repr__(self):
"""Human readable representation of the object."""
ret = []
if self.ssl2:
ret.append("ssl2={0!r}".format(self.ssl2))
if self.version:
ret.append("version={0!r}".format(self.version))
if self.ciphers:
ret.append("ciphers={0!r}".format(self.ciphers))
if self.random:
ret.append("random={0!r}".format(self.random))
if self.session_id:
ret.append("session_id={0!r}".format(self.session_id))
if self.compression:
ret.append("compression={0!r}".format(self.compression))
if self.extensions:
ret.append("extensions={0!r}".format(self.extensions))
if self.modifiers:
ret.append("modifiers={0!r}".format(self.modifiers))
return "ClientHelloGenerator({0})".format(", ".join(ret))
[docs]
def _generate_extensions(self, state):
"""Convert extension generators to extension objects."""
extensions = []
for ext_id in self.extensions:
if self.extensions[ext_id] is not None:
if callable(self.extensions[ext_id]):
extensions.append(self.extensions[ext_id](state))
elif isinstance(self.extensions[ext_id], TLSExtension):
extensions.append(self.extensions[ext_id])
elif self.extensions[ext_id] is AutoEmptyExtension():
extensions.append(TLSExtension().create(ext_id,
bytearray()))
else:
raise ValueError("Bad extension, id: {0}".format(ext_id))
continue
if ext_id == ExtensionType.renegotiation_info:
ext = RenegotiationInfoExtension()\
.create(state.key['client_verify_data'])
elif ext_id == ExtensionType.status_request:
ext = StatusRequestExtension().create()
elif ext_id in (ExtensionType.client_hello_padding,
ExtensionType.encrypt_then_mac,
ExtensionType.extended_master_secret,
ExtensionType.session_ticket,
49, # post_handshake_auth
52): # transparency_info
ext = TLSExtension().create(ext_id, bytearray())
else:
raise ValueError("No autohandler for extension {0}"
.format(ExtensionType.toStr(ext_id)))
extensions.append(ext)
return extensions
[docs]
def _handle_modifiers(self, state, clnt_hello):
"""Handle processing of the modifiers of the message."""
if self.modifiers is None:
self.modifiers = [] # TODO psk_binder updater for session tickets
for mod in self.modifiers:
mod(state, clnt_hello)
[docs]
def generate(self, state):
"""Create a Client Hello message."""
if self.version is None:
self.version = state.client_version
if self.random:
state.client_random = self.random
if self.session_id is None and state.session_id:
self.session_id = state.session_id
if self.session_id is None and self.extensions \
and ExtensionType.supported_versions in self.extensions:
# in TLS 1.3, the server should reply with CCS (middlebox compat
# mode) only when client sends a session_id
self.session_id = getRandomBytes(32)
# if still unset, set to default value
if not self.session_id:
self.session_id = bytearray(b'')
if not state.client_random:
state.client_random = bytearray(32)
extensions = None
if self.extensions is not None:
extensions = self._generate_extensions(state)
clnt_hello = ClientHello(self.ssl2).create(self.version,
state.client_random,
self.session_id,
self.ciphers,
extensions=extensions)
clnt_hello.compression_methods = self.compression
self._handle_modifiers(state, clnt_hello)
state.client_version = self.version
self.msg = clnt_hello
return clnt_hello
[docs]
class ClientKeyExchangeGenerator(HandshakeProtocolMessageGenerator):
"""
Generator for TLS handshake protocol Client Key Exchange messages.
:vartype dh_Yc: int
:ivar dh_Yc: Override the sent dh_Yc value to the specified one
:vartype padding_subs: dict(int,int)
:ivar padding_subs: Substitutions for the encrypted premaster secret
padding bytes (applicable only for the RSA key exchange)
:vartype padding_xors: dict(int,int)
:ivar padding_xors: XORs for the encrypted premaster secret padding bytes
(applicable only for the RSA key exchange)
:vartype ecdh_Yc: bytearray
:ivar ecdh_Yc: encoded ECC point being the client key share for the
key exchange
:vartype encrypted_premaster: bytearray
:ivar encrypted_premaster: the premaster secret after it was encrypted,
as it will be sent on the wire
:vartype modulus_as_encrypted_premaster: bool
:ivar modulus_as_encrypted_premaster: if True, set the encrypted
premaster (the value seen on the wire) to the server's certificate
modulus (the server's public key)
:vartype p_as_share: bool
:ivar p_as_share: set the key share to the value :math:`p` provided by
server
in Server Key Exchange (applicable only to FFDHE key exchange)
:vartype p_1_as_share: bool
:ivar p_1_as_share: set the key share to the value :math:`p - 1`, as
provided by
server in Server Key Exchange (applicable only to FFDHE key exchange
with safe primes)
:ivar int padding_byte: byte to use as padding instead of randomly
generated bytes (applicable only for RSA key exchange)
:ivar tuple(int,int) client_version: the version to set in the RSA
pre-master secret
:ivar bool reuse_encrypted_premaster: if set to true, the message generator
will create the RSA ciphertext once and reuse it for subsequent
connections. Applicable only to RSA key exchange, useful only for
tests that run the same conversation over and over (like timing tests).
:ivar encrypted_premaster_file: The :term:`file object` from which to
read the encrypted premaster secret, on node re-ececution will read
subsequent values, does not rewind the file pointer or close the file.
The file must be opened in binary mode
:vartype encrypted_premaster_file: :term:`file object`
:ivar int encrypted_premaster_length: The length of data to read, in bytes
:ivar bool random_premaster: whether to use a random premaster value
or the static default (48 zero bytes)
"""
def __init__(self, cipher=None, version=None, client_version=None,
dh_Yc=None, padding_subs=None, padding_xors=None,
ecdh_Yc=None, encrypted_premaster=None,
modulus_as_encrypted_premaster=False, p_as_share=False,
p_1_as_share=False, premaster_secret=None,
padding_byte=None, reuse_encrypted_premaster=False,
encrypted_premaster_file=None,
encrypted_premaster_length=None,
random_premaster=False):
"""Set settings of the Client Key Exchange to be sent."""
super(ClientKeyExchangeGenerator, self).__init__()
self.cipher = cipher
self.version = version
self.client_version = client_version
if premaster_secret is None:
self.premaster_secret = bytearray(48)
else:
self.premaster_secret = premaster_secret
self.dh_Yc = dh_Yc
self.padding_subs = padding_subs
self.padding_xors = padding_xors
self.ecdh_Yc = ecdh_Yc
self.encrypted_premaster = encrypted_premaster
self.modulus_as_encrypted_premaster = modulus_as_encrypted_premaster
self.p_as_share = p_as_share
self.p_1_as_share = p_1_as_share
self.padding_byte = padding_byte
self.reuse_encrypted_premaster = reuse_encrypted_premaster
self.encrypted_premaster_file = encrypted_premaster_file
self.encrypted_premaster_length = encrypted_premaster_length
self.random_premaster = random_premaster
if encrypted_premaster_file and not encrypted_premaster_length:
raise ValueError("Must specify the length of data to read from"
" encrypted_premaster_file")
if modulus_as_encrypted_premaster and encrypted_premaster_file:
raise ValueError("Can't set both modulus_as_encrypted_premaster "
"and encrypted_premaster_file at the same time")
if p_as_share and p_1_as_share:
raise ValueError("Can't set both p_as_share and p_1_as_share at "
"the same time")
[docs]
def generate(self, status):
"""Create a Client Key Exchange message."""
if self.version is None:
self.version = status.version
if self.cipher is None:
self.cipher = status.cipher
if self.client_version is None:
self.client_version = status.client_version
cke = ClientKeyExchange(self.cipher, self.version)
if self.cipher in CipherSuite.certSuites:
if self.modulus_as_encrypted_premaster:
public_key = status.get_server_public_key()
self.encrypted_premaster = numberToByteArray(public_key.n)
elif self.encrypted_premaster_file:
self.encrypted_premaster = self.encrypted_premaster_file.read(
self.encrypted_premaster_length)
if self.encrypted_premaster:
cke.createRSA(self.encrypted_premaster)
if self.reuse_encrypted_premaster:
status.key['premaster_secret'] = self.premaster_secret
else:
if self.random_premaster:
self.premaster_secret = getRandomBytes(48)
if len(self.premaster_secret) >= 2:
self.premaster_secret[0] = self.client_version[0]
self.premaster_secret[1] = self.client_version[1]
status.key['premaster_secret'] = self.premaster_secret
public_key = status.get_server_public_key()
enc_premaster = self._encrypt_with_fuzzing(public_key)
if self.reuse_encrypted_premaster:
self.encrypted_premaster = enc_premaster
cke.createRSA(enc_premaster)
elif self.cipher in CipherSuite.dheCertSuites or \
self.cipher in CipherSuite.dheDsaSuites:
if self.dh_Yc is not None:
cke = ClientKeyExchange(self.cipher,
self.version).createDH(self.dh_Yc)
elif self.p_as_share or self.p_1_as_share:
ske = status.get_last_message_of_type(ServerKeyExchange)
assert ske, "No server key exchange in messages"
if self.p_as_share:
cke = ClientKeyExchange(self.cipher,
self.version).createDH(ske.dh_p)
else:
cke = ClientKeyExchange(self.cipher,
self.version).createDH(ske.dh_p-1)
else:
cke = status.key_exchange.makeClientKeyExchange()
status.key['ClientKeyExchange.dh_Yc'] = cke.dh_Yc
elif self.cipher in CipherSuite.ecdhAllSuites:
if self.ecdh_Yc is not None:
cke = ClientKeyExchange(self.cipher,
self.version).createECDH(self.ecdh_Yc)
else:
cke = status.key_exchange.makeClientKeyExchange()
status.key['ClientKeyExchange.ecdh_Yc'] = cke.ecdh_Yc
else:
raise AssertionError("Unknown cipher/key exchange type")
self.msg = cke
return cke
[docs]
def _encrypt_with_fuzzing(self, public_key):
"""Use public_key to encrypt premaster secret with fuzzed padding."""
old_addPKCS1Padding = public_key._addPKCS1Padding
public_key = fuzz_pkcs1_padding(public_key, self.padding_subs,
self.padding_xors, self.padding_byte)
ret = public_key.encrypt(self.premaster_secret)
public_key._addPKCS1Padding = old_addPKCS1Padding
return ret
[docs]
def post_send(self, state):
"""Save intermediate handshake hash value."""
# for EMS all messages up to and including CKE are part of
# "session_hash"
super(ClientKeyExchangeGenerator, self).post_send(state)
state.certificate_verify_handshake_hashes = \
state.handshake_hashes.copy()
[docs]
class ClientMasterKeyGenerator(HandshakeProtocolMessageGenerator):
"""Generator for SSLv2 Handshake Protocol CLIENT-MASTER-KEY message."""
def __init__(self, cipher=None, master_key=None):
"""Set the cipher to send to server."""
super(ClientMasterKeyGenerator, self).__init__()
self.cipher = cipher
self.master_key = master_key
[docs]
def generate(self, state):
"""Generate a new CLIENT-MASTER-KEY message."""
if self.cipher is None:
raise NotImplementedError("No cipher autonegotiation")
if self.master_key is None:
if state.key['master_secret'] == bytearray(0):
if self.cipher in CipherSuite.ssl2_128Key:
key_size = 16
elif self.cipher in CipherSuite.ssl2_192Key:
key_size = 24
elif self.cipher in CipherSuite.ssl2_64Key:
key_size = 8
else:
raise AssertionError("unknown cipher but no master_secret")
self.master_key = getRandomBytes(key_size)
else:
self.master_key = state.key['master_secret']
cipher = self.cipher
if (cipher not in CipherSuite.ssl2rc4 and
cipher not in CipherSuite.ssl2_3des):
# tlslite-ng doesn't implement anything else, so we need to
# workaround calculation of pending states failure for test
# cases which don't really encrypt data
cipher = CipherSuite.SSL_CK_RC4_128_WITH_MD5
key_arg = state.msg_sock.calcSSL2PendingStates(cipher,
self.master_key,
state.client_random,
state.server_random,
None)
if self.cipher in CipherSuite.ssl2export:
clear_key = self.master_key[:-5]
secret_key = self.master_key[-5:]
else:
clear_key = bytearray(0)
secret_key = self.master_key
pub_key = state.get_server_public_key()
encrypted_master_key = pub_key.encrypt(secret_key)
cmk = ClientMasterKey()
cmk.create(self.cipher, clear_key, encrypted_master_key, key_arg)
self.msg = cmk
return cmk
[docs]
class CertificateGenerator(HandshakeProtocolMessageGenerator):
"""Generator for TLS handshake protocol Certificate message."""
def __init__(self, certs=None, cert_type=None, version=None, context=None):
"""Set the certificates to send to server."""
super(CertificateGenerator, self).__init__()
self.certs = certs
self.cert_type = cert_type
self.version = version
self.context = context
[docs]
def generate(self, status):
"""Create a Certificate message."""
if self.version is None:
self.version = status.version
if self.cert_type is None:
self.cert_type = CertificateType.x509
context = b''
if self.context:
context = self.context[-1]
assert isinstance(context, CertificateRequest)
context = context.certificate_request_context
cert = Certificate(self.cert_type, version=self.version)
cert.create(self.certs, context=context)
if self.context:
self.context.append(cert)
self.msg = cert
return cert
[docs]
class CertificateVerifyGenerator(HandshakeProtocolMessageGenerator):
"""
Generator for TLS handshake protocol Certificate Verify message.
:vartype msg_alg: tuple(int,int)
:ivar msg_alg: signature and hash algorithm to be set on in the
digitally-signed structure of TLSv1.2 Certificate Verify message.
By default the first matching algorithm from CertificateRequest that
matches our key or sent certificate. If no CertificateRequest received
it will send the first algorithm matching our key or certificate sent.
If no Certificate nor private key is available, it will select first
algorithm from CertificateRequest. If no Certificate, CertificateRequest
nor private key is availbale then it will use SHA-1 + RSA
The first value in the tuple specifies hash type (from
HashAlgorithm) and the second value specifies the signature algorithm
(from SignatureAlgorithm). Or the value from SignatureScheme.
:vartype msg_version: tuple(int,int)
:ivar msg_version: protocol version that the message is to use,
default is taken from current connection state
:vartype sig_version: tuple(int,int)
:ivar sig_version: protocol version to use for calculating the verify bytes
for the signature (overrides msg_version, but just for the signature).
Equal to msg_version by default.
:vartype sig_alg: tuple(int,int)
:ivar sig_alg: hash and signature algorithm to be used for creating the
signature in the message. Equal to msg_alg by default. Requires the
``sig_version`` to be set to at least TLSv1.2 to be effective.
:vartype signature: bytearray
:ivar signature: bytes to sent as the signature of the message
:vartype padding_xors: dict(int,int)
:ivar padding_xors: which bytes of the pre-encryption RSA padding or
post-signature ECDSA signature should be xored and with what values
:vartype padding_subs: dict(int,int)
:ivar padding_subs: same as padding_xors but substitues specified bytes
instead
:vartype mgf1_hash: str
:ivar mgf1_hash: name of the hash to be used for calculating MGF1,
effective only if sig_alg is set to a RSA_PSS algorithm and sig_version
is TLS 1.2 or greater. By default the hash taken from sig_alg.
:vartype rsa_pss_salt_len: int
:ivar rsa_pss_salt_len: length of the salt (in bytes) used in signature.
Effective only if sig_alg is set to a RSA_PSS algorithm and sig_version
is TLS 1.2 or greater. By default it's equal to the length of the
hash taken from sig_alg.
:vartype private_key: :py:class:`~tlslite.utils.rsakey.RSAKey` or
:py:class:`~tlslite.utils.ecdsakey.ECDSAKey`
:ivar private_key: key that will be used for signing the message
"""
def __init__(self, private_key=None, msg_version=None, msg_alg=None,
sig_version=None, sig_alg=None, signature=None,
rsa_pss_salt_len=None, padding_xors=None, padding_subs=None,
mgf1_hash=None, context=None):
"""Create object for generating Certificate Verify messages."""
super(CertificateVerifyGenerator, self).__init__()
self.private_key = private_key
self.msg_alg = msg_alg
self.msg_version = msg_version
self.sig_version = sig_version
if not isinstance(sig_version, tuple) and sig_version is not None:
raise ValueError("sig_version must be None or a tuple")
self.sig_alg = sig_alg
self.signature = signature
self.rsa_pss_salt_len = rsa_pss_salt_len
self.padding_xors = padding_xors
self.padding_subs = padding_subs
self.mgf1_hash = mgf1_hash
self.context = context
[docs]
@staticmethod
def _sig_alg_for_rsa_key(key_alg, accept_sig_algs, version):
"""Select an acceptable signature algorithm for a given rsa key."""
if version < (3, 3):
# in TLS 1.1 and earlier, there is no algorithm selection,
# pick one closest, as far as used algorithms are concerned, to
# the TLS 1.2 algorithm
return (HashAlgorithm.sha1, SignatureAlgorithm.rsa)
if key_alg == "rsa":
# with rsa key we can make either RSA-PSS or RSA-PKCS#1 signatures
if version < (3, 4):
valid_sig_algs = RSA_PSS_RSAE_ALL + RSA_PKCS1_ALL
else:
# but not in TLS 1.3
valid_sig_algs = RSA_PSS_RSAE_ALL
else:
# with rsa-pss key we can make only RSA-PSS signatures
assert key_alg == "rsa-pss"
valid_sig_algs = RSA_PSS_PSS_ALL
return next((i for i in accept_sig_algs
if i in valid_sig_algs), valid_sig_algs[0])
[docs]
@staticmethod
def _sig_alg_for_ecdsa_key(accept_sig_algs, version, key):
"""Select an acceptable signature algorithm for a given ecdsa key."""
if version < (3, 3):
# in TLS 1.1 and earlier, there is no algorithm selection,
# pick one closest, as far as used algorithms are concerned, to
# the TLS 1.2 algorithm
return (HashAlgorithm.sha1, SignatureAlgorithm.ecdsa)
if version < (3, 4):
# in TLS 1.2 we can mix and match hashes and curves
return next((i for i in accept_sig_algs
if i in ECDSA_SIG_ALL), ECDSA_SIG_ALL[0])
# but in TLS 1.3 we need to select a hash that matches our key
hash_name = curve_name_to_hash_tls13(key.curve_name)
# while it may select one that wasn't advertised by server,
# this is better last resort than sending a sha1+rsa sigalg
return (getattr(HashAlgorithm, hash_name), SignatureAlgorithm.ecdsa)
[docs]
@staticmethod
def _sig_alg_for_dsa_key(accept_sig_algs, version, key):
"""Select an acceptable signature algorithm for a given DSA key."""
if version < (3, 3):
# in TLS 1.1 and earlier, there is no algorithm selection,
# pick one closest, as far as used algorithms are concerned, to
# the TLS 1.2 algorithm
return (HashAlgorithm.sha1, SignatureAlgorithm.dsa)
if version < (3, 4):
# in TLS 1.2 we can mix and match hashes and curves
return next((i for i in accept_sig_algs
if i in DSA_ALL), DSA_ALL[0])
# but in TLS 1.3 we need to select a hash that matches our key
hash_name = curve_name_to_hash_tls13(key.curve_name)
# while it may select one that wasn't advertised by server,
# this is better last resort than sending a sha1+rsa sigalg
return (getattr(HashAlgorithm, hash_name), SignatureAlgorithm.dsa)
[docs]
@staticmethod
def _sig_alg_for_eddsa_key(key_alg, accept_sig_algs):
sig_alg = getattr(SignatureScheme, key_alg.lower())
assert sig_alg in accept_sig_algs
return sig_alg
[docs]
@staticmethod
def _sig_alg_for_certificate(key_alg, accept_sig_algs, version, key):
"""
Select an acceptable signature algorithm based on key algorithm,
protocol version and curve name (in case of ECDSA).
"""
if key_alg in ("rsa", "rsa-pss"):
return CertificateVerifyGenerator._sig_alg_for_rsa_key(
key_alg, accept_sig_algs, version)
if key_alg in ("Ed25519", "Ed448"):
return CertificateVerifyGenerator._sig_alg_for_eddsa_key(
key_alg, accept_sig_algs)
if key_alg == "dsa":
return CertificateVerifyGenerator._sig_alg_for_dsa_key(
accept_sig_algs, version, key)
assert key_alg == "ecdsa"
return CertificateVerifyGenerator._sig_alg_for_ecdsa_key(
accept_sig_algs, version, key)
[docs]
def _get_key_and_key_type(self, status):
"""
Get a key, or if not possible, certificate for selecting the signature
algorithm.
"""
key_type = None
key = None
if self.private_key:
key_type = self.private_key.key_type
key = self.private_key
cert = status.get_last_message_of_type(Certificate)
our_cert = None
if cert:
our_cert = cert.cert_chain.x509List[0]
if not key_type:
key_type = our_cert.certAlg
key = our_cert.publicKey
return key_type, key, our_cert
[docs]
def _select_msg_alg(self, status):
"""
Select the signature algorithm based on CertificateRequest from server,
either our sent Certificate or our private key and the protocol
version.
"""
# first, what signature algorithms we can use
key_type, key, our_cert = self._get_key_and_key_type(status)
# second, what signature algorithms does the server like
ok_sig_algs = SIG_ALL
if self.context:
# do post-handshake authentication from TLS 1.3
cert_req = self.context[0]
assert isinstance(cert_req, CertificateRequest)
else:
cert_req = status.get_last_message_of_type(CertificateRequest)
if cert_req is not None:
# when we got a CR message, we need to select a signature that
# matches one of the algorithms the server sent
if not self.private_key and not our_cert:
# when sending malformed messages, the key may not be
# even loaded, so select any algorithm acceptable to server
self.msg_alg = cert_req.supported_signature_algs[0]
return
ok_sig_algs = cert_req.supported_signature_algs
# try to find one acceptable given all limitations
if key_type:
self.msg_alg = self._sig_alg_for_certificate(
key_type, ok_sig_algs, self.sig_version, key)
if self.msg_alg is None:
# as an ultimate fallback, when we have no certificate,
# private key or CertificateRequest to work with
self.msg_alg = (HashAlgorithm.sha1,
SignatureAlgorithm.rsa)
[docs]
@staticmethod
def _normalise_dict(dictionary, max_byte):
# python2.6 does not support dict comprehension
# pylint: disable=consider-using-dict-comprehension
return dict([(min(k, max_byte), v) for k, v in
dictionary.items()])
# pylint: enable=consider-using-dict-comprehension
[docs]
def _normalise_subs_and_xors(self, max_byte):
"""
Make sure that the substitutions and xors don't go over the size
of buffer, this is fine as ECDSA signatures are ASN.1 objects so have
variable size
"""
if self.padding_subs:
self.padding_subs = \
self._normalise_dict(self.padding_subs, max_byte)
if self.padding_xors:
self.padding_xors = \
self._normalise_dict(self.padding_xors, max_byte)
[docs]
def _get_ecdsa_sig_parameters(self):
"""Set up parameters for sign() operation with ecdsa keys."""
if self.sig_alg:
# while the argument is called mgf1_hash in ecdsa
# signatures it's used for the derivation of the nonce
self.mgf1_hash = HashAlgorithm.toStr(self.sig_alg[0])
else:
# in TLS 1.1 and earlier we do simple sha1 signatures
self.mgf1_hash = "sha1"
# yes, we're using bad names (for API compat reasons) and we're
# accessing private methods (as we intentionally want to break stuff)
# so ignore those issues
# pylint: disable=invalid-name,protected-access
[docs]
def _get_rsa_sig_parameters(self):
"""Return parameters for sign() operation with rsa keys."""
scheme = SignatureScheme.toRepr(self.sig_alg)
hash_name = None
if scheme is None:
padding = "pkcs1"
else:
padding = SignatureScheme.getPadding(scheme)
if padding == 'pss':
hash_name = SignatureScheme.getHash(scheme)
if self.rsa_pss_salt_len is None:
self.rsa_pss_salt_len = \
getattr(hashlib, hash_name)().digest_size
if not self.mgf1_hash:
self.mgf1_hash = hash_name
def _newRawPrivateKeyOp(self, m, original_raw_private_key_op_bytes,
subs=None, xors=None):
sign_bytes = m
sign_bytes = substitute_and_xor(sign_bytes, subs, xors)
m = bytesToNumber(sign_bytes)
# RSA operations are defined only on numbers that are
# smaller than the modulus, so ensure the XORing or
# substitutions
# didn't break it (especially necessary for pycrypto as
# it raises exception in such case)
if m > self.n:
m %= self.n
return original_raw_private_key_op_bytes(
numberToByteArray(m, numBytes(self.n)))
old_private_key_op = self.private_key._raw_private_key_op_bytes
self.private_key._raw_private_key_op_bytes = \
partial(_newRawPrivateKeyOp,
self.private_key,
original_raw_private_key_op_bytes=old_private_key_op,
subs=self.padding_subs,
xors=self.padding_xors)
return padding, old_private_key_op
# pylint: enable=invalid-name,protected-access
[docs]
def _make_signature(self, status):
"""Create signature for CertificateVerify message."""
if self.private_key is None:
raise ValueError("Can't create a signature without "
"private key!")
if self.sig_alg and self.sig_alg[1] == SignatureAlgorithm.ecdsa or\
self.private_key.key_type == "ecdsa":
signature_type = "ecdsa"
elif self.sig_alg and self.sig_alg in (
SignatureScheme.ed25519, SignatureScheme.ed448) or \
self.private_key.key_type in ("Ed25519", "Ed448"):
signature_type = "eddsa"
elif self.sig_alg and self.sig_alg[1] == SignatureAlgorithm.dsa or \
self.private_key.key_type == "dsa":
signature_type = "dsa"
else:
signature_type = "rsa"
if self.context:
# if we have context set, it means we're doing post handshake
# authentication in TLS 1.3
handshake_hashes = \
status.key['client finished handshake hashes'].copy()
for ctx in self.context:
handshake_hashes.update(ctx.write())
else:
handshake_hashes = status.handshake_hashes
verify_bytes = \
KeyExchange.calcVerifyBytes(self.sig_version,
handshake_hashes,
self.sig_alg,
status.key['premaster_secret'],
status.client_random,
status.server_random,
status.prf_name,
key_type=self.private_key.key_type)
if signature_type == "eddsa":
self.mgf1_hash = "intrinsic"
self.rsa_pss_salt_len = None
padding = None
old_private_key_op = None
sig_func = self.private_key.hashAndSign
elif signature_type == "ecdsa":
self._get_ecdsa_sig_parameters()
padding = None
old_private_key_op = None
# truncate the hash so that if we sign big hash with small
# curve, the signing is successful
verify_bytes = verify_bytes[:self.private_key.
private_key.curve.baselen]
sig_func = self.private_key.sign
elif signature_type == "dsa":
padding = None
old_private_key_op = None
sig_func = self.private_key.sign
else:
# we don't have to handle non pkcs1 padding because the
# calcVerifyBytes does everything
padding, old_private_key_op = self._get_rsa_sig_parameters()
sig_func = self.private_key.sign
try:
signature = sig_func(verify_bytes,
padding,
self.mgf1_hash,
self.rsa_pss_salt_len)
finally:
# make sure the changes are undone even if the signing fails
self.private_key._raw_private_key_op_bytes = old_private_key_op
if signature_type in ("ecdsa", "dsa"):
# because DSA and ECDSA signatures are ANS.1 DER objects, they
# can have different lengths depending on the bit size of
# "r" and "s" variables
# given that indexing would fail if it was asked to index
# over an nonexistent byte, we need to limit the numbers
signature = bytearray(signature)
max_byte = len(signature) - 1
self._normalise_subs_and_xors(max_byte)
if signature_type in ("ecdsa", "eddsa", "dsa"):
# but EdDSA signatures are always the same length for given
# key type, so don't normalise the values for them
signature = substitute_and_xor(signature, self.padding_subs,
self.padding_xors)
return signature
[docs]
def generate(self, status):
"""Create a CertificateVerify message."""
if self.msg_version is None:
self.msg_version = status.version
if self.sig_version is None:
self.sig_version = self.msg_version
if self.msg_alg is None and self.msg_version >= (3, 3):
self._select_msg_alg(status)
if self.sig_alg is None:
self.sig_alg = self.msg_alg
# TODO: generate a random key if none provided
if self.signature is None:
signature = self._make_signature(status)
else:
signature = self.signature
cert_verify = CertificateVerify(self.msg_version)
cert_verify.create(signature, self.msg_alg)
if self.context:
self.context.append(cert_verify)
self.msg = cert_verify
return cert_verify
[docs]
class ClearContext(Command):
"""
Object used to zero-out the context used in PHA.
This is necessary if the conversation is executed more than once.
"""
def __init__(self, context):
super(ClearContext, self).__init__()
self.context = context
[docs]
def process(self, state):
"""Zero out the associated context"""
del state # unused
self.context[:] = []
[docs]
class ChangeCipherSpecGenerator(MessageGenerator):
"""
Generator for TLS Change Cipher Spec messages.
.. note::
After sending the ChangeCipherSpec message, in TLS 1.2 and earlier,
the record layer will switch to encrypted communication (or newly
negotiated keys). In TLS 1.3 the message has no effect on encryption
or record layer state.
"""
def __init__(self, extended_master_secret=None, fake=False):
"""Create an object for generating CCS messages."""
super(ChangeCipherSpecGenerator, self).__init__()
self.extended_master_secret = extended_master_secret
self.fake = fake
[docs]
def generate(self, status):
"""Create a message for sending to server."""
ccs = ChangeCipherSpec()
return ccs
[docs]
def post_send(self, status):
"""Generate new encryption keys for connection."""
# in TLS 1.3 it's a fake message, doesn't cause calculation of new keys
if status.version >= (3, 4) or self.fake:
return
cipher_suite = status.cipher
status.msg_sock.encryptThenMAC = status.encrypt_then_mac
if self.extended_master_secret is None:
self.extended_master_secret = status.extended_master_secret
if not status.resuming:
if self.extended_master_secret:
# in case client certificates are used, we need to omit
# certificate verify message
hh = status.certificate_verify_handshake_hashes
if not hh:
hh = status.handshake_hashes
master_secret = \
calcExtendedMasterSecret(status.version,
cipher_suite,
status.key['premaster_secret'],
hh)
else:
master_secret = calc_key(
status.version,
status.key['premaster_secret'],
cipher_suite,
b'master secret',
client_random=status.client_random,
server_random=status.server_random,
output_length=48)
status.key['master_secret'] = master_secret
# in case of resumption, the pending states are generated
# during receive of server sent CCS
calc_pending_states(status)
status.msg_sock.changeWriteState()
if status._peer_record_size_limit:
status.msg_sock.send_record_limit = \
status._peer_record_size_limit
status.msg_sock.recordSize = status._peer_record_size_limit
[docs]
class FinishedGenerator(HandshakeProtocolMessageGenerator):
"""
Generator for TLS handshake protocol Finished messages.
.. note::
The FinishedGenerator may influence the record layer encryption.
In SSLv2, the record layer will be configured to expect encrypted
records and send encrypted records *before* the message is sent.
In SSLv3 up to TLS 1.2 the message has no impact on state of
encryption. In TLS 1.3, *after* the message is sent, the record layer
will be switched to use ``client_application_traffic_secret`` keys for
*sending*.
"""
def __init__(self, protocol=None,
trunc_start=0, trunc_end=None,
pad_byte=0, pad_left=0, pad_right=0, context=None):
"""Object to generate Finished messages."""
super(FinishedGenerator, self).__init__()
self.protocol = protocol
self.server_finish_hh = None
self.trunc_start = trunc_start
self.trunc_end = trunc_end
self.pad_byte = pad_byte
self.pad_left = pad_left
self.pad_right = pad_right
self.context = context
[docs]
def generate(self, status):
"""Create a Finished message."""
if self.protocol is None:
self.protocol = status.version
if self.protocol in ((0, 2), (2, 0)):
finished = ClientFinished()
verify_data = status.session_id
# in SSLv2 we're using it as a CCS-of-sorts too
status.msg_sock.changeWriteState()
status.msg_sock.changeReadState()
elif self.protocol <= (3, 3):
finished = Finished(self.protocol)
verify_data = calc_key(status.version,
status.key['master_secret'],
status.cipher,
b'client finished' if status.client
else b'server finished',
status.handshake_hashes,
output_length=12)
else: # TLS 1.3
finished = Finished(self.protocol, status.prf_size)
if self.context:
# post-handshake authentication in TLS 1.3
base_key = status.key['client application traffic secret']
else:
base_key = status.key['client handshake traffic secret']
finished_key = HKDF_expand_label(
base_key,
b'finished',
b'',
status.prf_size,
status.prf_name)
if self.context:
# post-handshake authentication in TLS 1.3
self.server_finish_hh = \
status.key['client finished handshake hashes'].copy()
for ctx in self.context:
self.server_finish_hh.update(ctx.write())
else:
self.server_finish_hh = status.handshake_hashes.copy()
verify_data = secureHMAC(
finished_key,
self.server_finish_hh.digest(status.prf_name),
status.prf_name)
# messing with the message - truncation
verify_data = verify_data[self.trunc_start:self.trunc_end]
# messing with the message - padding
verify_data = bytearray([self.pad_byte]*self.pad_left) \
+ verify_data \
+ bytearray([self.pad_byte]*self.pad_right)
status.key['client_verify_data'] = verify_data
finished.create(verify_data)
self.msg = finished
return finished
[docs]
def post_send(self, status):
"""Perform post-transmit changes needed by generation of Finished."""
super(FinishedGenerator, self).post_send(status)
if self.context:
return
# resumption finished
status.resuming = False
if status.version <= (3, 3):
return
# Switch to application traffic secret for writing.
# For reading we switched with the server Finished.
status.msg_sock.changeWriteState()
# derive resumption master secret key
secret = status.key['master secret']
res_ms = derive_secret(secret, b'res master', status.handshake_hashes,
status.prf_name)
status.key['resumption master secret'] = res_ms
# preserve the hash state for post-handshake authentication
status.key['client finished handshake hashes'] = \
status.handshake_hashes.copy()
[docs]
class AlertGenerator(MessageGenerator):
"""Generator for TLS Alert messages."""
def __init__(self, level=AlertLevel.warning,
description=AlertDescription.close_notify):
"""Save the level and description of the Alert to send."""
super(AlertGenerator, self).__init__()
self.level = level
self.description = description
[docs]
def generate(self, status):
"""Send the Alert to server."""
alert = Alert().create(self.description, self.level)
return alert
[docs]
class ApplicationDataGenerator(MessageGenerator):
"""Generator for TLS Application Data messages."""
def __init__(self, payload):
"""Save the data to send to server."""
super(ApplicationDataGenerator, self).__init__()
self.payload = payload
[docs]
def generate(self, status):
"""Send data to server in Application Data messages."""
app_data = ApplicationData().create(self.payload)
return app_data
[docs]
class KeyUpdateGenerator(MessageGenerator):
"""Generator for TLS 1.3 KeyUpdate message."""
def __init__(self, message_type=0):
"""Save the type of the KeyUpdate message."""
super(KeyUpdateGenerator, self).__init__()
self.message_type = message_type
[docs]
def generate(self, state):
"""Generate a KeyUpdate message."""
del state # needed only for API compatibility
key_update = KeyUpdate().create(self.message_type)
return key_update
[docs]
def post_send(self, state):
"""Perform post-transmit changes needed by generation of KeyUpdate."""
super(KeyUpdateGenerator, self).post_send(state)
cl_app_secret, _ = state.msg_sock.\
calcTLS1_3KeyUpdate_reciever(
state.cipher,
state.key['client application traffic secret'],
state.key['server application traffic secret'])
state.key['client application traffic secret'] = cl_app_secret
[docs]
class HeartbeatGenerator(MessageGenerator):
"""
Generator for heartbeat messages.
:ivar message_type: the type of the message to send, see
:py:class:`HeartbeatMessageType` enum for values
:vartype message_type: int
:ivar payload: data to be sent to the other size for it to echo it back
:vartype payload: bytearray
:ivar padding: payload to be sent to the other side, it should be at least
16 bytes long for the message to be valid
:vartype padding: bytearray
"""
def __init__(self, payload,
message_type=HeartbeatMessageType.heartbeat_request,
padding_length=None):
"""
Initialise and create instance of object.
:type payload: bytes-like
:param payload: payload to send to the other side; either a reply
to received heartbeat request or a value that the other side will
have to echo
:type message_type: int
:param message_type: the type of message to send, valid values are
defined in :py:class:`HeartbeatMessageType` enum
:type padding_length: int
:param padding_length: the length (in bytes) of the random padding that
will be generated; 16 by default (if special contents of padding
are necessary, it's possible to set the
:py:attr:`~HeartbeatGenerator.padding` field in the object's
instance after the object was initialised)
"""
super(HeartbeatGenerator, self).__init__()
self.message_type = message_type
self.payload = payload
if padding_length is None:
padding_length = 16
self.padding = getRandomBytes(padding_length)
[docs]
def generate(self, state):
"""
Create a Heartbeat message.
:rtype: `~tlslite.messages.Heartbeat`
:return: heartbeat message to be sent to the other side
"""
del state
heartbeat = Heartbeat()
heartbeat.message_type = self.message_type
heartbeat.payload = self.payload
heartbeat.padding = self.padding
return heartbeat
[docs]
def pad_handshake(generator, size=0, pad_byte=0, pad=None):
"""
Pad or truncate handshake messages.
Pad or truncate a handshake message by given amount of bytes, use negative
size to truncate. Update handshake protocol header to compensate.
:param MessageGenerator generator: modified message
:param int size: number of bytes to add at the end (if positive) or number
of bytes to remove at the end of payload (if negative)
:param int pad_byte: numerical value of added bytes, must be between
0 and 255 inclusive
:param bytearray pad: bytes to add at the end of payload
"""
def new_generate(state, old_generate=generator.generate):
"""Monkey patch for the generate method of the Handshake generators."""
msg = old_generate(state)
def post_write(writer, self=msg, size=size, pad_byte=pad_byte,
pad=pad):
"""Monkey patch for the postWrite of handshake messages."""
if pad is not None:
size = len(pad)
header_writer = Writer()
header_writer.add(self.handshakeType, 1)
header_writer.add(len(writer.bytes) + size, 3)
if pad is not None:
return header_writer.bytes + writer.bytes + pad
elif size < 0:
return header_writer.bytes + writer.bytes[:size]
else:
return header_writer.bytes + writer.bytes + \
bytearray([pad_byte]*size)
msg.postWrite = post_write
return msg
generator.generate = new_generate
return generator
[docs]
def truncate_handshake(generator, size=0, pad_byte=0):
"""
Truncate a handshake message.
See :py:func:`pad_handshake` for inverse of this function
"""
return pad_handshake(generator, -size, pad_byte)
[docs]
def _apply_function(data, settings, fun):
"""Modify data based on settings and function fun."""
for pos in settings:
if settings[pos] == -1:
data[pos] = fun(data[pos], random.randint(0, 255))
elif settings[pos] == -2:
data[pos] = fun(data[pos], random.randint(1, 255))
else:
assert settings[pos] >= 0
data[pos] = fun(data[pos], settings[pos])
[docs]
def substitute_and_xor(data, substitutions, xors):
"""
Apply changes from substitutions and xors to data for fuzzing.
(Method used internally by tlsfuzzer.)
"""
if substitutions:
_apply_function(data, substitutions, lambda a, b: b)
if xors:
_apply_function(data, xors, lambda a, b: a ^ b)
return data
[docs]
def fuzz_message(generator, substitutions=None, xors=None):
"""
Change arbitrary bytes of the message after write.
Modified data includes handshake protocol header but doesn't include
record header, content type or record-level padding.
:param MessageGenerator generator: modified message
:param dict(int,int) substitutions: modify specified bytes of the message,
the keys indicate the positions in the message (negative numbers
count from the end of messages), the values of the dictionary specify
the values to change the bytes to
:param dict(int,int) xors: modify specified bytes of the message,
the keys indicate the positions in the message (negative numbers
count from the end of messages), the values of the dictionary specify
the values to xor with
"""
def new_generate(state, old_generate=generator.generate):
"""Monkey patch for the generate method of the Handshake generators."""
msg = old_generate(state)
def new_write(old_write=msg.write, substitutions=substitutions,
xors=xors):
"""Monkey patch for the write method of messages."""
data = old_write()
data = substitute_and_xor(data, substitutions, xors)
return data
msg.write = new_write
return msg
generator.generate = new_generate
return generator
[docs]
def post_send_msg_sock_restore(obj, method_name, old_method_name):
"""
Un-Monkey patch a method of msg_sock.
(Method used internally by tlsfuzzer.)
"""
def new_post_send(state, obj=obj,
method_name=method_name,
old_method_name=old_method_name,
old_post_send=obj.post_send):
"""Reverse the patching of a method in msg_sock."""
setattr(state.msg_sock, method_name, getattr(obj, old_method_name))
old_post_send(state)
obj.post_send = new_post_send
return obj
[docs]
def fuzz_mac(generator, substitutions=None, xors=None):
"""
Change arbitrary bytes of the MAC value.
Works with stream and CBC cipher suites in SSL 3 up to TLS 1.2.
Works with both encrypt then MAC and MAC then encrypt connections.
:param MessageGenerator generator: modified message
:param dict(int,int) substitutions: modify specified bytes of the message,
the keys indicate the positions in the message (negative numbers
count from the end of messages), the values of the dictionary specify
the values to change the bytes to
:param dict(int,int) xors: modify specified bytes of the message,
the keys indicate the positions in the message (negative numbers
count from the end of messages), the values of the dictionary specify
the values to xor with
"""
def new_generate(state, self=generator,
old_generate=generator.generate,
substitutions=substitutions,
xors=xors):
"""Monkey patch to modify MAC calculation of created MAC."""
msg = old_generate(state)
old_calculate_mac = state.msg_sock.calculateMAC
self.old_calculate_mac = old_calculate_mac
def new_calculate_mac(mac, seqnumBytes, contentType, data,
old_calculate_mac=old_calculate_mac,
substitutions=substitutions,
xors=xors):
"""Monkey patch for the MAC calculation method of msg socket."""
mac_bytes = old_calculate_mac(mac, seqnumBytes, contentType, data)
mac_bytes = substitute_and_xor(mac_bytes, substitutions, xors)
return mac_bytes
state.msg_sock.calculateMAC = new_calculate_mac
return msg
generator.generate = new_generate
post_send_msg_sock_restore(generator, 'calculateMAC', 'old_calculate_mac')
return generator
[docs]
def fuzz_encrypted_message(generator, substitutions=None, xors=None):
"""
Change arbitrary bytes of the authenticated ciphertext block.
Can modify authentication tag of AEAD ciphers and CBC ciphers working
in encrypt then MAC mode.
:param MessageGenerator generator: modified message
:param dict(int,int) substitutions: modify specified bytes of the message,
the keys indicate the positions in the message (negative numbers
count from the end of messages), the values of the dictionary specify
the values to change the bytes to
:param dict(int,int) xors: modify specified bytes of the message,
the keys indicate the positions in the message (negative numbers
count from the end of messages), the values of the dictionary specify
the values to xor with
"""
def new_generate(state, self=generator,
old_generate=generator.generate,
substitutions=substitutions,
xors=xors):
"""Monkey patch to modify authenticated ciphertext block."""
msg = old_generate(state)
old_send = state.msg_sock._recordSocket.send
self.old_send = old_send
def new_send(message, padding, old_send=old_send,
substitutions=substitutions, xors=xors):
"""
Monkey patch for the send method of msg socket.
message.data is the encrypted tls record, e.g.
message.data = aead_encrypt(plain); defined by aead suite
message.data = encrypt(plain + padding) + mac; etm
message.data = encrypt(plain + mac + padding); mte
"""
data = message.write()
data = substitute_and_xor(data, substitutions, xors)
new_message = Message(message.contentType, data)
return old_send(new_message, padding)
state.msg_sock._recordSocket.send = new_send
return msg
generator.generate = new_generate
post_send_msg_sock_restore(generator, '._recordSocket.send', 'old_send')
return generator
[docs]
def div_ceil(divident, divisor):
"""Perform integer division of divident by divisor, round up."""
quotient, reminder = divmod(divident, divisor)
return quotient + int(bool(reminder))
[docs]
def fuzz_padding(generator, min_length=None, substitutions=None, xors=None):
"""
Change the padding of the message.
Works with CBC ciphers only.
Note: the "-1" position is the byte with
the length of padding while "-2" is the last byte of padding (if padding
has non-zero length)
:param MessageGenerator generator: modified message
:param int min_length: the minimum length of padding created, including
the byte specifying length of padding, must be smaller than 257
:param dict(int,int) substitutions: modify specified bytes of the message,
the keys indicate the positions in the message (negative numbers
count from the end of messages), the values of the dictionary specify
the values to change the bytes to
:param dict(int,int) xors: modify specified bytes of the message,
the keys indicate the positions in the message (negative numbers
count from the end of messages), the values of the dictionary specify
the values to xor with
"""
if min_length is not None and min_length > 256:
raise ValueError("Padding cannot be longer than 256 bytes")
def new_generate(state, self=generator,
old_generate=generator.generate,
substitutions=substitutions,
xors=xors):
"""Monkey patch to modify padding behaviour."""
msg = old_generate(state)
self.old_add_padding = state.msg_sock.addPadding
def new_add_padding(data, self=state.msg_sock,
old_add_padding=self.old_add_padding,
substitutions=substitutions,
xors=xors):
"""Monkey patch the padding creating method."""
if min_length is None:
# make a copy of data as we need it unmodified later
padded_data = old_add_padding(bytearray(data))
padding_length = padded_data[-1]
padding = padded_data[-(padding_length+1):]
else:
block_size = self.blockSize
padding_length = div_ceil(len(data) + min_length,
block_size) * block_size - len(data)
if padding_length > 256:
raise ValueError("min_length set too "
"high for message: {0}"
.format(padding_length))
padding = bytearray([padding_length - 1] * (padding_length))
padding = substitute_and_xor(padding, substitutions, xors)
return data + padding
state.msg_sock.addPadding = new_add_padding
return msg
generator.generate = new_generate
post_send_msg_sock_restore(generator, 'addPadding', 'old_add_padding')
return generator
[docs]
def replace_plaintext(generator, new_plaintext):
"""
Change the plaintext of the message right before encryption.
Will replace all data before encryption, including the IV, MAC and
padding.
Note: works only with CBC ciphers. in EtM mode will NOT modify MAC.
Length of new_plaintext must be multiple of negotiated cipher block size
(8 bytes for 3DES, 16 bytes for AES)
"""
def new_generate(state, self=generator,
old_generate=generator.generate,
new_plaintext=new_plaintext):
"""Monkey patch to modify padding behaviour."""
msg = old_generate(state)
self.old_add_padding = state.msg_sock.addPadding
def new_add_padding(data,
old_add_padding=self.old_add_padding,
self=state.msg_sock,
new_plaintext=new_plaintext):
"""Monkey patch the padding creating method."""
del data
del old_add_padding
block_size = self.blockSize
if len(new_plaintext) % block_size:
raise ValueError("new_plaintext length not a multiple of "
"cipher block size")
return new_plaintext
state.msg_sock.addPadding = new_add_padding
return msg
generator.generate = new_generate
post_send_msg_sock_restore(generator, 'addPadding', 'old_add_padding')
return generator
[docs]
def fuzz_plaintext(generator, substitutions=None, xors=None):
"""
Change arbitrary bytes of the plaintext right before encryption.
Get access to all data before encryption, including the IV, MAC and
padding.
Works only with CBC ciphers. in EtM mode will not include MAC.
Note: the "-1" position is the byte with
length of padding while "-2" is the last byte of padding (if padding
has non-zero length)
:param dict(int,int) substitutions: modify specified bytes of the message,
the keys indicate the positions in the message (negative numbers
count from the end of messages), the values of the dictionary specify
the values to change the bytes to
:param dict(int,int) xors: modify specified bytes of the message,
the keys indicate the positions in the message (negative numbers
count from the end of messages), the values of the dictionary specify
the values to xor with
"""
def new_generate(state, self=generator,
old_generate=generator.generate,
substitutions=substitutions,
xors=xors):
"""Monkey patch to modify padding behaviour."""
msg = old_generate(state)
self.old_add_padding = state.msg_sock.addPadding
def new_add_padding(data,
old_add_padding=self.old_add_padding,
substitutions=substitutions,
xors=xors):
"""Monkey patch the padding creating method."""
data = old_add_padding(data)
data = substitute_and_xor(data, substitutions, xors)
return data
state.msg_sock.addPadding = new_add_padding
return msg
generator.generate = new_generate
post_send_msg_sock_restore(generator, 'addPadding', 'old_add_padding')
return generator
[docs]
def split_message(generator, fragment_list, size):
"""
Split a given message type to multiple messages.
Allows for splicing message into the middle of a different message type
"""
def new_generate(state, old_generate=generator.generate,
fragment_list=fragment_list, size=size):
"""Monkey patch for the generate method of the message generator."""
msg = old_generate(state)
content_type = msg.contentType
data = msg.write()
# since empty messages can be created much more easily with
# RawMessageGenerator, we don't handle 0 length messages here
if not data:
raise IndexError("Empty message payload")
while len(data) > 0:
# move the data to fragment_list (outside the method)
fragment_list.append(Message(content_type, data[:size]))
data = data[size:]
fragment_list.append(generator.post_send)
# make sure that the effect of the message is visible only once
# all parts of the message have been sent
generator.post_send = lambda x: None
return fragment_list.pop(0)
generator.generate = new_generate
return generator
[docs]
def queue_message(generator):
"""Queue message with other ones of the same content type.
Allow coalescing of the message with other messages with the same
content type, this allows for sending Certificate and CertificateVerify
in a single record.
"""
assert generator.is_generator()
generator.queue = True
return generator
[docs]
def skip_post_send(generator):
"""Make the post_send method of generator do nothing.
This is useful when combining messages that update connection state,
like KeyUpdate or Finished in TLS 1.3.
"""
assert generator.is_generator()
generator.post_send = lambda _: None
return generator
[docs]
class FlushMessageQueue(Command):
"""Flush the record layer queue of messages."""
def __init__(self, description=None):
super(FlushMessageQueue, self).__init__()
self.description = description
def __repr__(self):
vals = []
if self.description:
vals.append(('description', repr(self.description)))
return 'FlushMessageQueue({0})'.format(
', '.join("{0}={1}".format(i[0], i[1]) for i in vals))
[docs]
def process(self, state):
state.msg_sock.flushBlocking()
[docs]
class PopMessageFromList(MessageGenerator):
"""Takes a reference to list, pops a message from it to generate one."""
def __init__(self, fragment_list):
"""Link a list to store messages with the object."""
super(PopMessageFromList, self).__init__()
self.fragment_list = fragment_list
[docs]
def generate(self, state):
"""Create a message using the reference to list from init."""
msg = self.fragment_list.pop(0)
return msg
[docs]
def post_send(self, state):
super(PopMessageFromList, self).post_send(state)
if self.fragment_list and callable(self.fragment_list[0]):
func = self.fragment_list.pop(0)
func(state)
[docs]
class FlushMessageList(PopMessageFromList):
"""Takes a reference to list, empties it to generate a message."""
[docs]
def generate(self, state):
"""Creata a single message to empty the list."""
msg = self.fragment_list.pop(0)
content_type = msg.contentType
data = msg.write()
while self.fragment_list and not callable(self.fragment_list[0]):
msg_frag = self.fragment_list.pop(0)
assert msg_frag.contentType == content_type
data += msg_frag.write()
msg_ret = Message(content_type, data)
return msg_ret
[docs]
def fuzz_pkcs1_padding(key, substitutions=None, xors=None, padding_byte=None):
"""
Fuzz the PKCS#1 padding used in signatures or encryption.
Use to modify Client Key Exchange padding of encrypted value.
"""
if xors is None and substitutions is None and padding_byte is None:
return key
def new_addPKCS1Padding(bytes, blockType, self=key,
old_add_padding=key._addPKCS1Padding,
substitutions=substitutions, xors=xors,
padding_byte=padding_byte):
"""Monkey patch for the _addPKCS1Padding() method of RSA key."""
ret = old_add_padding(bytes, blockType)
pad_length = numBytes(self.n) - len(bytes)
pad = ret[:pad_length]
value = ret[pad_length:]
if padding_byte is not None:
# don't change version, type or payload separator
for i in range(2, pad_length-1):
pad[i] = padding_byte
pad = substitute_and_xor(pad, substitutions, xors)
return pad + value
key._addPKCS1Padding = new_addPKCS1Padding
return key