Source code for tlsfuzzer.messages

# Author: Hubert Kario, (c) 2015
# Released under Gnu GPL v2.0, see LICENSE file for details

"""Objects for generating TLS messages to send."""

import random
import struct
from tlslite.messages import ClientHello, ClientKeyExchange, ChangeCipherSpec,\
        Finished, Alert, ApplicationData, Message, Certificate, \
        CertificateVerify, CertificateRequest, ClientMasterKey, \
        ClientFinished, ServerKeyExchange, ServerHello, Heartbeat, \
        KeyUpdate
from tlslite.constants import AlertLevel, AlertDescription, ContentType, \
        ExtensionType, CertificateType, HashAlgorithm, \
        SignatureAlgorithm, CipherSuite, SignatureScheme, TLS_1_3_HRR, \
        HeartbeatMessageType
import tlslite.utils.tlshashlib as hashlib
from tlslite.extensions import TLSExtension, RenegotiationInfoExtension, \
        ClientKeyShareExtension, StatusRequestExtension
from tlslite.messagesocket import MessageSocket
from tlslite.defragmenter import Defragmenter
from tlslite.mathtls import calc_key, \
        calcExtendedMasterSecret
from tlslite.handshakehashes import HandshakeHashes
from tlslite.utils.codec import Writer
from tlslite.utils.cryptomath import getRandomBytes, numBytes, \
    numberToByteArray, bytesToNumber, HKDF_expand_label, secureHMAC, \
    derive_secret
from tlslite.keyexchange import KeyExchange
from tlslite.bufferedsocket import BufferedSocket
from tlslite.recordlayer import ConnectionState
from .helpers import key_share_gen, AutoEmptyExtension, ECDSA_SIG_ALL, \
        RSA_PKCS1_ALL, RSA_PSS_PSS_ALL, RSA_PSS_RSAE_ALL, SIG_ALL, DSA_ALL
from .handshake_helpers import calc_pending_states, curve_name_to_hash_tls13
from .tree import TreeNode
import socket
from functools import partial


class Command(TreeNode):
    """Base class of nodes that perform an action on the connection state."""

    def __init__(self):
        """Initialise the underlying tree node."""
        super(Command, self).__init__()

    def is_command(self):
        """Report that this node is a command node."""
        return True

    def is_expect(self):
        """Report that this node is not an expect node."""
        return False

    def is_generator(self):
        """Report that this node is not a generator node."""
        return False

    def process(self, state):
        """
        Apply the command to the connection ``state``.

        Abstract: every concrete command has to override this method.
        """
        raise NotImplementedError("Subclasses need to implement this!")
class Connect(Command):
    """Object used to connect to a TCP server."""

    def __init__(self, hostname, port, version=(3, 0), timeout=5):
        """
        Provide minimal settings needed to connect to other peer.

        :param str hostname: host name of the server to connect to
        :param int port: :term:`TCP` port number to connect to
        :param tuple(int,int) version: the protocol version used in the
            record layer for the initial messages
        :param float timeout: amount of time to wait while expecting a
            message before aborting the connection, in seconds
        """
        super(Connect, self).__init__()
        self.hostname = hostname
        self.port = port
        self.version = version
        self.timeout = timeout

    def process(self, state):
        """
        Connect to a server and attach a message socket to ``state``.

        Opens an IPv4 TCP connection to ``hostname``:``port``, wraps it in
        a :py:class:`BufferedSocket` and a :py:class:`MessageSocket` (stored
        as ``state.msg_sock``) and sets the record layer version on it.

        Any exception raised while configuring or connecting the socket is
        propagated to the caller after the socket has been closed.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(self.timeout)
            sock.connect((self.hostname, self.port))
            # disable Nagle - we handle buffering and flushing ourselves
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        except Exception:
            # the socket is not stored anywhere yet, so nobody else could
            # close it; release the descriptor before reporting the failure
            sock.close()
            raise

        # allow for later buffering of writes to the socket
        sock = BufferedSocket(sock)

        # tell the defragmenter how to find message boundaries for the
        # content types that can be split across records
        defragmenter = Defragmenter()
        defragmenter.add_static_size(ContentType.alert, 2)
        defragmenter.add_static_size(ContentType.change_cipher_spec, 1)
        defragmenter.add_dynamic_size(ContentType.handshake, 1, 3)

        state.msg_sock = MessageSocket(sock, defragmenter)
        state.msg_sock.version = self.version
class SetRecordVersion(Command):
    """Change the version used at record layer."""

    def __init__(self, version):
        """Remember the protocol version to apply to the record layer."""
        super(SetRecordVersion, self).__init__()
        self.version = version

    def process(self, state):
        """Set the version of the message socket to the stored one."""
        msg_sock = state.msg_sock
        msg_sock.version = self.version
class Close(Command):
    """Object used to close a TCP connection."""

    def __init__(self):
        """Create the node; it takes no settings."""
        super(Close, self).__init__()

    def process(self, state):
        """Close the socket underlying the current message socket."""
        tcp_sock = state.msg_sock.sock
        tcp_sock.close()
class CloseRST(Command):
    """Object used to close a TCP connection with a RST packet."""

    def __init__(self):
        """Create the node; it takes no settings."""
        super(CloseRST, self).__init__()

    def process(self, state):
        """Close currently open connection by sending a RST packet."""
        tcp_sock = state.msg_sock.sock
        # SO_LINGER switched on with a zero linger time
        linger = struct.pack('ii', 1, 0)
        tcp_sock.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, linger)
        tcp_sock.close()
class ResetHandshakeHashes(Command):
    """
    Object used to reset current state of handshake hashes to zero.

    Used for session renegotiation or resumption. Also prepares for
    negotiation (or dropping) of record_size_limit extension.
    """

    def __init__(self):
        """Create the node; it takes no settings."""
        super(ResetHandshakeHashes, self).__init__()

    def process(self, state):
        """Start a fresh handshake transcript and drop per-handshake keys."""
        state.handshake_hashes = HandshakeHashes()

        # only the secrets that can interfere with key calculation are
        # dropped; things like "resumption master secret" have to stay so
        # that the next session can actually be resumed
        for stale in ('PSK secret', 'DH shared secret'):
            state.key.pop(stale, None)

        # not part of the handshake hashes, but the record_size_limit
        # extension has to be negotiated anew on every renegotiation or
        # resumption, so go back to the default limit in both directions
        state._our_record_size_limit = state._peer_record_size_limit = 2**14
class ResetRenegotiationInfo(Command):
    """Object used to reset state of data needed for secure renegotiation."""

    def __init__(self, client=None, server=None):
        """
        Reset the stored renegotiation info to provided values.

        :param client: value to store as the client verify data,
            ``None`` selects an empty bytearray
        :param server: value to store as the server verify data,
            ``None`` selects an empty bytearray
        """
        super(ResetRenegotiationInfo, self).__init__()
        self.client_verify_data = client
        self.server_verify_data = server

    def process(self, state):
        """Overwrite the Finished values remembered in ``state.key``."""
        # unset values are replaced (on the node itself too) by empty ones
        if self.client_verify_data is None:
            self.client_verify_data = bytearray(0)
        if self.server_verify_data is None:
            self.server_verify_data = bytearray(0)

        keys = state.key
        keys['client_verify_data'] = self.client_verify_data
        keys['server_verify_data'] = self.server_verify_data
class SetMaxRecordSize(Command):
    """Change the Record Layer to send records of non standard size."""

    def __init__(self, max_size=None):
        """
        Set the maximum record layer message size.

        :param int max_size: size to use, ``None`` selects the default
        """
        super(SetMaxRecordSize, self).__init__()
        self.max_size = max_size

    def process(self, state):
        """Change the size of messages in record layer."""
        # 2**14 is the maximum for SSLv3, TLS1.0, TLS1.1 and TLS1.2
        size = 2**14 if self.max_size is None else self.max_size
        state.msg_sock.recordSize = size
class SetPaddingCallback(Command):
    """
    Install the record layer padding callback.

    The callback returns the length of the padding to be added to a
    message in the record layer.
    """

    def __init__(self, cb=None):
        """Remember the callback that will be installed."""
        super(SetPaddingCallback, self).__init__()
        self.padding_cb = cb

    @staticmethod
    def fixed_length_cb(size):
        """
        Build a callback that always asks for ``size`` bytes of padding.

        The returned callback raises :py:exc:`ValueError` when ``size``
        exceeds the room left (``max_padding - length``).
        """
        def _fixed_len_cb(length, contenttype, max_padding, zeroes=size):
            """Return the fixed padding size after checking that it fits."""
            available = max_padding - length
            if zeroes > available:
                raise ValueError("requested padding size is too large")
            return zeroes

        return _fixed_len_cb

    @staticmethod
    def fill_padding_cb(length, contenttype, max_padding):
        """Ask for as much padding as is left: ``max_padding - length``."""
        return max_padding - length

    @staticmethod
    def add_fixed_padding_cb(size):
        """
        Build a callback that always asks for ``size`` bytes of padding.

        Unlike :py:meth:`fixed_length_cb`, the returned callback performs
        no check against the maximum padding size.
        """
        def _add_fixed_passing_cb(length, contenttype, max_padding,
                                  zeros=size):
            """
            Return the fixed padding size without validating it.

            As the size is not checked, it can cause a buffer overflow
            alert.
            """
            return zeros

        return _add_fixed_passing_cb

    def process(self, state):
        """Install the stored callback on the message socket."""
        msg_sock = state.msg_sock
        msg_sock.padding_cb = self.padding_cb
class TCPBufferingEnable(Command):
    """
    Start buffering all writes on the TCP level of connection.

    You will need to call an explicit flush to send the messages.
    """

    def process(self, state):
        """Turn on write buffering on the underlying socket."""
        tcp_sock = state.msg_sock.sock
        tcp_sock.buffer_writes = True
class TCPBufferingDisable(Command):
    """
    Stop buffering all writes on the TCP level.

    All messages will be now passed directly to the TCP socket.
    """

    def process(self, state):
        """Turn off write buffering on the underlying socket."""
        tcp_sock = state.msg_sock.sock
        tcp_sock.buffer_writes = False
class TCPBufferingFlush(Command):
    """
    Send all messages in the buffer.

    Does not change the state of buffering.
    """

    def process(self, state):
        """Flush all buffered messages to the TCP socket."""
        tcp_sock = state.msg_sock.sock
        tcp_sock.flush()
class ResetWriteConnectionState(Command):
    """
    Reset _writeState configuration to default values.

    All sent messages will be unencrypted now.
    """

    def process(self, state):
        """Replace the write state with a freshly created one."""
        msg_sock = state.msg_sock
        msg_sock._writeState = ConnectionState()
class CollectNonces(Command):
    """
    Start collecting nonces being sent by the server in the provided array.

    Works only for ciphers like AES-GCM which use explicit nonces. Ciphers
    like Chacha20 use implicit nonce constructed from PRF output and
    sequence number.

    Needs to be run after the cipher was negotiated and switched to
    (after CCS), will collect nonces only till next renegotiation.
    """

    def __init__(self, nonces):
        """
        Link the list for storing nonces with the object.

        :param list nonces: list to which the observed nonces are appended
        """
        super(CollectNonces, self).__init__()
        self.nonces = nonces

    def process(self, state):
        """Wrap ``seal()`` of the write state's encryption context."""
        enc_context = state.msg_sock._writeState.encContext
        original_seal = enc_context.seal

        def collector(nonce, buf, authData, old_seal=original_seal,
                      nonces=self.nonces):
            """Record the nonce, then delegate to the original seal()."""
            nonces.append(nonce)
            return old_seal(nonce, buf, authData)

        # NOTE(review): the hook is installed on the *write* state, while
        # the class description talks about nonces sent by the server -
        # confirm which direction is intended
        enc_context.seal = collector
