#!/usr/bin/env python3
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (c) 2017 Kristof Provost <kp@FreeBSD.org>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#

import argparse
import logging
# The scapy logger threshold is raised before scapy itself is imported
# (presumably to suppress the warnings it emits at import time), so this line
# must stay above the scapy import.
logging.getLogger("scapy").setLevel(logging.CRITICAL)
import scapy.all as sp
import socket
import sys
from sniffer import Sniffer

# Payload carried by every echo request this tool sends; the check functions
# use it to tell our own packets apart from other traffic.
PAYLOAD_MAGIC = bytes.fromhex('42c0ffee')

# Number of ICMP packets carrying PAYLOAD_MAGIC counted by check_dup().
dup_found = 0

def _has_magic_payload(packet):
	"""
	Return True if the packet has a Raw layer whose load is PAYLOAD_MAGIC
	"""
	raw = packet.getlayer(sp.Raw)
	if not raw:
		return False
	return raw.load == PAYLOAD_MAGIC

def check_dup(args, packet):
	"""
	Verify that this is an ICMP packet, and that we only see one

	Every ICMP packet carrying PAYLOAD_MAGIC increments the global
	dup_found counter.  The function always returns False; main() looks at
	dup_found once the sniffer has finished.
	"""
	global dup_found

	icmp = packet.getlayer(sp.ICMP)
	if not icmp:
		return False

	if not _has_magic_payload(packet):
		return False

	dup_found = dup_found + 1
	return False

def check_ping_request(args, packet):
	"""
	Check an echo request with the IPv4 or IPv6 checker, depending on --ip6
	"""
	if args.ip6:
		return check_ping6_request(args, packet)
	else:
		return check_ping4_request(args, packet)

def check_ping4_request(args, packet):
	"""
	Verify that the packet matches what we'd have sent

	Returns True for an IPv4 ICMP echo request addressed to args.to[0] that
	carries PAYLOAD_MAGIC and, if --expect-tos was given, has that ToS.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	if ip.dst != dst_ip:
		return False

	icmp = packet.getlayer(sp.ICMP)
	if not icmp:
		return False
	# .get() rather than indexing: an ICMP type missing from scapy's table
	# is simply not an echo request, not a reason to raise KeyError.
	if sp.icmptypes.get(icmp.type) != 'echo-request':
		return False

	if not _has_magic_payload(packet):
		return False

	# Wait to check expectations until we've established this is the packet we
	# sent.
	if args.expect_tos:
		expected_tos = int(args.expect_tos[0])
		if ip.tos != expected_tos:
			print("Unexpected ToS value %d, expected %d" \
				% (ip.tos, expected_tos))
			return False

	return True

def check_ping6_request(args, packet):
	"""
	Verify that the packet matches what we'd have sent

	Returns True for an ICMPv6 echo request addressed to args.to[0] whose
	data is PAYLOAD_MAGIC.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IPv6)
	if not ip:
		return False
	if ip.dst != dst_ip:
		return False

	icmp = packet.getlayer(sp.ICMPv6EchoRequest)
	if not icmp:
		return False
	if icmp.data != PAYLOAD_MAGIC:
		return False

	return True

def check_ping_reply(args, packet):
	"""
	Check an echo reply with the IPv4 or IPv6 checker, depending on --ip6
	"""
	# Previously this always used the IPv4 checker, so a reply could never
	# be matched when --ip6 was given.
	if args.ip6:
		return check_ping6_reply(args, packet)
	else:
		return check_ping4_reply(args, packet)

def check_ping4_reply(args, packet):
	"""
	Check that this is a reply to the ping request we sent

	Returns True for an IPv4 ICMP echo reply coming from args.to[0] that
	carries PAYLOAD_MAGIC.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	if ip.src != dst_ip:
		return False

	icmp = packet.getlayer(sp.ICMP)
	if not icmp:
		return False
	# See check_ping4_request(): avoid KeyError on unlisted ICMP types.
	if sp.icmptypes.get(icmp.type) != 'echo-reply':
		return False

	if not _has_magic_payload(packet):
		return False

	return True

def check_ping6_reply(args, packet):
	"""
	Check that this is a reply to the ping6 request we sent

	Returns True for an ICMPv6 echo reply coming from args.to[0] whose data
	is PAYLOAD_MAGIC.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IPv6)
	if not ip:
		return False
	if ip.src != dst_ip:
		return False

	icmp = packet.getlayer(sp.ICMPv6EchoReply)
	if not icmp:
		return False
	if icmp.data != PAYLOAD_MAGIC:
		return False

	return True

def ping(send_if, dst_ip, args):
	"""
	Send one IPv4 ICMP echo request carrying PAYLOAD_MAGIC out of send_if

	--send-tos and --fromaddr, when given, set the ToS and the source
	address of the packet.
	"""
	ether = sp.Ether()
	ip = sp.IP(dst=dst_ip)
	icmp = sp.ICMP(type='echo-request')
	raw = sp.raw(PAYLOAD_MAGIC)

	if args.send_tos:
		ip.tos = int(args.send_tos[0])

	if args.fromaddr:
		ip.src = args.fromaddr[0]

	req = ether / ip / icmp / raw
	sp.sendp(req, iface=send_if, verbose=False)

