#!/usr/bin/python3

# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

import selectors
import struct
import subprocess
import time

import pytest

pytest.importorskip("dns")
import dns.exception
import dns.message
import dns.name
import dns.rdataclass
import dns.rdatatype


def test_gnutls_cli_query(gnutls_cli_executable, named_tlsport):
    """Send an example/SOA query over TLS using gnutls-cli and check the reply.

    The query is written, with its two-byte TCP length prefix, to the
    standard input of a gnutls-cli process connected to 10.53.0.1 with the
    "dot" ALPN token.  The standard output of gnutls-cli is then read until
    a complete DNS message can be parsed from it, gnutls-cli closes its
    output, or a 10-second deadline passes.

    Parameters (presumably pytest fixtures; their definitions are not in
    this file):
        gnutls_cli_executable: used as argv[0] of the spawned process.
        named_tlsport: integer port number passed as --port.

    Side effects: writes gnutls-cli.log, gnutls-cli.err, gnutls-cli.out.bin
    and, when a response was parsed, gnutls-cli.out.txt into the current
    working directory.
    """
    # Prepare the example/SOA query which will be sent over TLS.
    query = dns.message.make_query("example.", dns.rdatatype.SOA)
    query_wire = query.to_wire()
    query_with_length = struct.pack(">H", len(query_wire)) + query_wire

    # Run gnutls-cli.
    gnutls_cli_args = [
        gnutls_cli_executable,
        "--no-ca-verification",
        "-V",
        "--no-ocsp",
        "--alpn=dot",
        "--logfile=gnutls-cli.log",
        "--port=%d" % named_tlsport,
        "10.53.0.1",
    ]
    with open("gnutls-cli.err", "wb") as gnutls_cli_stderr, subprocess.Popen(
        gnutls_cli_args,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=gnutls_cli_stderr,
        bufsize=0,
    ) as gnutls_cli:
        # Send the example/SOA query to the standard input of gnutls-cli.  Do
        # not close standard input yet because that causes gnutls-cli to close
        # the TLS connection immediately, preventing the response from being
        # read.
        gnutls_cli.stdin.write(query_with_length)
        gnutls_cli.stdin.flush()

        # Keep reading data from the standard output of gnutls-cli until a full
        # DNS message is received or a timeout is exceeded or gnutls-cli exits.
        # Popen.communicate() cannot be used here because: a) it closes
        # standard input after sending data to the process (see above why this
        # is a problem), b) gnutls-cli is not DNS-aware, so it does not exit
        # upon receiving a DNS response.
        gnutls_cli_output = b""
        response = None
        # The selector owns a file descriptor of its own; the context manager
        # makes sure it is released even if reading fails.
        with selectors.DefaultSelector() as selector:
            selector.register(gnutls_cli.stdout, selectors.EVENT_READ)
            # Use a monotonic clock so that wall-clock adjustments cannot
            # stretch or shrink the deadline.
            deadline = time.monotonic() + 10
            while response is None:
                # A non-positive timeout would turn select() into a
                # non-blocking poll, so check the deadline explicitly.
                remaining = deadline - time.monotonic()
                if remaining <= 0 or not selector.select(timeout=remaining):
                    break
                chunk = gnutls_cli.stdout.read(512)
                if not chunk:
                    # End of file: gnutls-cli exited (or closed its standard
                    # output).  Detecting exit this way, rather than through
                    # Popen.poll(), works for any exit status (poll() returns
                    # a falsy 0 on a clean exit) and lets data still buffered
                    # in the pipe be consumed first.
                    break
                gnutls_cli_output += chunk
                try:
                    # Ignore TCP length, just try to parse a DNS message from
                    # the rest of the data received.
                    response = dns.message.from_wire(gnutls_cli_output[2:])
                except dns.exception.FormError:
                    # Message incomplete so far; wait for more data.
                    continue

        # At this point either a DNS response was received or a timeout fired
        # or gnutls-cli exited prematurely.  Close the standard input of
        # gnutls-cli.  Terminate it if that does not cause it to shut down
        # gracefully.
        gnutls_cli.stdin.close()
        try:
            gnutls_cli.wait(5)
        except subprocess.TimeoutExpired:
            gnutls_cli.kill()

    # Store the response received for diagnostic purposes.
    with open("gnutls-cli.out.bin", "wb") as response_bin:
        response_bin.write(gnutls_cli_output)
    if response is not None:
        with open("gnutls-cli.out.txt", "w", encoding="utf-8") as response_txt:
            response_txt.write(response.to_text())

    # Check whether a response was received and whether it is sane.
    assert response is not None, "no DNS response received from gnutls-cli"
    assert query.id == response.id
    assert len(response.answer) == 1
    assert response.answer[0].match(
        dns.name.from_text("example."),
        dns.rdataclass.IN,
        dns.rdatatype.SOA,
        dns.rdatatype.NONE,
    )