class CopyVariables(Command):
    """
    Copy current random values of connection to provided arrays.

    Available keys are either ``ClientHello.random``,
    ``ServerHello.random``, ``ServerHello.session_id`` or one of the values
    in ``key`` in :py:class:`~tlsfuzzer.runner.ConnectionState`
    (``premaster_secret``, ``master_secret``,
    ``ServerHello.extensions.key_share.key_exchange``,
    ``server handshake traffic secret``, ``exporter master secret``,
    ``ServerKeyExchange.key_share``, ``ServerKeyExchange.dh_p``,
    ``ClientKeyExchange.dh_Yc``, ``ClientKeyExchange.ecdh_Yc``,
    ``DH shared secret``, ``PSK secret``,
    ``client_verify_data``, ``server_verify_data``,
    ``client application traffic secret``,
    ``server application traffic secret``, ``resumption master secret``,
    ``early secret``, or ``handshake secret``)

    The log should be a dict (where keys have the above specified names)
    and values should be arrays (the values will be appended there).

    This node needs to be put right after a node that calculate or use
    the specific values to guarantee correct collection (i.e. if the
    conversation performs a renegotiation, it needs to be placed after both
    :py:class:`~tlsfuzzer.expect.ExpectServerHello` nodes to collect
    both ``ServerHello.random`` values).

    :param dict(str,list) log: dictionary with names of values to collect
    """

    def __init__(self, log):
        """Link the dictionary of value names and arrays with the node."""
        super(CopyVariables, self).__init__()
        self.log = log

    def process(self, state):
        """
        Append the current value of every requested variable to its array.

        :raises ValueError: when a name is neither one of the special names
            nor present in ``state.key``
        """
        for name, collected in self.log.items():
            # the three special names live directly on the state object,
            # everything else is looked up in the ``key`` dictionary
            if name == 'ClientHello.random':
                value = state.client_random
            elif name == 'ServerHello.random':
                value = state.server_random
            elif name == 'ServerHello.session_id':
                value = state.session_id
            elif name in state.key:
                value = state.key[name]
            else:
                raise ValueError("'{0}' variable is not defined yet or "
                                 "invalid for ConnectionState.key lookups."
                                 .format(name))
            collected.append(value)
class RawSocketWriteGenerator(Command):
    """
    Send a plaintext data irrespective of encryption state.

    Does not update handshake hashes, record layer state, does not
    fragment, etc.

    :ivar bytearray ~.data: data to send
    :ivar str ~.description: identifier to print when processing of the node
        fails
    """

    def __init__(self, data, description=None):
        """
        Set the payload to send.

        :param bytearray data: bytes to write to the socket as they are
        :param str description: identifier to print when processing of the
            node fails
        """
        super(RawSocketWriteGenerator, self).__init__()
        self.data = data
        self.description = description

    def __repr__(self):
        """Return human readable representation of the object."""
        return self._repr(["data", "description"])

    def process(self, state):
        """
        Write the whole payload to the socket below the record layer.

        ``sendall()`` is used instead of ``send()``: the latter is allowed
        to write only a part of the buffer and its return value was
        ignored, so large payloads could have been silently truncated.
        """
        # NOTE(review): assumes the wrapped socket object provides
        # sendall() next to send() (tlslite-ng's BufferedSocket does) -
        # confirm for custom socket wrappers
        state.msg_sock._recordSocket.sock.sendall(self.data)
class PlaintextMessageGenerator(Command):
    """
    Send a plaintext data record irrespective of encryption state.

    Does not update handshake hashes, record layer state, does not
    fragment, etc.

    :ivar int content_type: content type of message, used in record layer
        header. See :py:class:`~tlslite.constants.ContentType` for
        well-known values
    :ivar bytearray ~.data: payload for the record
    :ivar str ~.description: identifier to print when processing of the node
        fails
    """

    def __init__(self, content_type, data, description=None):
        """Set the record layer type and payload to send."""
        super(PlaintextMessageGenerator, self).__init__()
        self.content_type = content_type
        self.data = data
        self.description = description

    def __repr__(self):
        """Return human readable representation of the object."""
        fields = ["content_type={0}".format(self.content_type),
                  "data={0!r}".format(self.data)]
        # the description is shown only when it is set to something
        if self.description:
            fields.append("description={0!r}".format(self.description))
        return "PlaintextMessageGenerator({0})".format(", ".join(fields))

    def process(self, state):
        """Send the message through the record socket."""
        record = Message(self.content_type, self.data)
        # send() is iterated until exhausted; the yielded values are unused
        for _unused in state.msg_sock._recordSocket.send(record):
            pass
class MessageGenerator(TreeNode):
    """Base class of nodes that create messages to be sent."""

    def __init__(self):
        """Initialize the object with no message generated yet."""
        super(MessageGenerator, self).__init__()
        self.msg = None
        self.queue = False

    def is_command(self):
        """Report that this node is not a command node."""
        return False

    def is_expect(self):
        """Report that this node is not an expect node."""
        return False

    def is_generator(self):
        """Report that this node is a generator node."""
        return True

    def generate(self, state):
        """
        Return a message ready to write to socket.

        Abstract: every concrete generator has to override this method.
        """
        raise NotImplementedError("Subclasses need to implement this!")

    def post_send(self, state):
        """
        Modify the state after sending the message.

        Most messages need no post-send modifications, so the default
        implementation does nothing.
        """
        return None
class RawMessageGenerator(MessageGenerator):
    """
    Generator for arbitrary record layer messages.

    Can generate message with any content_type and any payload. Will
    be encrypted if encryption is negotiated in the connection.

    :ivar int content_type: content type of message, used in record layer
        header. See :py:class:`~tlslite.constants.ContentType` for
        well-known values
    :ivar bytearray ~.data: payload for the record
    :ivar str ~.description: identifier to print when processing of the node
        fails
    """

    def __init__(self, content_type, data, description=None):
        """Set the record layer type and payload to send."""
        super(RawMessageGenerator, self).__init__()
        self.content_type = content_type
        self.data = data
        self.description = description

    def generate(self, state):
        """Create a tlslite-ng message that can be sent."""
        return Message(self.content_type, self.data)

    def __repr__(self):
        """Return human readable representation of the object."""
        fields = ["content_type={0!s}".format(self.content_type),
                  "data={0!r}".format(self.data)]
        # only a missing (None) description is left out
        if self.description is not None:
            fields.append("description={0!r}".format(self.description))
        return "RawMessageGenerator({0})".format(", ".join(fields))
class HandshakeProtocolMessageGenerator(MessageGenerator):
    """Message generator for TLS Handshake protocol messages."""

    def post_send(self, state):
        """Add the sent message to the handshake transcript of ``state``."""
        super(HandshakeProtocolMessageGenerator, self).post_send(state)
        sent = self.msg
        # both the running hash and the list of messages are kept in sync
        state.handshake_hashes.update(sent.write())
        state.handshake_messages.append(sent)
def ch_key_share_handler(state):
    """Client Hello key_share extension handler.

    Generates the key share for the group selected by server in the
    last HRR message.

    :raises ValueError: when the last ServerHello in ``state`` is missing
        or is not a HelloRetryRequest
    """
    last_sh = state.get_last_message_of_type(ServerHello)
    if not last_sh or last_sh.random != TLS_1_3_HRR:
        # as the second CH should never be used without
        # ExpectHelloRetryRequest before it, using this helper when there
        # is no HRR in current handshake messages in the state is a user
        # error, not server error
        raise ValueError("No HRR received")

    # the check if the group selected in HRR was advertised in the first
    # ClientHello happens in the hrr_ext_handler_key_share()
    selected_group = \
        last_sh.getExtension(ExtensionType.key_share).selected_group

    share = key_share_gen(selected_group)
    return ClientKeyShareExtension().create([share])
