3# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
5# SPDX-License-Identifier: MPL-2.0
7# This Source Code Form is subject to the terms of the Mozilla Public
8# License, v. 2.0.  If a copy of the MPL was not distributed with this
9# file, you can obtain one at https://mozilla.org/MPL/2.0/.
11# See the COPYRIGHT file distributed with this work for additional
12# information regarding copyright ownership.
14import selectors
15import struct
16import subprocess
17import time
19import pytest
22import dns.exception
23import dns.message
24import dns.name
25import dns.rdataclass
26import dns.rdatatype
29def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport):
30    # Prepare the example/SOA query which will be sent over TLS.
31    query = dns.message.make_query("example.", dns.rdatatype.SOA)
32    query_wire = query.to_wire()
33    query_with_length = struct.pack(">H", len(query_wire)) + query_wire
35    # Run gnutls-cli.
36    gnutls_cli_args = [
37        gnutls_cli_executable,
38        "--no-ca-verification",
39        "-V",
40        "--no-ocsp",
41        "--alpn=dot",
42        "--logfile=gnutls-cli.log",
43        "--port=%d" % named_tlsport,
44        "",
45    ]
46    with open("gnutls-cli.err", "wb") as gnutls_cli_stderr, subprocess.Popen(
47        gnutls_cli_args,
48        stdin=subprocess.PIPE,
49        stdout=subprocess.PIPE,
50        stderr=gnutls_cli_stderr,
51        bufsize=0,
52    ) as gnutls_cli:
53        # Send the example/SOA query to the standard input of gnutls-cli.  Do
54        # not close standard input yet because that causes gnutls-cli to close
55        # the TLS connection immediately, preventing the response from being
56        # read.
57        gnutls_cli.stdin.write(query_with_length)
58        gnutls_cli.stdin.flush()
60        # Keep reading data from the standard output of gnutls-cli until a full
61        # DNS message is received or a timeout is exceeded or gnutls-cli exits.
62        # Popen.communicate() cannot be used here because: a) it closes
63        # standard input after sending data to the process (see above why this
64        # is a problem), b) gnutls-cli is not DNS-aware, so it does not exit
65        # upon receiving a DNS response.
66        selector = selectors.DefaultSelector()
67        selector.register(gnutls_cli.stdout, selectors.EVENT_READ)
68        deadline = time.time() + 10
69        gnutls_cli_output = b""
70        response = b""
71        while not response and not gnutls_cli.poll():
72            if not selector.select(timeout=deadline - time.time()):
73                break
74            gnutls_cli_output += gnutls_cli.stdout.read(512)
75            try:
76                # Ignore TCP length, just try to parse a DNS message from
77                # the rest of the data received.
78                response = dns.message.from_wire(gnutls_cli_output[2:])
79            except dns.exception.FormError:
80                continue
82        # At this point either a DNS response was received or a timeout fired
83        # or gnutls-cli exited prematurely.  Close the standard input of
84        # gnutls-cli.  Terminate it if that does not cause it to shut down
85        # gracefully.
86        gnutls_cli.stdin.close()
87        try:
88            gnutls_cli.wait(5)
89        except subprocess.TimeoutExpired:
90            gnutls_cli.kill()
92    # Store the response received for diagnostic purposes.
93    with open("gnutls-cli.out.bin", "wb") as response_bin:
94        response_bin.write(gnutls_cli_output)
95    if response:
96        with open("gnutls-cli.out.txt", "w", encoding="utf-8") as response_txt:
97            response_txt.write(response.to_text())
99    # Check whether a response was received and whether it is sane.
100    assert response
101    assert query.id == response.id
102    assert len(response.answer) == 1
103    assert response.answer[0].match(
104        dns.name.from_text("example."),
105        dns.rdataclass.IN,
106        dns.rdatatype.SOA,
107        dns.rdatatype.NONE,
108    )