1# mktestsupport.py -- Support code used by multiple test modules
2# $Id: mktestsupport.py 1230 2007-03-09 15:58:53Z jcw $
3# This is part of Metakit, see http://www.equi4.com/metakit/
4
5from test.test_support import TestFailed, verbose
6import metakit
7import sys
8import string
9
# Integer boundary values for overflow testing.
# MAXINT/MININT are the limits of the plain Python int on this build; the
# *LONGLONG values are the limits of signed and unsigned 64-bit integers.
MAXINT = sys.maxint
MININT = -MAXINT - 1
MAXLONGLONG = 2**63 - 1
MINLONGLONG = -2**63
MAXULONGLONG = 2**64 - 1

# Check for int/long integration (should fail in Python 2.2, pass in 2.3)
# int_long_integrated records whether int() accepted the 64-bit value.
# int_long_error is OverflowError when it did and TypeError when it did not;
# presumably this is the exception class the test modules expect Mk4py to
# raise for out-of-range integers -- confirm against the callers.
try:
    MAXLONGLONG = int(MAXLONGLONG)
    int_long_integrated = True
    int_long_error = OverflowError
except OverflowError: # long int too large to convert to int
    int_long_integrated = False
    int_long_error = TypeError

# Check for Unicode support (should fail in Python 1.5.2 or --disable-unicode, pass in 1.6)
# UnicodeType is the unicode type object when the builtin exists.  Without
# Unicode support both UnicodeType and unicode are bound to None, so other
# code can refer to either name without raising NameError.
try:
    UnicodeType = type(unicode(''))
except NameError: # no Unicode support
    UnicodeType = None
    unicode = None
32
33class Failure:
34    """Keeps track of failures as they happen, but doesn't die on the first
35    one unless it's unrecoverable.  If failure_count > 0 when script
36    finishes, raise TestFailed."""
37
38    def __init__(self):
39        self.failure_count = 0
40
41    def fail(self, op, args, err=None, expected=None, actual=None):
42        print 'FAIL:', op, args
43        print '     ',
44        if err is not None: print err,
45        if actual is not None: print 'got', actual, actual.__class__,
46        if expected is not None: print 'expected', expected,
47        print
48        self.failure_count = self.failure_count + 1
49
50    def assess(self):
51        if self.failure_count > 0:
52            raise TestFailed(
53                '%d failures; run in verbose mode for details' % self.failure_count)
54
55class ViewTester:
56    """Inserts rows into view and Python array"""
57
58    def __init__(self, description):
59        self.storage = metakit.storage()
60        self.v = self.storage.getas(description)
61        self.arr = []
62        self.failure = Failure()
63        self.fail = self.failure.fail
64        self.columns = map(lambda c: string.split(c, ':')[0], string.split(description[string.index(description, '[') + 1:-1], ','))
65
66    def dump_view(self):
67        metakit.dump(self.v, 'VIEW CONTENTS:')
68
69    def checklen(self, args):
70        alen = len(self.arr)
71        vlen = len(self.v)
72        if alen != vlen:
73            self.fail('append', args, 'view length mismatch',
74                      actual=vlen, expected=alen)
75            try:
76                print 'ARRAY CONTENTS:'
77                for arow in self.arr: print arow
78                self.dump_view()
79            except: pass
80            raise TestFailed('unexpected number of rows in view, aborting; run in verbose mode for details')
81
82    def _append(self, args):
83        self.arr.append(args)
84
85    def insert(self, *args, **kw):
86        if kw:
87            if args:
88                raise TestFailed("can't have both positional and keyword arguments")
89            args = kw
90        try:
91            self.v.append(args)
92            self._append(args)
93        except Exception, e:
94            self.fail('append', args, actual=e)
95        try:
96            self.checklen(args)
97        except TestFailed:
98            raise
99        except Exception, e:
100            self.fail('append', args, 'spurious', actual=e)
101
102    def reject(self, exception_class=Exception, **args):
103        try:
104            ix = self.v.append(args)
105            self.fail('append', args, 'succeeded', expected=exception_class)
106            self.v.delete(ix)
107        except Exception, e:
108            if isinstance(e, exception_class):
109                if verbose:
110                    print 'PASS: rejected', args
111                    print '      as expected <%s> %s' % (e.__class__, e)
112            else:
113                self.fail('append', args, expected=exception_class, actual=e)
114        try:
115            self.checklen(args)
116        except TestFailed:
117            raise
118        except Exception, e:
119            self.fail('append', args, 'spurious', actual=e)
120
121    def finished(self):
122        if verbose:
123            self.dump_view()
124
125        # compare view with array
126        for arow, vrow in zip(self.arr, self.v):
127            failed = False
128            for f in arow.keys():
129                try:
130                    vf = getattr(vrow, f)
131                    af = arow[f]
132                    # Fix up Unicode
133                    if type(af) == UnicodeType:
134                        vf = unicode(vf, 'utf-8')
135                    if af == vf:
136                        continue
137                    # Perform the same implicit coercion as Mk4py should
138                    if type(af) != type(vf):
139                        try:
140                            af = type(vf)(af)
141                            if af == vf:
142                                continue
143                        except:
144                            pass
145                    # If we get here, we got an exception or the values didn't match
146                    # even with coercion
147                    failed = True
148                    self.fail('%s access' % f, arow, expected=af, actual=vf)
149                except Exception, e:
150                    failed = True
151                    self.fail('%s access' % f, arow, expected=arow[f], actual=e)
152            if not failed:
153                if verbose:
154                    print 'PASS: retrieved', arow
155
156        self.failure.assess()
157
class HashedViewTester(ViewTester):
    """ViewTester variant whose view is wrapped by the view's hash() method.

    The Python array only receives rows it does not already contain.
    """

    def __init__(self, description, numkeys):
        """Set up the base tester, then replace self.v by its hashed form.

        numkeys is passed to the view's hash() method together with a
        helper view created from 'hv[_H:I,_R:I]'.
        """
        ViewTester.__init__(self, description)
        hash_map = self.storage.getas('hv[_H:I,_R:I]')
        self.v = self.v.hash(hash_map, numkeys)

    def _append(self, args):
        """Store args in self.arr as a dictionary unless already present."""
        # Mappings are recognised by their keys() method because the
        # operator module is broken in Python 2.3.
        if hasattr(args, 'keys'):
            row = args
        else:
            # Positional row: pair each value with its column name.
            row = {}
            for pos in range(len(args)):
                row[self.columns[pos]] = args[pos]
        if row in self.arr:
            return
        self.arr.append(row)
174