class ClientHelloGenerator(HandshakeProtocolMessageGenerator):
    """Generator for TLS handshake protocol Client Hello messages."""

    def __init__(self, ciphers=None, extensions=None, version=None,
                 session_id=None, random=None, compression=None, ssl2=False,
                 modifiers=None):
        """
        Set up the object for generation of Client Hello messages.

        ``ciphers`` defaults to an empty list and ``compression`` to
        ``[0]``; all other values are stored as provided.
        """
        super(ClientHelloGenerator, self).__init__()
        self.version = version
        self.ciphers = [] if ciphers is None else ciphers
        self.extensions = extensions
        self.session_id = session_id
        self.random = random
        self.compression = [0] if compression is None else compression
        self.ssl2 = ssl2
        self.modifiers = modifiers

    def __repr__(self):
        """Human readable representation of the object."""
        # only settings with a truthy value are listed, in this order
        shown = ("ssl2", "version", "ciphers", "random", "session_id",
                 "compression", "extensions", "modifiers")
        fields = []
        for name in shown:
            value = getattr(self, name)
            if value:
                fields.append("{0}={1!r}".format(name, value))
        return "ClientHelloGenerator({0})".format(", ".join(fields))

    def _generate_extensions(self, state):
        """
        Convert extension generators to extension objects.

        :raises ValueError: for a value of unsupported type or for a
            ``None`` value of an extension that has no automatic handler
        """
        # extension IDs for which None means "send it with empty payload"
        empty_payload_ids = (ExtensionType.client_hello_padding,
                             ExtensionType.encrypt_then_mac,
                             ExtensionType.extended_master_secret,
                             ExtensionType.session_ticket,
                             49,  # post_handshake_auth
                             52)  # transparency_info

        result = []
        for ext_id in self.extensions:
            spec = self.extensions[ext_id]

            if spec is not None:
                # explicitly provided extension or extension generator
                if callable(spec):
                    result.append(spec(state))
                elif isinstance(spec, TLSExtension):
                    result.append(spec)
                elif spec is AutoEmptyExtension():
                    result.append(TLSExtension().create(ext_id, bytearray()))
                else:
                    raise ValueError("Bad extension, id: {0}".format(ext_id))
                continue

            # None: create the extension automatically based on its ID
            if ext_id == ExtensionType.renegotiation_info:
                created = RenegotiationInfoExtension()\
                    .create(state.key['client_verify_data'])
            elif ext_id == ExtensionType.status_request:
                created = StatusRequestExtension().create()
            elif ext_id in empty_payload_ids:
                created = TLSExtension().create(ext_id, bytearray())
            else:
                raise ValueError("No autohandler for extension {0}"
                                 .format(ExtensionType.toStr(ext_id)))
            result.append(created)

        return result

    def _handle_modifiers(self, state, clnt_hello):
        """Call every modifier with the state and the created message."""
        if self.modifiers is None:
            self.modifiers = []

        # TODO psk_binder updater for session tickets
        for modifier in self.modifiers:
            modifier(state, clnt_hello)

    def generate(self, state):
        """
        Create a Client Hello message.

        Values not provided to the constructor are filled in from ``state``
        (and remembered on the generator); the client random and client
        version used are written back to ``state``.
        """
        if self.version is None:
            self.version = state.client_version
        if self.random:
            state.client_random = self.random

        if self.session_id is None and state.session_id:
            self.session_id = state.session_id
        if self.session_id is None and self.extensions \
                and ExtensionType.supported_versions in self.extensions:
            # in TLS 1.3, the server should reply with CCS (middlebox compat
            # mode) only when client sends a session_id
            self.session_id = getRandomBytes(32)
        # if still unset, set to default value
        if not self.session_id:
            self.session_id = bytearray(b'')

        if not state.client_random:
            state.client_random = bytearray(32)

        if self.extensions is None:
            extensions = None
        else:
            extensions = self._generate_extensions(state)

        hello = ClientHello(self.ssl2).create(self.version,
                                              state.client_random,
                                              self.session_id,
                                              self.ciphers,
                                              extensions=extensions)
        hello.compression_methods = self.compression

        self._handle_modifiers(state, hello)

        state.client_version = self.version

        self.msg = hello
        return hello
class ClientKeyExchangeGenerator(HandshakeProtocolMessageGenerator):
    """
    Generator for TLS handshake protocol Client Key Exchange messages.

    :vartype dh_Yc: int
    :ivar dh_Yc: Override the sent dh_Yc value to the specified one
    :vartype padding_subs: dict(int,int)
    :ivar padding_subs: Substitutions for the encrypted premaster secret
        padding bytes (applicable only for the RSA key exchange)
    :vartype padding_xors: dict(int,int)
    :ivar padding_xors: XORs for the encrypted premaster secret padding bytes
        (applicable only for the RSA key exchange)
    :vartype ecdh_Yc: bytearray
    :ivar ecdh_Yc: encoded ECC point being the client key share for the
        key exchange
    :vartype encrypted_premaster: bytearray
    :ivar encrypted_premaster: the premaster secret after it was encrypted,
        as it will be sent on the wire
    :vartype modulus_as_encrypted_premaster: bool
    :ivar modulus_as_encrypted_premaster: if True, set the encrypted
        premaster (the value seen on the wire) to the server's certificate
        modulus (the server's public key)
    :vartype p_as_share: bool
    :ivar p_as_share: set the key share to the value :math:`p` provided by
        server in Server Key Exchange (applicable only to FFDHE key exchange)
    :vartype p_1_as_share: bool
    :ivar p_1_as_share: set the key share to the value :math:`p - 1`, as
        provided by server in Server Key Exchange (applicable only to FFDHE
        key exchange with safe primes)
    :ivar int padding_byte: byte to use as padding instead of randomly
        generated bytes (applicable only for RSA key exchange)
    :ivar tuple(int,int) client_version: the version to set in the RSA
        pre-master secret
    :ivar bool reuse_encrypted_premaster: if set to true, the message
        generator will create the RSA ciphertext once and reuse it for
        subsequent connections. Applicable only to RSA key exchange, useful
        only for tests that run the same conversation over and over (like
        timing tests).
    :ivar encrypted_premaster_file: The :term:`file object` from which
        to read the encrypted premaster secret, on node re-execution will
        read subsequent values, does not rewind the file pointer or close the
        file. The file must be opened in binary mode
    :vartype encrypted_premaster_file: :term:`file object`
    :ivar int encrypted_premaster_length: The length of data to read,
        in bytes
    :ivar bool random_premaster: whether to use a random premaster value
        or the static default (48 zero bytes)
    """

    def __init__(self, cipher=None, version=None, client_version=None,
                 dh_Yc=None, padding_subs=None, padding_xors=None,
                 ecdh_Yc=None, encrypted_premaster=None,
                 modulus_as_encrypted_premaster=False, p_as_share=False,
                 p_1_as_share=False, premaster_secret=None,
                 padding_byte=None, reuse_encrypted_premaster=False,
                 encrypted_premaster_file=None,
                 encrypted_premaster_length=None,
                 random_premaster=False):
        """
        Set settings of the Client Key Exchange to be sent.

        :raises ValueError: when ``encrypted_premaster_file`` is given
            without ``encrypted_premaster_length``, or when mutually
            exclusive options are selected together
        """
        super(ClientKeyExchangeGenerator, self).__init__()
        self.cipher = cipher
        self.version = version
        self.client_version = client_version
        if premaster_secret is None:
            # static default: 48 zero bytes
            self.premaster_secret = bytearray(48)
        else:
            self.premaster_secret = premaster_secret
        self.dh_Yc = dh_Yc
        self.padding_subs = padding_subs
        self.padding_xors = padding_xors
        self.ecdh_Yc = ecdh_Yc
        self.encrypted_premaster = encrypted_premaster
        self.modulus_as_encrypted_premaster = modulus_as_encrypted_premaster
        self.p_as_share = p_as_share
        self.p_1_as_share = p_1_as_share
        self.padding_byte = padding_byte
        self.reuse_encrypted_premaster = reuse_encrypted_premaster
        self.encrypted_premaster_file = encrypted_premaster_file
        self.encrypted_premaster_length = encrypted_premaster_length
        self.random_premaster = random_premaster

        if encrypted_premaster_file and not encrypted_premaster_length:
            raise ValueError("Must specify the length of data to read from"
                             " encrypted_premaster_file")
        if modulus_as_encrypted_premaster and encrypted_premaster_file:
            raise ValueError("Can't set both modulus_as_encrypted_premaster "
                             "and encrypted_premaster_file at the same time")
        if p_as_share and p_1_as_share:
            raise ValueError("Can't set both p_as_share and p_1_as_share at "
                             "the same time")

    def generate(self, status):
        """
        Create a Client Key Exchange message.

        Unset ``version``, ``cipher`` and ``client_version`` are taken from
        ``status`` (and remembered on the generator). The kind of message
        depends on the group the cipher belongs to: RSA, finite field DHE
        or ECDH.

        :raises AssertionError: when the cipher belongs to none of the
            handled groups, or when ``p_as_share``/``p_1_as_share`` is used
            and no Server Key Exchange is present in ``status``
        """
        if self.version is None:
            self.version = status.version

        if self.cipher is None:
            self.cipher = status.cipher

        if self.client_version is None:
            self.client_version = status.client_version

        cke = ClientKeyExchange(self.cipher, self.version)
        if self.cipher in CipherSuite.certSuites:
            # RSA key exchange: first work out if a ready-made ciphertext
            # is to be sent
            if self.modulus_as_encrypted_premaster:
                public_key = status.get_server_public_key()
                self.encrypted_premaster = numberToByteArray(public_key.n)
            elif self.encrypted_premaster_file:
                self.encrypted_premaster = self.encrypted_premaster_file.read(
                    self.encrypted_premaster_length)

            if self.encrypted_premaster:
                cke.createRSA(self.encrypted_premaster)
                if self.reuse_encrypted_premaster:
                    status.key['premaster_secret'] = self.premaster_secret
            else:
                if self.random_premaster:
                    self.premaster_secret = getRandomBytes(48)
                # the first two bytes of the premaster carry the client
                # version
                if len(self.premaster_secret) >= 2:
                    self.premaster_secret[0] = self.client_version[0]
                    self.premaster_secret[1] = self.client_version[1]

                status.key['premaster_secret'] = self.premaster_secret

                public_key = status.get_server_public_key()

                enc_premaster = self._encrypt_with_fuzzing(public_key)

                if self.reuse_encrypted_premaster:
                    # remember the ciphertext so that the next execution
                    # takes the ready-made ciphertext branch above
                    self.encrypted_premaster = enc_premaster

                cke.createRSA(enc_premaster)
        elif self.cipher in CipherSuite.dheCertSuites or \
                self.cipher in CipherSuite.dheDsaSuites:
            if self.dh_Yc is not None:
                cke = ClientKeyExchange(self.cipher,
                                        self.version).createDH(self.dh_Yc)
            elif self.p_as_share or self.p_1_as_share:
                ske = status.get_last_message_of_type(ServerKeyExchange)
                assert ske, "No server key exchange in messages"
                if self.p_as_share:
                    cke = ClientKeyExchange(self.cipher,
                                            self.version).createDH(ske.dh_p)
                else:
                    cke = ClientKeyExchange(self.cipher,
                                            self.version).createDH(ske.dh_p-1)
            else:
                cke = status.key_exchange.makeClientKeyExchange()
                # NOTE(review): the flattened source does not show whether
                # this logging belongs to the else branch only - confirm
                status.key['ClientKeyExchange.dh_Yc'] = cke.dh_Yc
        elif self.cipher in CipherSuite.ecdhAllSuites:
            if self.ecdh_Yc is not None:
                cke = ClientKeyExchange(self.cipher,
                                        self.version).createECDH(self.ecdh_Yc)
            else:
                cke = status.key_exchange.makeClientKeyExchange()
                # NOTE(review): same nesting caveat as for dh_Yc above
                status.key['ClientKeyExchange.ecdh_Yc'] = cke.ecdh_Yc
        else:
            raise AssertionError("Unknown cipher/key exchange type")

        self.msg = cke

        return cke

    def _encrypt_with_fuzzing(self, public_key):
        """
        Use public_key to encrypt premaster secret with fuzzed padding.

        The padding method of the key is replaced only for the duration of
        the encryption; the previous one is put back even when the
        encryption raises, so a failure can't leave the key patched.

        :param public_key: key of the server, as returned by
            ``get_server_public_key()`` of the connection state
        :return: the encrypted premaster secret
        """
        old_addPKCS1Padding = public_key._addPKCS1Padding
        public_key = fuzz_pkcs1_padding(public_key, self.padding_subs,
                                        self.padding_xors, self.padding_byte)
        try:
            return public_key.encrypt(self.premaster_secret)
        finally:
            public_key._addPKCS1Padding = old_addPKCS1Padding

    def post_send(self, state):
        """Save intermediate handshake hash value."""
        # for EMS all messages up to and including CKE are part of
        # "session_hash"
        super(ClientKeyExchangeGenerator, self).post_send(state)
        state.certificate_verify_handshake_hashes = \
            state.handshake_hashes.copy()