def ping6(send_if, dst_ip, args):
	"""
	Send one ICMPv6 echo request carrying PAYLOAD_MAGIC out of send_if

	--fromaddr, when given, sets the source address of the packet.
	"""
	ether = sp.Ether()
	ip6 = sp.IPv6(dst=dst_ip)
	icmp = sp.ICMPv6EchoRequest(data=sp.raw(PAYLOAD_MAGIC))

	if args.fromaddr:
		# This used to assign to an undefined name 'ip' and raised
		# NameError whenever --fromaddr was combined with --ip6.
		ip6.src = args.fromaddr[0]

	req = ether / ip6 / icmp
	sp.sendp(req, iface=send_if, verbose=False)

def check_tcpsyn(args, packet):
	"""
	Verify that this is a TCP packet to args.to[0] with valid checksums

	Both the IP and the TCP checksum are compared against the value scapy
	computes when the packet is rebuilt with the checksum field cleared.
	Note that this clears the checksum fields of the packet passed in.
	"""
	dst_ip = args.to[0]

	ip = packet.getlayer(sp.IP)
	if not ip:
		return False
	if ip.dst != dst_ip:
		return False

	tcp = packet.getlayer(sp.TCP)
	if not tcp:
		return False

	# Verify IP checksum
	chksum = ip.chksum
	ip.chksum = None
	new_chksum = sp.IP(sp.raw(ip)).chksum
	if chksum != new_chksum:
		print("Expected IP checksum %x but found %x\n" % (new_chksum, chksum))
		return False

	# Verify TCP checksum
	chksum = tcp.chksum
	tcp.chksum = None
	newpacket = sp.Ether(sp.raw(packet[sp.Ether]))
	new_chksum = newpacket[sp.TCP].chksum
	if chksum != new_chksum:
		print("Expected TCP checksum %x but found %x\n" % (new_chksum, chksum))
		return False

	return True

def tcpsyn(send_if, dst_ip, args):
	"""
	Send one TCP SYN to port 666 of dst_ip out of send_if

	The SYN carries Timestamp and MSS options; with --tcpopt_unaligned a
	single NOP option is put in front of them.
	"""
	opts=[('Timestamp', (1, 1)), ('MSS', 1280)]

	if args.tcpopt_unaligned:
		opts = [('NOP', 0 )] + opts

	ether = sp.Ether()
	ip = sp.IP(dst=dst_ip)
	tcp = sp.TCP(dport=666, flags='S', options=opts)

	req = ether / ip / tcp
	sp.sendp(req, iface=send_if, verbose=False)


def main():
	"""
	Parse the command line, send the test packet and report via exit status

	Sniffers are created for --recvif, --replyif and --checkdup before the
	packet is sent.  Exit status:
	  - 1 if --checkdup was given and check_dup() did not count exactly one
	    packet;
	  - otherwise, if --recvif was given, 0 or 1 depending on whether that
	    sniffer found the expected packet (the --replyif result is not
	    consulted in this case);
	  - otherwise, if --replyif was given, 0 or 1 depending on whether the
	    reply sniffer found the expected packet;
	  - otherwise the function just returns.
	"""
	parser = argparse.ArgumentParser("pft_ping.py",
		description="Ping test tool")
	parser.add_argument('--sendif', nargs=1,
		required=True,
		help='The interface through which the packet(s) will be sent')
	parser.add_argument('--recvif', nargs=1,
		help='The interface on which to expect the ICMP echo request')
	parser.add_argument('--replyif', nargs=1,
		help='The interface on which to expect the ICMP echo response')
	parser.add_argument('--checkdup', nargs=1,
		help='The interface on which to expect the duplicated ICMP packets')
	parser.add_argument('--ip6', action='store_true',
		help='Use IPv6')
	parser.add_argument('--to', nargs=1,
		required=True,
		help='The destination IP address for the ICMP echo request')
	parser.add_argument('--fromaddr', nargs=1,
		help='The source IP address for the ICMP echo request')

	# TCP options
	parser.add_argument('--tcpsyn', action='store_true',
		help='Send a TCP SYN packet')
	parser.add_argument('--tcpopt_unaligned', action='store_true',
		help='Include unaligned TCP options')

	# Packet settings
	parser.add_argument('--send-tos', nargs=1,
		help='Set the ToS value for the transmitted packet')

	# Expectations
	parser.add_argument('--expect-tos', nargs=1,
		help='The expected ToS value in the received packet')

	args = parser.parse_args()

	# We may not have a default route. Tell scapy where to start looking for routes
	sp.conf.iface6 = args.sendif[0]

	# All sniffers are set up before anything is transmitted.
	sniffer = None
	if args.recvif is not None:
		checkfn=check_ping_request
		if args.tcpsyn:
			checkfn=check_tcpsyn

		sniffer = Sniffer(args, checkfn)

	replysniffer = None
	if args.replyif is not None:
		checkfn=check_ping_reply
		replysniffer = Sniffer(args, checkfn, recvif=args.replyif[0])

	dupsniffer = None
	if args.checkdup is not None:
		dupsniffer = Sniffer(args, check_dup, recvif=args.checkdup[0])

	if args.tcpsyn:
		tcpsyn(args.sendif[0], args.to[0], args)
	else:
		if args.ip6:
			ping6(args.sendif[0], args.to[0], args)
		else:
			ping(args.sendif[0], args.to[0], args)

	if dupsniffer:
		dupsniffer.join()
		if dup_found != 1:
			sys.exit(1)

	if sniffer:
		sniffer.join()

		if sniffer.foundCorrectPacket:
			sys.exit(0)
		else:
			sys.exit(1)

	if replysniffer:
		replysniffer.join()

		if replysniffer.foundCorrectPacket:
			sys.exit(0)
		else:
			sys.exit(1)

if __name__ == '__main__':
	main()