1# $FreeBSD$ 2# $Id: dpkt.py 114 2005-09-11 15:15:12Z dugsong $ 3 4"""fast, simple packet creation / parsing, with definitions for the 5basic TCP/IP protocols. 6""" 7 8__author__ = 'Dug Song <dugsong@monkey.org>' 9__copyright__ = 'Copyright (c) 2004 Dug Song' 10__license__ = 'BSD' 11__url__ = 'http://monkey.org/~dugsong/dpkt/' 12__version__ = '1.2' 13 14try: 15 from itertools import izip as _it_izip 16except ImportError: 17 _it_izip = zip 18 19from struct import calcsize as _st_calcsize, \ 20 pack as _st_pack, unpack as _st_unpack, error as _st_error 21from re import compile as _re_compile 22 23intchr = _re_compile(r"(?P<int>[0-9]+)(?P<chr>.)") 24 25class MetaPacket(type): 26 def __new__(cls, clsname, clsbases, clsdict): 27 if '__hdr__' in clsdict: 28 st = clsdict['__hdr__'] 29 clsdict['__hdr_fields__'] = [ x[0] for x in st ] 30 clsdict['__hdr_fmt__'] = clsdict.get('__byte_order__', '>') + \ 31 ''.join([ x[1] for x in st ]) 32 clsdict['__hdr_len__'] = _st_calcsize(clsdict['__hdr_fmt__']) 33 clsdict['__hdr_defaults__'] = \ 34 dict(zip(clsdict['__hdr_fields__'], [ x[2] for x in st ])) 35 clsdict['__slots__'] = clsdict['__hdr_fields__'] 36 return type.__new__(cls, clsname, clsbases, clsdict) 37 38class Packet(object): 39 """Packet class 40 41 __hdr__ should be defined as a list of (name, structfmt, default) tuples 42 __byte_order__ can be set to override the default ('>') 43 """ 44 __metaclass__ = MetaPacket 45 data = '' 46 47 def __init__(self, *args, **kwargs): 48 """Packet constructor with ([buf], [field=val,...]) prototype. 49 50 Arguments: 51 52 buf -- packet buffer to unpack 53 54 Optional keyword arguments correspond to packet field names. 55 """ 56 if args: 57 self.unpack(args[0]) 58 else: 59 for k in self.__hdr_fields__: 60 setattr(self, k, self.__hdr_defaults__[k]) 61 for k, v in kwargs.iteritems(): 62 setattr(self, k, v) 63 64 def __len__(self): 65 return self.__hdr_len__ + len(self.data) 66 67 def __repr__(self): 68 l = [ '%s=%r' % (k, getattr(self, k)) 69 for k in self.__hdr_defaults__ 70 if getattr(self, k) != self.__hdr_defaults__[k] ] 71 if self.data: 72 l.append('data=%r' % self.data) 73 return '%s(%s)' % (self.__class__.__name__, ', '.join(l)) 74 75 def __str__(self): 76 return self.pack_hdr() + str(self.data) 77 78 def pack_hdr(self): 79 """Return packed header string.""" 80 try: 81 return _st_pack(self.__hdr_fmt__, 82 *[ getattr(self, k) for k in self.__hdr_fields__ ]) 83 except _st_error: 84 vals = [] 85 for k in self.__hdr_fields__: 86 v = getattr(self, k) 87 if isinstance(v, tuple): 88 vals.extend(v) 89 else: 90 vals.append(v) 91 return _st_pack(self.__hdr_fmt__, *vals) 92 93 def unpack(self, buf): 94 """Unpack packet header fields from buf, and set self.data.""" 95 96 res = list(_st_unpack(self.__hdr_fmt__, buf[:self.__hdr_len__])) 97 for e, k in enumerate(self.__slots__): 98 sfmt = self.__hdr__[e][1] 99 mat = intchr.match(sfmt) 100 if mat and mat.group('chr') != 's': 101 cnt = int(mat.group('int')) 102 setattr(self, k, list(res[:cnt])) 103 del res[:cnt] 104 else: 105 if sfmt[-1] == 's': 106 i = res[0].find('\x00') 107 if i != -1: 108 res[0] = res[0][:i] 109 setattr(self, k, res[0]) 110 del res[0] 111 assert len(res) == 0 112 self.data = buf[self.__hdr_len__:] 113 114# XXX - ''.join([(len(`chr(x)`)==3) and chr(x) or '.' for x in range(256)]) 115__vis_filter = """................................ !"#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[.]^_`abcdefghijklmnopqrstuvwxyz{|}~.................................................................................................................................""" 116 117def hexdump(buf, length=16): 118 """Return a hexdump output string of the given buffer.""" 119 n = 0 120 res = [] 121 while buf: 122 line, buf = buf[:length], buf[length:] 123 hexa = ' '.join(['%02x' % ord(x) for x in line]) 124 line = line.translate(__vis_filter) 125 res.append(' %04d: %-*s %s' % (n, length * 3, hexa, line)) 126 n += length 127 return '\n'.join(res) 128 129def in_cksum_add(s, buf): 130 """in_cksum_add(cksum, buf) -> cksum 131 132 Return accumulated Internet checksum. 133 """ 134 nleft = len(buf) 135 i = 0 136 while nleft > 1: 137 s += ord(buf[i]) * 256 + ord(buf[i+1]) 138 i += 2 139 nleft -= 2 140 if nleft: 141 s += ord(buf[i]) * 256 142 return s 143 144def in_cksum_done(s): 145 """Fold and return Internet checksum.""" 146 while (s >> 16): 147 s = (s >> 16) + (s & 0xffff) 148 return (~s & 0xffff) 149 150def in_cksum(buf): 151 """Return computed Internet checksum.""" 152 return in_cksum_done(in_cksum_add(0, buf)) 153 154try: 155 import psyco 156 psyco.bind(in_cksum) 157 psyco.bind(Packet) 158except ImportError: 159 pass 160 161