class ClientMasterKeyGenerator(HandshakeProtocolMessageGenerator):
    """Generator for SSLv2 Handshake Protocol CLIENT-MASTER-KEY message."""

    def __init__(self, cipher=None, master_key=None):
        """
        Set the cipher to send to server.

        :param cipher: SSLv2 cipher to put in the message, required
        :param master_key: master key to use; when ``None`` the master
            secret from the connection state is used or, if that one is
            empty, a random key is generated
        """
        super(ClientMasterKeyGenerator, self).__init__()
        self.cipher = cipher
        self.master_key = master_key

    def generate(self, state):
        """
        Generate a new CLIENT-MASTER-KEY message.

        :raises NotImplementedError: when no cipher was provided
        :raises AssertionError: when a key has to be generated for a cipher
            of unknown key size
        """
        if self.cipher is None:
            raise NotImplementedError("No cipher autonegotiation")

        if self.master_key is None:
            known_secret = state.key['master_secret']
            if known_secret == bytearray(0):
                # no master secret yet, create one of the size needed by
                # the cipher
                if self.cipher in CipherSuite.ssl2_128Key:
                    key_size = 16
                elif self.cipher in CipherSuite.ssl2_192Key:
                    key_size = 24
                elif self.cipher in CipherSuite.ssl2_64Key:
                    key_size = 8
                else:
                    raise AssertionError("unknown cipher but no master_secret")
                self.master_key = getRandomBytes(key_size)
            else:
                self.master_key = state.key['master_secret']

        pending_cipher = self.cipher
        if not (pending_cipher in CipherSuite.ssl2rc4 or
                pending_cipher in CipherSuite.ssl2_3des):
            # tlslite-ng doesn't implement anything else, so we need to
            # workaround calculation of pending states failure for test
            # cases which don't really encrypt data
            pending_cipher = CipherSuite.SSL_CK_RC4_128_WITH_MD5

        key_arg = state.msg_sock.calcSSL2PendingStates(pending_cipher,
                                                       self.master_key,
                                                       state.client_random,
                                                       state.server_random,
                                                       None)

        # for export ciphers only the last 5 bytes of the key are
        # encrypted, the rest is sent in the clear
        if self.cipher in CipherSuite.ssl2export:
            clear_key, secret_key = \
                self.master_key[:-5], self.master_key[-5:]
        else:
            clear_key, secret_key = bytearray(0), self.master_key

        server_key = state.get_server_public_key()
        encrypted_master_key = server_key.encrypt(secret_key)

        message = ClientMasterKey()
        message.create(self.cipher, clear_key, encrypted_master_key, key_arg)
        self.msg = message
        return message
class CertificateGenerator(HandshakeProtocolMessageGenerator):
    """Generator for TLS handshake protocol Certificate message."""

    def __init__(self, certs=None, cert_type=None, version=None,
                 context=None):
        """
        Set the certificates to send to server.

        :param certs: certificates to place in the message
        :param cert_type: type of the certificates, X.509 when unset
        :param version: protocol version of the message, taken from the
            connection state when unset
        :param list context: list of messages; its last element must be
            the CertificateRequest to answer and the generated message is
            appended to it
        """
        super(CertificateGenerator, self).__init__()
        self.certs = certs
        self.cert_type = cert_type
        self.version = version
        self.context = context

    def generate(self, status):
        """
        Create a Certificate message.

        :raises AssertionError: when the last element of a non-empty
            ``context`` is not a CertificateRequest
        """
        if self.version is None:
            self.version = status.version
        if self.cert_type is None:
            self.cert_type = CertificateType.x509

        # without a context list, an empty request context is sent
        request_context = b''
        if self.context:
            request = self.context[-1]
            assert isinstance(request, CertificateRequest)
            request_context = request.certificate_request_context

        message = Certificate(self.cert_type, version=self.version)
        message.create(self.certs, context=request_context)

        # record the reply in the caller-provided context list
        if self.context:
            self.context.append(message)

        self.msg = message
        return message
class CertificateVerifyGenerator(HandshakeProtocolMessageGenerator):
    """
    Generator for TLS handshake protocol Certificate Verify message.

    :vartype msg_alg: tuple(int,int)
    :ivar msg_alg: signature and hash algorithm to be set on in the
        digitally-signed structure of TLSv1.2 Certificate Verify message.
        By default the first matching algorithm from CertificateRequest that
        matches our key or sent certificate. If no CertificateRequest
        received it will send the first algorithm matching our key or
        certificate sent. If no Certificate nor private key is available, it
        will select first algorithm from CertificateRequest. If no
        Certificate, CertificateRequest nor private key is available then it
        will use SHA-1 + RSA.
        The first value in the tuple specifies hash type (from
        HashAlgorithm) and the second value specifies the signature
        algorithm (from SignatureAlgorithm). Or the value from
        SignatureScheme.
    :vartype msg_version: tuple(int,int)
    :ivar msg_version: protocol version that the message is to use,
        default is taken from current connection state
    :vartype sig_version: tuple(int,int)
    :ivar sig_version: protocol version to use for calculating the
        verify bytes for the signature (overrides msg_version, but just for
        the signature). Equal to msg_version by default.
    :vartype sig_alg: tuple(int,int)
    :ivar sig_alg: hash and signature algorithm to be used for creating
        the signature in the message. Equal to msg_alg by default. Requires
        the ``sig_version`` to be set to at least TLSv1.2 to be effective.
    :vartype signature: bytearray
    :ivar signature: bytes to send as the signature of the message
    :vartype padding_xors: dict(int,int)
    :ivar padding_xors: which bytes of the pre-encryption RSA padding or
        post-signature ECDSA signature should be xored and with what values
    :vartype padding_subs: dict(int,int)
    :ivar padding_subs: same as padding_xors but substitutes specified
        bytes instead
    :vartype mgf1_hash: str
    :ivar mgf1_hash: name of the hash to be used for calculating MGF1,
        effective only if sig_alg is set to a RSA_PSS algorithm and
        sig_version is TLS 1.2 or greater. By default the hash taken from
        sig_alg.
    :vartype rsa_pss_salt_len: int
    :ivar rsa_pss_salt_len: length of the salt (in bytes) used in
        signature. Effective only if sig_alg is set to a RSA_PSS algorithm
        and sig_version is TLS 1.2 or greater. By default it's equal to the
        length of the hash taken from sig_alg.
    :vartype private_key: :py:class:`~tlslite.utils.rsakey.RSAKey` or
        :py:class:`~tlslite.utils.ecdsakey.ECDSAKey`
    :ivar private_key: key that will be used for signing the message
    """

    def __init__(self, private_key=None, msg_version=None, msg_alg=None,
                 sig_version=None, sig_alg=None, signature=None,
                 rsa_pss_salt_len=None, padding_xors=None, padding_subs=None,
                 mgf1_hash=None, context=None):
        """
        Create object for generating Certificate Verify messages.

        :raises ValueError: when ``sig_version`` is neither ``None`` nor a
            tuple
        """
        super(CertificateVerifyGenerator, self).__init__()
        self.private_key = private_key
        self.msg_alg = msg_alg
        self.msg_version = msg_version
        self.sig_version = sig_version
        # versions are compared as tuples, other types are rejected early
        if sig_version is not None and not isinstance(sig_version, tuple):
            raise ValueError("sig_version must be None or a tuple")
        self.sig_alg = sig_alg
        self.signature = signature
        self.rsa_pss_salt_len = rsa_pss_salt_len
        self.padding_xors = padding_xors
        self.padding_subs = padding_subs
        self.mgf1_hash = mgf1_hash
        self.context = context
[docs] @staticmethod def _sig_alg_for_rsa_key(key_alg, accept_sig_algs, version): """Select an acceptable signature algorithm for a given rsa key.""" if version < (3, 3): # in TLS 1.1 and earlier, there is no algorithm selection, # pick one closest, as far as used algorithms are concerned, to # the TLS 1.2 algorithm return (HashAlgorithm.sha1, SignatureAlgorithm.rsa) if key_alg == "rsa": # with rsa key we can make either RSA-PSS or RSA-PKCS#1 signatures if version < (3, 4): valid_sig_algs = RSA_PSS_RSAE_ALL + RSA_PKCS1_ALL else: # but not in TLS 1.3 valid_sig_algs = RSA_PSS_RSAE_ALL else: # with rsa-pss key we can make only RSA-PSS signatures assert key_alg == "rsa-pss" valid_sig_algs = RSA_PSS_PSS_ALL return next((i for i in accept_sig_algs if i in valid_sig_algs), valid_sig_algs[0])
[docs] @staticmethod def _sig_alg_for_ecdsa_key(accept_sig_algs, version, key): """Select an acceptable signature algorithm for a given ecdsa key.""" if version < (3, 3): # in TLS 1.1 and earlier, there is no algorithm selection, # pick one closest, as far as used algorithms are concerned, to # the TLS 1.2 algorithm return (HashAlgorithm.sha1, SignatureAlgorithm.ecdsa) if version < (3, 4): # in TLS 1.2 we can mix and match hashes and curves return next((i for i in accept_sig_algs if i in ECDSA_SIG_ALL), ECDSA_SIG_ALL[0]) # but in TLS 1.3 we need to select a hash that matches our key hash_name = curve_name_to_hash_tls13(key.curve_name) # while it may select one that wasn't advertised by server, # this is better last resort than sending a sha1+rsa sigalg return (getattr(HashAlgorithm, hash_name), SignatureAlgorithm.ecdsa)
[docs] @staticmethod def _sig_alg_for_dsa_key(accept_sig_algs, version, key): """Select an acceptable signature algorithm for a given DSA key.""" if version < (3, 3): # in TLS 1.1 and earlier, there is no algorithm selection, # pick one closest, as far as used algorithms are concerned, to # the TLS 1.2 algorithm return (HashAlgorithm.sha1, SignatureAlgorithm.dsa) if version < (3, 4): # in TLS 1.2 we can mix and match hashes and curves return next((i for i in accept_sig_algs if i in DSA_ALL), DSA_ALL[0]) # but in TLS 1.3 we need to select a hash that matches our key hash_name = curve_name_to_hash_tls13(key.curve_name) # while it may select one that wasn't advertised by server, # this is better last resort than sending a sha1+rsa sigalg return (getattr(HashAlgorithm, hash_name), SignatureAlgorithm.dsa)
[docs] @staticmethod def _sig_alg_for_eddsa_key(key_alg, accept_sig_algs): sig_alg = getattr(SignatureScheme, key_alg.lower()) assert sig_alg in accept_sig_algs return sig_alg
[docs] @staticmethod def _sig_alg_for_certificate(key_alg, accept_sig_algs, version, key): """ Select an acceptable signature algorithm based on key algorithm, protocol version and curve name (in case of ECDSA). """ if key_alg in ("rsa", "rsa-pss"): return CertificateVerifyGenerator._sig_alg_for_rsa_key( key_alg, accept_sig_algs, version) if key_alg in ("Ed25519", "Ed448"): return CertificateVerifyGenerator._sig_alg_for_eddsa_key( key_alg, accept_sig_algs) if key_alg == "dsa": return CertificateVerifyGenerator._sig_alg_for_dsa_key( accept_sig_algs, version, key) assert key_alg == "ecdsa" return CertificateVerifyGenerator._sig_alg_for_ecdsa_key( accept_sig_algs, version, key)
[docs] def _get_key_and_key_type(self, status): """ Get a key, or if not possible, certificate for selecting the signature algorithm. """ key_type = None key = None if self.private_key: key_type = self.private_key.key_type key = self.private_key cert = status.get_last_message_of_type(Certificate) our_cert = None if cert: our_cert = cert.cert_chain.x509List[0] if not key_type: key_type = our_cert.certAlg key = our_cert.publicKey return key_type, key, our_cert
[docs] def _select_msg_alg(self, status): """ Select the signature algorithm based on CertificateRequest from server, either our sent Certificate or our private key and the protocol version. """ # first, what signature algorithms we can use key_type, key, our_cert = self._get_key_and_key_type(status) # second, what signature algorithms does the server like ok_sig_algs = SIG_ALL if self.context: # do post-handshake authentication from TLS 1.3 cert_req = self.context[0] assert isinstance(cert_req, CertificateRequest) else: cert_req = status.get_last_message_of_type(CertificateRequest) if cert_req is not None: # when we got a CR message, we need to select a signature that # matches one of the algorithms the server sent if not self.private_key and not our_cert: # when sending malformed messages, the key may not be # even loaded, so select any algorithm acceptable to server self.msg_alg = cert_req.supported_signature_algs[0] return ok_sig_algs = cert_req.supported_signature_algs # try to find one acceptable given all limitations if key_type: self.msg_alg = self._sig_alg_for_certificate( key_type, ok_sig_algs, self.sig_version, key) if self.msg_alg is None: # as an ultimate fallback, when we have no certificate, # private key or CertificateRequest to work with self.msg_alg = (HashAlgorithm.sha1, SignatureAlgorithm.rsa)
[docs] @staticmethod def _normalise_dict(dictionary, max_byte): # python2.6 does not support dict comprehension # pylint: disable=consider-using-dict-comprehension return dict([(min(k, max_byte), v) for k, v in dictionary.items()])
# pylint: enable=consider-using-dict-comprehension
[docs] def _normalise_subs_and_xors(self, max_byte): """ Make sure that the substitutions and xors don't go over the size of buffer, this is fine as ECDSA signatures are ASN.1 objects so have variable size """ if self.padding_subs: self.padding_subs = \ self._normalise_dict(self.padding_subs, max_byte) if self.padding_xors: self.padding_xors = \ self._normalise_dict(self.padding_xors, max_byte)
[docs] def _get_ecdsa_sig_parameters(self): """Set up parameters for sign() operation with ecdsa keys.""" if self.sig_alg: # while the argument is called mgf1_hash in ecdsa # signatures it's used for the derivation of the nonce self.mgf1_hash = HashAlgorithm.toStr(self.sig_alg[0]) else: # in TLS 1.1 and earlier we do simple sha1 signatures self.mgf1_hash = "sha1"
# yes, we're using bad names (for API compat reasons) and we're # accessing private methods (as we intentionally want to break stuff) # so ignore those issues # pylint: disable=invalid-name,protected-access
[docs] def _get_rsa_sig_parameters(self): """Return parameters for sign() operation with rsa keys.""" scheme = SignatureScheme.toRepr(self.sig_alg) hash_name = None if scheme is None: padding = "pkcs1" else: padding = SignatureScheme.getPadding(scheme) if padding == 'pss': hash_name = SignatureScheme.getHash(scheme) if self.rsa_pss_salt_len is None: self.rsa_pss_salt_len = \ getattr(hashlib, hash_name)().digest_size if not self.mgf1_hash: self.mgf1_hash = hash_name def _newRawPrivateKeyOp(self, m, original_raw_private_key_op_bytes, subs=None, xors=None): sign_bytes = m sign_bytes = substitute_and_xor(sign_bytes, subs, xors) m = bytesToNumber(sign_bytes) # RSA operations are defined only on numbers that are # smaller than the modulus, so ensure the XORing or # substitutions # didn't break it (especially necessary for pycrypto as # it raises exception in such case) if m > self.n: m %= self.n return original_raw_private_key_op_bytes( numberToByteArray(m, numBytes(self.n))) old_private_key_op = self.private_key._raw_private_key_op_bytes self.private_key._raw_private_key_op_bytes = \ partial(_newRawPrivateKeyOp, self.private_key, original_raw_private_key_op_bytes=old_private_key_op, subs=self.padding_subs, xors=self.padding_xors) return padding, old_private_key_op
# pylint: enable=invalid-name,protected-access
[docs] def _make_signature(self, status): """Create signature for CertificateVerify message.""" if self.private_key is None: raise ValueError("Can't create a signature without " "private key!") if self.sig_alg and self.sig_alg[1] == SignatureAlgorithm.ecdsa or\ self.private_key.key_type == "ecdsa": signature_type = "ecdsa" elif self.sig_alg and self.sig_alg in ( SignatureScheme.ed25519, SignatureScheme.ed448) or \ self.private_key.key_type in ("Ed25519", "Ed448"): signature_type = "eddsa" elif self.sig_alg and self.sig_alg[1] == SignatureAlgorithm.dsa or \ self.private_key.key_type == "dsa": signature_type = "dsa" else: signature_type = "rsa" if self.context: # if we have context set, it means we're doing post handshake # authentication in TLS 1.3 handshake_hashes = \ status.key['client finished handshake hashes'].copy() for ctx in self.context: handshake_hashes.update(ctx.write()) else: handshake_hashes = status.handshake_hashes verify_bytes = \ KeyExchange.calcVerifyBytes(self.sig_version, handshake_hashes, self.sig_alg, status.key['premaster_secret'], status.client_random, status.server_random, status.prf_name, key_type=self.private_key.key_type) if signature_type == "eddsa": self.mgf1_hash = "intrinsic" self.rsa_pss_salt_len = None padding = None old_private_key_op = None sig_func = self.private_key.hashAndSign elif signature_type == "ecdsa": self._get_ecdsa_sig_parameters() padding = None old_private_key_op = None # truncate the hash so that if we sign big hash with small # curve, the signing is successful verify_bytes = verify_bytes[:self.private_key. 
private_key.curve.baselen] sig_func = self.private_key.sign elif signature_type == "dsa": padding = None old_private_key_op = None sig_func = self.private_key.sign else: # we don't have to handle non pkcs1 padding because the # calcVerifyBytes does everything padding, old_private_key_op = self._get_rsa_sig_parameters() sig_func = self.private_key.sign try: signature = sig_func(verify_bytes, padding, self.mgf1_hash, self.rsa_pss_salt_len) finally: # make sure the changes are undone even if the signing fails self.private_key._raw_private_key_op_bytes = old_private_key_op if signature_type in ("ecdsa", "dsa"): # because DSA and ECDSA signatures are ANS.1 DER objects, they # can have different lengths depending on the bit size of # "r" and "s" variables # given that indexing would fail if it was asked to index # over an nonexistent byte, we need to limit the numbers signature = bytearray(signature) max_byte = len(signature) - 1 self._normalise_subs_and_xors(max_byte) if signature_type in ("ecdsa", "eddsa", "dsa"): # but EdDSA signatures are always the same length for given # key type, so don't normalise the values for them signature = substitute_and_xor(signature, self.padding_subs, self.padding_xors) return signature
[docs] def generate(self, status): """Create a CertificateVerify message.""" if self.msg_version is None: self.msg_version = status.version if self.sig_version is None: self.sig_version = self.msg_version if self.msg_alg is None and self.msg_version >= (3, 3): self._select_msg_alg(status) if self.sig_alg is None: self.sig_alg = self.msg_alg # TODO: generate a random key if none provided if self.signature is None: signature = self._make_signature(status) else: signature = self.signature cert_verify = CertificateVerify(self.msg_version) cert_verify.create(signature, self.msg_alg) if self.context: self.context.append(cert_verify) self.msg = cert_verify return cert_verify
class ClearContext(Command):
    """
    Command that empties the context list used in post-handshake
    authentication.

    Needed when the same conversation is executed more than once, as the
    generators append messages to the shared context on every run.
    """

    def __init__(self, context):
        """Remember the context list that is to be emptied."""
        super(ClearContext, self).__init__()
        self.context = context

    def process(self, state):
        """Remove all entries from the associated context, in place."""
        del state  # unused
        del self.context[:]
class ChangeCipherSpecGenerator(MessageGenerator):
    """
    Generator for TLS Change Cipher Spec messages.

    .. note::
       After sending the ChangeCipherSpec message, in TLS 1.2 and earlier,
       the record layer will switch to encrypted communication (or newly
       negotiated keys). In TLS 1.3 the message has no effect on encryption
       or record layer state.
    """

    def __init__(self, extended_master_secret=None, fake=False):
        """
        Create an object for generating CCS messages.

        :param extended_master_secret: whether the extended master secret
            calculation is to be used; None takes the value from the
            connection state when the message is sent
        :param bool fake: when True the message does not cause
            calculation of new keys
        """
        super(ChangeCipherSpecGenerator, self).__init__()
        self.extended_master_secret = extended_master_secret
        self.fake = fake

    def generate(self, status):
        """Create a message for sending to server."""
        return ChangeCipherSpec()

    def _derive_master_secret(self, status):
        """Calculate the master secret for a full (non resumed) handshake."""
        premaster = status.key['premaster_secret']
        if self.extended_master_secret:
            # in case client certificates are used, we need to omit
            # certificate verify message
            transcript = status.certificate_verify_handshake_hashes \
                or status.handshake_hashes
            return calcExtendedMasterSecret(status.version,
                                            status.cipher,
                                            premaster,
                                            transcript)
        return calc_key(status.version, premaster, status.cipher,
                        b'master secret',
                        client_random=status.client_random,
                        server_random=status.server_random,
                        output_length=48)

    def post_send(self, status):
        """Generate new encryption keys for connection."""
        # in TLS 1.3 it's a fake message, doesn't cause calculation of new keys
        if status.version >= (3, 4) or self.fake:
            return

        record_layer = status.msg_sock
        record_layer.encryptThenMAC = status.encrypt_then_mac

        if self.extended_master_secret is None:
            self.extended_master_secret = status.extended_master_secret

        if not status.resuming:
            status.key['master_secret'] = self._derive_master_secret(status)

            # in case of resumption, the pending states are generated
            # during receive of server sent CCS
            calc_pending_states(status)

        record_layer.changeWriteState()

        size_limit = status._peer_record_size_limit
        if size_limit:
            record_layer.send_record_limit = size_limit
            record_layer.recordSize = size_limit
class FinishedGenerator(HandshakeProtocolMessageGenerator):
    """
    Generator for TLS handshake protocol Finished messages.

    .. note::
       The FinishedGenerator may influence the record layer encryption.
       In SSLv2, the record layer will be configured to expect encrypted
       records and send encrypted records *before* the message is sent.
       In SSLv3 up to TLS 1.2 the message has no impact on state of
       encryption. In TLS 1.3, *after* the message is sent, the record layer
       will be switched to use ``client_application_traffic_secret`` keys
       for *sending*.
    """

    def __init__(self, protocol=None, trunc_start=0, trunc_end=None,
                 pad_byte=0, pad_left=0, pad_right=0, context=None):
        """
        Object to generate Finished messages.

        :param protocol: protocol version of the message, taken from the
            connection state when None
        :param trunc_start: start of the slice of verify data to keep
        :param trunc_end: end of the slice of verify data to keep
        :param pad_byte: value of the bytes used for padding
        :param pad_left: number of padding bytes added before verify data
        :param pad_right: number of padding bytes added after verify data
        :param context: list of messages of a post-handshake
            authentication exchange
        """
        super(FinishedGenerator, self).__init__()
        self.protocol = protocol
        self.server_finish_hh = None
        self.trunc_start = trunc_start
        self.trunc_end = trunc_end
        self.pad_byte = pad_byte
        self.pad_left = pad_left
        self.pad_right = pad_right
        self.context = context

    def _tls13_verify_data(self, status):
        """Calculate verify data for TLS 1.3, remember the hashes used."""
        post_handshake = self.context
        if post_handshake:
            # post-handshake authentication in TLS 1.3
            base_key = status.key['client application traffic secret']
        else:
            base_key = status.key['client handshake traffic secret']
        finished_key = HKDF_expand_label(
            base_key, b'finished', b'', status.prf_size, status.prf_name)

        if post_handshake:
            # post-handshake authentication in TLS 1.3
            self.server_finish_hh = \
                status.key['client finished handshake hashes'].copy()
            for ctx in post_handshake:
                self.server_finish_hh.update(ctx.write())
        else:
            self.server_finish_hh = status.handshake_hashes.copy()

        return secureHMAC(finished_key,
                          self.server_finish_hh.digest(status.prf_name),
                          status.prf_name)

    def generate(self, status):
        """Create a Finished message."""
        if self.protocol is None:
            self.protocol = status.version

        if self.protocol in ((0, 2), (2, 0)):
            finished = ClientFinished()
            verify_data = status.session_id

            # in SSLv2 we're using it as a CCS-of-sorts too
            status.msg_sock.changeWriteState()
            status.msg_sock.changeReadState()
        elif self.protocol <= (3, 3):
            finished = Finished(self.protocol)
            if status.client:
                label = b'client finished'
            else:
                label = b'server finished'
            verify_data = calc_key(status.version,
                                   status.key['master_secret'],
                                   status.cipher,
                                   label,
                                   status.handshake_hashes,
                                   output_length=12)
        else:  # TLS 1.3
            finished = Finished(self.protocol, status.prf_size)
            verify_data = self._tls13_verify_data(status)

        # messing with the message - truncation
        verify_data = verify_data[self.trunc_start:self.trunc_end]
        # messing with the message - padding
        left_pad = bytearray([self.pad_byte]*self.pad_left)
        right_pad = bytearray([self.pad_byte]*self.pad_right)
        verify_data = left_pad + verify_data + right_pad

        status.key['client_verify_data'] = verify_data

        finished.create(verify_data)

        self.msg = finished
        return finished

    def post_send(self, status):
        """Perform post-transmit changes needed by generation of Finished."""
        super(FinishedGenerator, self).post_send(status)

        # nothing changes after a post-handshake authentication Finished
        if self.context:
            return

        # resumption finished
        status.resuming = False

        if status.version <= (3, 3):
            return

        # Switch to application traffic secret for writing.
        # For reading we switched with the server Finished.
        status.msg_sock.changeWriteState()

        # derive resumption master secret key
        status.key['resumption master secret'] = derive_secret(
            status.key['master secret'],
            b'res master',
            status.handshake_hashes,
            status.prf_name)

        # preserve the hash state for post-handshake authentication
        status.key['client finished handshake hashes'] = \
            status.handshake_hashes.copy()
class AlertGenerator(MessageGenerator):
    """Generator for TLS Alert messages."""

    def __init__(self, level=AlertLevel.warning,
                 description=AlertDescription.close_notify):
        """
        Save the level and description of the Alert to send.

        :param level: alert level, warning by default
        :param description: alert description, close_notify by default
        """
        super(AlertGenerator, self).__init__()
        self.level = level
        self.description = description

    def generate(self, status):
        """Create the Alert with the saved description and level."""
        return Alert().create(self.description, self.level)
class ApplicationDataGenerator(MessageGenerator):
    """Generator for TLS Application Data messages."""

    def __init__(self, payload):
        """
        Save the data to send to server.

        :param payload: the data to place in the message
        """
        super(ApplicationDataGenerator, self).__init__()
        self.payload = payload

    def generate(self, status):
        """Create an Application Data message with the saved payload."""
        return ApplicationData().create(self.payload)
class KeyUpdateGenerator(MessageGenerator):
    """Generator for TLS 1.3 KeyUpdate message."""

    def __init__(self, message_type=0):
        """
        Save the type of the KeyUpdate message.

        :param int message_type: value placed in the message, 0 by default
        """
        super(KeyUpdateGenerator, self).__init__()
        self.message_type = message_type

    def generate(self, state):
        """Generate a KeyUpdate message."""
        del state  # needed only for API compatibility
        return KeyUpdate().create(self.message_type)

    def post_send(self, state):
        """Perform post-transmit changes needed by generation of KeyUpdate."""
        super(KeyUpdateGenerator, self).post_send(state)
        secrets = state.key
        # only the first returned secret is kept
        updated = state.msg_sock.calcTLS1_3KeyUpdate_reciever(
            state.cipher,
            secrets['client application traffic secret'],
            secrets['server application traffic secret'])
        secrets['client application traffic secret'] = updated[0]
class HeartbeatGenerator(MessageGenerator):
    """
    Generator for heartbeat messages.

    :ivar message_type: the type of the message to send, see
        :py:class:`HeartbeatMessageType` enum for values
    :vartype message_type: int
    :ivar payload: data to be sent to the other side for it to echo it back
    :vartype payload: bytearray
    :ivar padding: padding to be sent to the other side, it should be
        at least 16 bytes long for the message to be valid
    :vartype padding: bytearray
    """

    def __init__(self, payload,
                 message_type=HeartbeatMessageType.heartbeat_request,
                 padding_length=None):
        """
        Initialise and create instance of object.

        :type payload: bytes-like
        :param payload: payload to send to the other side; either a reply
            to received heartbeat request or a value that the other side
            will have to echo
        :type message_type: int
        :param message_type: the type of message to send, valid values are
            defined in :py:class:`HeartbeatMessageType` enum
        :type padding_length: int
        :param padding_length: the length (in bytes) of the random padding
            that will be generated; 16 by default (if special contents of
            padding are necessary, it's possible to set the
            :py:attr:`~HeartbeatGenerator.padding` field in the object's
            instance after the object was initialised)
        """
        super(HeartbeatGenerator, self).__init__()
        self.message_type = message_type
        self.payload = payload
        self.padding = getRandomBytes(
            16 if padding_length is None else padding_length)

    def generate(self, state):
        """
        Create a Heartbeat message.

        :rtype: `~tlslite.messages.Heartbeat`
        :return: heartbeat message to be sent to the other side
        """
        del state
        message = Heartbeat()
        message.message_type = self.message_type
        message.payload = self.payload
        message.padding = self.padding
        return message
def pad_handshake(generator, size=0, pad_byte=0, pad=None):
    """
    Pad or truncate handshake messages.

    Pad or truncate a handshake message by given amount of bytes, use negative
    size to truncate. Update handshake protocol header to compensate.

    :param MessageGenerator generator: modified message
    :param int size: number of bytes to add at the end (if positive) or
        number of bytes to remove at the end of payload (if negative)
    :param int pad_byte: numerical value of added bytes, must be between 0
        and 255 inclusive
    :param bytearray pad: bytes to add at the end of payload, when provided
        the `size` and `pad_byte` are ignored
    :return: the `generator` passed in, with patched `generate` method
    """
    original_generate = generator.generate

    def patched_generate(state):
        """Monkey patch for the generate method of the Handshake generators."""
        message = original_generate(state)

        def patched_post_write(writer, message=message, size=size):
            """Monkey patch for the postWrite of handshake messages."""
            if pad is not None:
                size = len(pad)
            # the header advertises the modified length of the payload
            header = Writer()
            header.add(message.handshakeType, 1)
            header.add(len(writer.bytes) + size, 3)

            body = writer.bytes
            if pad is not None:
                body = body + pad
            elif size < 0:
                body = body[:size]
            else:
                body = body + bytearray([pad_byte]*size)
            return header.bytes + body

        message.postWrite = patched_post_write
        return message

    generator.generate = patched_generate
    return generator
def truncate_handshake(generator, size=0, pad_byte=0):
    """
    Truncate a handshake message.

    Removes `size` bytes from the end of the payload by delegating to
    :py:func:`pad_handshake` (the inverse of this function) with the size
    negated.
    """
    bytes_to_add = -size
    return pad_handshake(generator, bytes_to_add, pad_byte)
[docs] def _apply_function(data, settings, fun): """Modify data based on settings and function fun.""" for pos in settings: if settings[pos] == -1: data[pos] = fun(data[pos], random.randint(0, 255)) elif settings[pos] == -2: data[pos] = fun(data[pos], random.randint(1, 255)) else: assert settings[pos] >= 0 data[pos] = fun(data[pos], settings[pos])
def substitute_and_xor(data, substitutions, xors):
    """
    Apply changes from substitutions and xors to data for fuzzing.

    Substitutions are applied first, xors second; empty or None settings
    are skipped. The data is modified in place and returned.

    (Method used internally by tlsfuzzer.)
    """
    def _replace(_, new_value):
        return new_value

    def _xor(old_value, mask):
        return old_value ^ mask

    for settings, operation in ((substitutions, _replace), (xors, _xor)):
        if settings:
            _apply_function(data, settings, operation)

    return data
def fuzz_message(generator, substitutions=None, xors=None):
    """
    Change arbitrary bytes of the message after write.

    Modified data includes handshake protocol header but doesn't include
    record header, content type or record-level padding.

    :param MessageGenerator generator: modified message
    :param dict(int,int) substitutions: modify specified bytes of the message,
        the keys indicate the positions in the message (negative numbers count
        from the end of messages), the values of the dictionary specify
        the values to change the bytes to
    :param dict(int,int) xors: modify specified bytes of the message,
        the keys indicate the positions in the message (negative numbers count
        from the end of messages), the values of the dictionary specify
        the values to xor with
    :return: the `generator` passed in, with patched `generate` method
    """
    original_generate = generator.generate

    def patched_generate(state):
        """Monkey patch for the generate method of the Handshake generators."""
        message = original_generate(state)
        original_write = message.write

        def patched_write():
            """Monkey patch for the write method of messages."""
            return substitute_and_xor(original_write(), substitutions, xors)

        message.write = patched_write
        return message

    generator.generate = patched_generate
    return generator
def post_send_msg_sock_restore(obj, method_name, old_method_name):
    """
    Un-Monkey patch a method of msg_sock.

    Wraps the ``post_send`` of `obj` so that, before the original
    ``post_send`` runs, the attribute named by `method_name` is set back
    to the value saved on `obj` under `old_method_name`.

    (Method used internally by tlsfuzzer.)

    :param obj: generator whose ``post_send`` gets wrapped
    :param str method_name: name of the patched attribute, relative to
        ``state.msg_sock``; can be a dotted path (a leading dot is
        ignored), e.g. ``'._recordSocket.send'`` restores
        ``state.msg_sock._recordSocket.send``
    :param str old_method_name: name of the attribute of `obj` in which
        the original value was saved
    :return: the `obj` passed in
    :raises ValueError: when `method_name` does not name any attribute
    """
    # plain setattr() can't follow dots, it would create an attribute
    # literally named like the whole path and leave the patch in place
    path = [part for part in method_name.split('.') if part]
    if not path:
        raise ValueError("method_name must name an attribute of msg_sock")
    owner_path, attribute = path[:-1], path[-1]

    def new_post_send(state, obj=obj,
                      owner_path=owner_path,
                      attribute=attribute,
                      old_method_name=old_method_name,
                      old_post_send=obj.post_send):
        """Reverse the patching of a method in msg_sock."""
        owner = state.msg_sock
        for name in owner_path:
            owner = getattr(owner, name)
        setattr(owner, attribute, getattr(obj, old_method_name))
        old_post_send(state)

    obj.post_send = new_post_send
    return obj
def fuzz_mac(generator, substitutions=None, xors=None):
    """
    Change arbitrary bytes of the MAC value.

    Works with stream and CBC cipher suites in SSL 3 up to TLS 1.2.
    Works with both encrypt then MAC and MAC then encrypt connections.

    :param MessageGenerator generator: modified message
    :param dict(int,int) substitutions: modify specified bytes of the message,
        the keys indicate the positions in the message (negative numbers count
        from the end of messages), the values of the dictionary specify
        the values to change the bytes to
    :param dict(int,int) xors: modify specified bytes of the message,
        the keys indicate the positions in the message (negative numbers count
        from the end of messages), the values of the dictionary specify
        the values to xor with
    :return: the `generator` passed in, with patched `generate` and
        `post_send` methods
    """
    original_generate = generator.generate

    def patched_generate(state):
        """Monkey patch to modify MAC calculation of created MAC."""
        message = original_generate(state)

        stock_calculate_mac = state.msg_sock.calculateMAC
        # saved on the generator so that post_send can put it back
        generator.old_calculate_mac = stock_calculate_mac

        def patched_calculate_mac(mac, seqnumBytes, contentType, data):
            """Monkey patch for the MAC calculation method of msg socket."""
            genuine_mac = stock_calculate_mac(mac, seqnumBytes,
                                              contentType, data)
            return substitute_and_xor(genuine_mac, substitutions, xors)

        state.msg_sock.calculateMAC = patched_calculate_mac
        return message

    generator.generate = patched_generate

    post_send_msg_sock_restore(generator, 'calculateMAC', 'old_calculate_mac')

    return generator
def fuzz_encrypted_message(generator, substitutions=None, xors=None):
    """
    Change arbitrary bytes of the authenticated ciphertext block.

    Can modify authentication tag of AEAD ciphers and CBC ciphers working in
    encrypt then MAC mode.

    :param MessageGenerator generator: modified message
    :param dict(int,int) substitutions: modify specified bytes of the message,
        the keys indicate the positions in the message (negative numbers count
        from the end of messages), the values of the dictionary specify
        the values to change the bytes to
    :param dict(int,int) xors: modify specified bytes of the message,
        the keys indicate the positions in the message (negative numbers count
        from the end of messages), the values of the dictionary specify
        the values to xor with
    :return: the `generator` passed in, with patched `generate` and
        `post_send` methods
    """
    original_generate = generator.generate

    def patched_generate(state):
        """Monkey patch to modify authenticated ciphertext block."""
        result = original_generate(state)

        record_socket = state.msg_sock._recordSocket
        stock_send = record_socket.send
        # saved on the generator so that post_send can put it back
        generator.old_send = stock_send

        def patched_send(message, padding):
            """
            Monkey patch for the send method of msg socket.

            message.data is the encrypted tls record, e.g.
            message.data = aead_encrypt(plain); defined by aead suite
            message.data = encrypt(plain + padding) + mac; etm
            message.data = encrypt(plain + mac + padding); mte
            """
            fuzzed = substitute_and_xor(message.write(), substitutions, xors)
            return stock_send(Message(message.contentType, fuzzed), padding)

        state.msg_sock._recordSocket.send = patched_send
        return result

    generator.generate = patched_generate

    post_send_msg_sock_restore(generator, '._recordSocket.send', 'old_send')

    return generator
def div_ceil(divident, divisor):
    """
    Perform integer division of divident by divisor, round up.

    :return: the quotient, increased by one when the division left
        a remainder
    """
    whole, rest = divmod(divident, divisor)
    if rest:
        return whole + 1
    return whole + 0
def fuzz_padding(generator, min_length=None, substitutions=None, xors=None):
    """
    Change the padding of the message.

    Works with CBC ciphers only.

    Note: the "-1" position is the byte with the length of padding while "-2"
    is the last byte of padding (if padding has non-zero length)

    :param MessageGenerator generator: modified message
    :param int min_length: the minimum length of padding created, including
        the byte specifying length of padding, must be smaller than 257
    :param dict(int,int) substitutions: modify specified bytes of the message,
        the keys indicate the positions in the message (negative numbers count
        from the end of messages), the values of the dictionary specify
        the values to change the bytes to
    :param dict(int,int) xors: modify specified bytes of the message,
        the keys indicate the positions in the message (negative numbers count
        from the end of messages), the values of the dictionary specify
        the values to xor with
    :return: the `generator` passed in, with patched `generate` and
        `post_send` methods
    :raises ValueError: when min_length is bigger than 256
    """
    if min_length is not None and min_length > 256:
        raise ValueError("Padding cannot be longer than 256 bytes")

    original_generate = generator.generate

    def patched_generate(state):
        """Monkey patch to modify padding behaviour."""
        message = original_generate(state)

        record_layer = state.msg_sock
        stock_add_padding = record_layer.addPadding
        # saved on the generator so that post_send can put it back
        generator.old_add_padding = stock_add_padding

        def patched_add_padding(data):
            """Monkey patch the padding creating method."""
            if min_length is None:
                # make a copy of data as we need it unmodified later
                padded = stock_add_padding(bytearray(data))
                pad_len = padded[-1]
                padding = padded[-(pad_len+1):]
            else:
                block_size = record_layer.blockSize
                # smallest multiple of block size that fits the data
                # and the requested amount of padding
                padded_size = div_ceil(len(data) + min_length,
                                       block_size) * block_size
                pad_len = padded_size - len(data)
                if pad_len > 256:
                    raise ValueError(
                        "min_length set too high for message: {0}"
                        .format(pad_len))
                padding = bytearray([pad_len - 1] * pad_len)
            padding = substitute_and_xor(padding, substitutions, xors)
            return data + padding

        state.msg_sock.addPadding = patched_add_padding
        return message

    generator.generate = patched_generate

    post_send_msg_sock_restore(generator, 'addPadding', 'old_add_padding')

    return generator
def replace_plaintext(generator, new_plaintext):
    """
    Change the plaintext of the message right before encryption.

    Will replace all data before encryption, including the IV, MAC and
    padding.

    Note: works only with CBC ciphers. in EtM mode will NOT modify MAC.

    Length of new_plaintext must be multiple of negotiated cipher block size
    (8 bytes for 3DES, 16 bytes for AES)

    :param MessageGenerator generator: modified message
    :param new_plaintext: data to encrypt in place of the real plaintext
    :return: the `generator` passed in, with patched `generate` and
        `post_send` methods
    """
    original_generate = generator.generate

    def patched_generate(state):
        """Monkey patch to modify padding behaviour."""
        message = original_generate(state)

        record_layer = state.msg_sock
        # saved on the generator so that post_send can put it back
        generator.old_add_padding = record_layer.addPadding

        def patched_add_padding(data):
            """Monkey patch the padding creating method."""
            del data  # the real plaintext is discarded
            if len(new_plaintext) % record_layer.blockSize:
                raise ValueError("new_plaintext length not a multiple of "
                                 "cipher block size")
            return new_plaintext

        state.msg_sock.addPadding = patched_add_padding
        return message

    generator.generate = patched_generate

    post_send_msg_sock_restore(generator, 'addPadding', 'old_add_padding')

    return generator
def fuzz_plaintext(generator, substitutions=None, xors=None):
    """
    Change arbitrary bytes of the plaintext right before encryption.

    Get access to all data before encryption, including the IV, MAC and
    padding.

    Works only with CBC ciphers. in EtM mode will not include MAC.

    Note: the "-1" position is the byte with length of padding while "-2" is
    the last byte of padding (if padding has non-zero length)

    :param generator: message generator whose ``generate`` method is
        replaced
    :param dict(int,int) substitutions: modify specified bytes of the
        message, the keys indicate the positions in the message (negative
        numbers count from the end of messages), the values of the
        dictionary specify the values to change the bytes to
    :param dict(int,int) xors: modify specified bytes of the message, the
        keys indicate the positions in the message (negative numbers count
        from the end of messages), the values of the dictionary specify the
        values to xor with
    :return: the ``generator`` that was passed in
    """
    # grab the current method now, it is overwritten a few lines below
    wrapped_generate = generator.generate

    def new_generate(state):
        """Monkey patch to modify padding behaviour."""
        msg = wrapped_generate(state)

        # remembered on the generator under the attribute name that is
        # handed to post_send_msg_sock_restore() below
        generator.old_add_padding = state.msg_sock.addPadding
        stock_add_padding = generator.old_add_padding

        def new_add_padding(data):
            """Monkey patch the padding creating method."""
            # let the real method pad first, then tamper with the result
            padded = stock_add_padding(data)
            return substitute_and_xor(padded, substitutions, xors)

        state.msg_sock.addPadding = new_add_padding
        return msg

    generator.generate = new_generate

    post_send_msg_sock_restore(generator, 'addPadding', 'old_add_padding')

    return generator
def split_message(generator, fragment_list, size):
    """
    Split a given message type to multiple messages.

    Allows for splicing message into the middle of a different message type.

    The first fragment is returned by the patched ``generate`` method, the
    remaining ones are left in ``fragment_list``, followed by the
    ``post_send`` method of the generator.

    :param generator: message generator whose ``generate`` method is
        replaced
    :param list fragment_list: list shared with the caller; fragments and
        the deferred ``post_send`` callable are appended to it
    :param int size: maximum number of payload bytes in a single fragment,
        must be positive
    :return: the ``generator`` that was passed in
    :raises ValueError: when ``size`` is smaller than 1
    """
    # a non-positive size never consumes the payload, so fragments would be
    # appended to fragment_list without end
    if size < 1:
        raise ValueError("Fragment size must be a positive integer")

    # grab the current method now, it is overwritten at the end
    old_generate = generator.generate

    def new_generate(state):
        """Monkey patch for the generate method of the message generator."""
        msg = old_generate(state)
        content_type = msg.contentType
        data = msg.write()
        # since empty messages can be created much more easily with
        # RawMessageGenerator, we don't handle 0 length messages here
        if not data:
            raise IndexError("Empty message payload")

        # move the data to fragment_list (outside the method)
        fragment_list.extend(
            Message(content_type, data[start:start + size])
            for start in range(0, len(data), size))

        fragment_list.append(generator.post_send)
        # make sure that the effect of the message is visible only once
        # all parts of the message have been sent
        generator.post_send = lambda x: None

        return fragment_list.pop(0)

    generator.generate = new_generate
    return generator
def queue_message(generator):
    """
    Queue message with other ones of the same content type.

    Allow coalescing of the message with other messages with the same
    content type, this allows for sending Certificate and CertificateVerify
    in a single record.

    :param generator: node for which ``is_generator()`` is true; its
        ``queue`` attribute is set to ``True``
    :return: the ``generator`` that was passed in
    """
    # only nodes that generate messages may be marked for queueing
    assert generator.is_generator()

    generator.queue = True

    return generator
class FlushMessageQueue(Command):
    """Flush the record layer queue of messages."""

    def __init__(self, description=None):
        """Create the command, optionally labelled with a description."""
        super(FlushMessageQueue, self).__init__()
        self.description = description

    def __repr__(self):
        """Return a constructor-like text representation of the object."""
        # the description is listed only when it is set to a truthy value
        if self.description:
            args = "{0}={1}".format('description', repr(self.description))
        else:
            args = ''
        return 'FlushMessageQueue({0})'.format(args)

    def process(self, state):
        """Call ``flushBlocking()`` on the message socket of the state."""
        state.msg_sock.flushBlocking()
class PopMessageFromList(MessageGenerator):
    """Takes a reference to list, pops a message from it to generate one."""

    def __init__(self, fragment_list):
        """Link a list to store messages with the object."""
        super(PopMessageFromList, self).__init__()
        self.fragment_list = fragment_list

    def generate(self, state):
        """Create a message using the reference to list from init."""
        # the oldest entry of the linked list is removed and returned
        return self.fragment_list.pop(0)

    def post_send(self, state):
        """Run the callable waiting at the front of the list, if any."""
        super(PopMessageFromList, self).post_send(state)

        pending = self.fragment_list
        if not pending:
            return
        if not callable(pending[0]):
            # front entry is not a callable, leave it in the list
            return

        deferred = pending.pop(0)
        deferred(state)
class FlushMessageList(PopMessageFromList):
    """Takes a reference to list, empties it to generate a message."""

    def generate(self, state):
        """Create a single message to empty the list."""
        first = self.fragment_list.pop(0)
        content_type = first.contentType
        payload = first.write()

        # join following entries until the list is empty or a callable is
        # found at its front
        while True:
            if not self.fragment_list:
                break
            if callable(self.fragment_list[0]):
                break
            fragment = self.fragment_list.pop(0)
            assert fragment.contentType == content_type
            payload += fragment.write()

        return Message(content_type, payload)
def fuzz_pkcs1_padding(key, substitutions=None, xors=None, padding_byte=None):
    """
    Fuzz the PKCS#1 padding used in signatures or encryption.

    Use to modify Client Key Exchange padding of encrypted value.

    :param key: key object whose ``_addPKCS1Padding`` method is replaced;
        its ``n`` attribute is read to calculate the length of padding
    :param substitutions: passed to ``substitute_and_xor`` together with
        the padding part of the encoded value
    :param xors: passed to ``substitute_and_xor`` together with the padding
        part of the encoded value
    :param padding_byte: when set, value written over the padding bytes
        between the two leading bytes and the last byte of the padding
    :return: the ``key`` that was passed in; it is left unmodified when all
        other parameters are ``None``
    """
    nothing_to_change = (substitutions is None and xors is None and
                         padding_byte is None)
    if nothing_to_change:
        return key

    # grab the current method now, it is overwritten at the end
    stock_add_padding = key._addPKCS1Padding

    # parameter names mirror the call the original method receives
    def new_addPKCS1Padding(bytes, blockType):
        """Monkey patch for the _addPKCS1Padding() method of RSA key."""
        padded = stock_add_padding(bytes, blockType)
        split_at = numBytes(key.n) - len(bytes)
        pad, value = padded[:split_at], padded[split_at:]

        if padding_byte is not None:
            # don't change version, type or payload separator
            for index in range(2, split_at - 1):
                pad[index] = padding_byte

        pad = substitute_and_xor(pad, substitutions, xors)
        return pad + value

    key._addPKCS1Padding = new_addPKCS1Padding
    return key