1##########################################################################
2# Copyright (c) 2009, ETH Zurich.
3# All rights reserved.
4#
5# This file is distributed under the terms in the attached LICENSE file.
6# If you do not find this file, copies can be found by writing to:
7# ETH Zurich D-INFK, Haldeneggsteig 4, CH-8092 Zurich. Attn: Systems Group.
8##########################################################################
9
10import re, socket, httplib, traceback, os, subprocess, datetime, glob, time
11import tests, debug, siteconfig
12from common import TestCommon, TimeoutError, select_timeout
13from results import ResultsBase, PassFailResult, RowResults
14
15
# files fetched from the webserver by the functional test (getpage/getpage_stress)
WEBSERVER_TEST_FILES=['index.html', 'barrelfish.gif', 'barrelfish_sosp09.pdf', 'nevill-master-capabilities.pdf', 'razavi-master-performanceisolation.pdf']

WEBSERVER_TIMEOUT=5 # seconds
TEST_LOG_NAME = 'testlog.txt'

HTTPERF_BASE_ARGS='--hog --close-with-reset --timeout 2 '
HTTPERF_URI = '/index.html'

# Webserver stress test: each test file is downloaded repeatedly this
# number of times
WEBSERVER_STRESS_COUNTER = 3000

# desired duration of an httperf test run (seconds)
HTTPERF_DURATION = 20

# sleep time between runs (seconds)
HTTPERF_SLEEPTIME = 20

# timeout for a complete run, including setup etc.
HTTPERF_TIMEOUT = datetime.timedelta(seconds=(HTTPERF_DURATION + 30))

# connection rates across all client machines
HTTPERF_STARTRATE = 1000  # initial rate
HTTPERF_RATEINCREMENT = 1000  # amount to increment by for each new run
40
41
class WebCommon(TestCommon):
    """Shared logic for tests that boot a Barrelfish image with a webserver.

    Watches console output for the server's IP address; once the webserver
    announces it has started, invokes the subclass-provided runtests()
    against that address. Severe server-side errors (kernel panics,
    assertion failures, aborts) are collected and fail the test.
    """

    def __init__(self, options):
        super(WebCommon, self).__init__(options)
        self.test_timeout_delta = datetime.timedelta(seconds=600)
        self.read_after_finished = True
        # severe errors spotted in the server's console output
        self.server_failures = []
        # previously-seen console line; initialized here so that an
        # "Aborted" message arriving as the very first line cannot raise
        # AttributeError in process_line()
        self.previous_line = ''

    def setup(self, build, machine, testdir):
        super(WebCommon, self).setup(build, machine, testdir)
        self.testdir = testdir
        self.finished = False
        self.ip = None  # filled in when the server reports its address

    def get_modules(self, build, machine):
        """Boot modules: e1000n NIC driver, sockets server and webserver."""
        modules = super(WebCommon, self).get_modules(build, machine)
        modules.add_module("e1000n", ["auto"])
        modules.add_module("net_sockets_server", ["nospawn"])
        nfsip = socket.gethostbyname(siteconfig.get('WEBSERVER_NFS_HOST'))
        modules.add_module("webserver", ["core=%d" % machine.get_coreids()[0],
                                         nfsip,
                                         siteconfig.get('WEBSERVER_NFS_PATH')])
        return modules

    def process_line(self, line):
        """Inspect one line of console output and react to it."""
        m = re.match(r'# IP Addr (\d+\.\d+\.\d+\.\d+)', line)
        if m:
            self.ip = m.group(1)
        elif self.ip and 'Starting webserver' in line:
            debug.verbose("Running the tests")
            self.runtests(self.ip)
            self.finished = True
        elif line.startswith("kernel PANIC!") or \
             line.startswith("Assertion failed on core") or \
             re.match("Assertion .* failed at line", line) or \
             line.startswith("Aborted"):
            # Severe error in webserver, failing test. For a bare "Aborted"
            # the preceding line usually names the cause, so record that
            # line instead (unless it is already recorded).
            if line.startswith("Aborted") and \
               self.previous_line not in self.server_failures:
                line = self.previous_line
            self.server_failures.append(line.strip())
            self.finished = True

        self.previous_line = line.strip()

    def passed(self):
        """True iff no severe server-side failures were observed."""
        return len(self.server_failures) == 0

    def is_finished(self, line):
        return self.finished or super(WebCommon, self).is_finished(line)
92
93
94@tests.add_test
95class WebserverTest(WebCommon):
96    '''tests webserver functionality'''
97    name = "webserver"
98
99    def setup(self, *args):
100        super(WebserverTest, self).setup(*args)
101        self.testlog = None
102
103    def getpage_stress(self, server, page, count):
104        debug.verbose('requesting http://%s/%s' % (server, page))
105        failure_count = 0;
106        #c = httplib.HTTPConnection(server, timeout=WEBSERVER_TIMEOUT)
107        for i in range(count):
108            try:
109                c = httplib.HTTPConnection(server, timeout=WEBSERVER_TIMEOUT)
110                c.request('GET', '/' + page)
111                r = c.getresponse()
112                if (r.status / 100) != 2 :
113                    print "HTTP request failed for %d" % (i)
114                assert((r.status / 100) == 2) # check for success response
115                data = r.read()
116                # Reset failure count after sucessful retrival
117                failure_count = 0
118                c.close()
119            except Exception as e:
120                print "HTTP request failed for %d, (failure count %d)" % (i,
121                        failure_count)
122                print "Exception: ", e
123                failure_count = failure_count + 1
124                if failure_count >= 3:
125                    print "HTTP request failed for 3 successive times."
126                    print "Giving up for %d, (failure count %d)" % (i,
127                        failure_count)
128                    raise
129
130            #c.close()
131        debug.verbose('server replied %s %s for %d times' % (r.status, r.reason, count))
132
133
134    def getpage(self, server, page):
135        debug.verbose('requesting http://%s/%s' % (server, page))
136        c = httplib.HTTPConnection(server, timeout=WEBSERVER_TIMEOUT)
137        c.request('GET', '/' + page)
138        r = c.getresponse()
139
140        debug.verbose('server replied %s %s' % (r.status, r.reason))
141        assert((r.status / 100) == 2) # check for success response
142
143        try:
144            local_path = siteconfig.get('WEBSERVER_LOCAL_PATH')
145        except AttributeError:
146            local_path = None
147        local = os.path.join(local_path, page) if local_path else None
148        if local and os.path.isfile(local) and os.access(local, os.R_OK):
149            debug.verbose('comparing content to %s' % local)
150            l = open(local, 'r')
151            # read from both files and compare
152            CHUNKSIZE = 4096
153            while True:
154                remote_data = r.read(CHUNKSIZE)
155                local_data = l.read(CHUNKSIZE)
156                if remote_data != local_data:
157                    print "Remote and local data did not match:"
158                    print "Remote data\n"
159                    print remote_data
160                    print "Local data\n"
161                    print local_data
162                assert(remote_data == local_data)
163                if len(local_data) < CHUNKSIZE:
164                    break
165
166            debug.verbose('contents matched for %s' % local)
167        c.close()
168
169    def dotest(self, func, args):
170        exception = None
171        r = None
172        try:
173            r = func(*args)
174        except Exception as e:
175            exception = e
176
177        s = 'Test: %s%s\t%s\n' % (func.__name__, str(args),
178                                 'FAIL' if exception else 'PASS')
179        if exception:
180            debug.verbose('Exception while running test: %s\n'
181                          % traceback.format_exc())
182            s += 'Error was: %s\n' % traceback.format_exc()
183        self.testlog.write(s)
184
185        return r
186
187    def runtests(self, server):
188        stress_counter = WEBSERVER_STRESS_COUNTER
189        self.testlog = open(os.path.join(self.testdir, TEST_LOG_NAME), 'w')
190        for f in WEBSERVER_TEST_FILES:
191            self.dotest(self.getpage, (server, f))
192            debug.verbose("Running stresstest: (%d GET %s)" %
193                    (stress_counter, str(f)))
194            self.dotest(self.getpage_stress, (server, f, stress_counter))
195        self.testlog.close()
196
197    def process_data(self, testdir, rawiter):
198        # the test passed iff we see at least one PASS and no FAILs in the log
199        passed = None
200        try:
201            testlog = open(os.path.join(testdir, TEST_LOG_NAME), 'r')
202        except IOError as e:
203            debug.verbose("Cannot find test log, failing test")
204            return PassFailResult(False, reason="Cannot find test log")
205
206        for line in testlog:
207            if re.match('Test:.*FAIL$', line):
208                passed = False
209            elif passed != False and re.match('Test:.*PASS$', line):
210                passed = True
211        testlog.close()
212        server_ok = super(WebserverTest, self).passed()
213        return PassFailResult(passed and server_ok)
214
215
@tests.add_test
class WebserverTestSf(WebserverTest):
    '''tests webserver functionality solarflare'''
    name = "webserver_sf"

    def get_modules(self, build, machine):
        # Deliberately start from TestCommon's module list (super of
        # WebCommon) so the e1000n driver added by WebCommon.get_modules
        # is replaced by the solarflare driver.
        modules = super(WebCommon, self).get_modules(build, machine)
        modules.add_module("sfn5122f", ["auto"])
        modules.add_module("net_sockets_server", ["nospawn"])
        nfs_server = socket.gethostbyname(siteconfig.get('WEBSERVER_NFS_HOST'))
        webserver_args = ["core=%d" % machine.get_coreids()[0],
                          nfs_server,
                          siteconfig.get('WEBSERVER_NFS_PATH')]
        modules.add_module("webserver", webserver_args)
        return modules
229
@tests.add_test
class WebserverTestE10k(WebserverTest):
    '''tests webserver functionality e10k'''
    name = "webserver_e10k"

    def get_modules(self, build, machine):
        # Deliberately start from TestCommon's module list (super of
        # WebCommon) so the e1000n driver added by WebCommon.get_modules
        # is replaced by the e10k driver.
        modules = super(WebCommon, self).get_modules(build, machine)
        modules.add_module("e10k", ["auto"])
        modules.add_module("net_sockets_server", ["nospawn"])
        nfs_server = socket.gethostbyname(siteconfig.get('WEBSERVER_NFS_HOST'))
        webserver_args = ["core=%d" % machine.get_coreids()[0],
                          nfs_server,
                          siteconfig.get('WEBSERVER_NFS_PATH')]
        modules.add_module("webserver", webserver_args)
        return modules
243
244
@tests.add_test
class HTTPerfTest(WebCommon):
    '''httperf webserver performance benchmark'''
    name = "httperf"

    def setup(self, *args):
        super(HTTPerfTest, self).setup(*args)
        # number of httperf runs performed so far; used to name log files
        self.nruns = 0

    def _runtest(self, target, nclients, nconns, rate):
        """Execute one httperf measurement run.

        Spawns `nclients` remote httperf instances against `target`, each
        making `nconns` connections at `rate` conns/sec, streaming their
        output into per-client log files until every instance hits EOF.
        Raises TimeoutError if the whole run exceeds HTTPERF_TIMEOUT.
        Always cleans up all spawned clients, even on error.
        """
        self.nruns += 1
        nrun = self.nruns
        httperfs = []
        try:
            for nclient in range(nclients):
                user, host = siteconfig.site.get_load_generator()
                # the filename format below only has room for two digits
                assert(nrun < 100 and nclient < 100)
                filename = 'httperf_run%02d_%02d.txt' % (nrun, nclient)
                logfile = open(os.path.join(self.testdir, filename), 'w')
                debug.verbose('spawning httperf on %s' % host)
                hp = HTTPerfClient(logfile, user, host, target, nconns, rate)
                httperfs.append(hp)

            # loop collecting output from all of them
            busy_httperfs = list(httperfs) # copy list
            timeout = datetime.datetime.now() + HTTPERF_TIMEOUT
            while busy_httperfs:
                (ready, _, _) = select_timeout(timeout, busy_httperfs)
                if not ready:
                    raise TimeoutError('waiting for httperfs')
                for hp in ready:
                    try:
                        hp.read()
                    except EOFError:
                        # this client has finished; stop selecting on it
                        busy_httperfs.remove(hp)
        finally:
            debug.log('cleaning up httperf test...')
            for hp in httperfs:
                hp.cleanup()

    def runtests(self, target):
        """Ramp up the offered connection rate until the server saturates.

        Starts at HTTPERF_STARTRATE and adds HTTPERF_RATEINCREMENT per
        run; stops when a run reports client-side errors or when the
        achieved request/reply rate drops below 90% of the preceding
        stage (i.e. the server can no longer keep up).
        """
        nclients = siteconfig.get('HTTPERF_MAXCLIENTS')
        firstrun = True
        totalrate = HTTPERF_STARTRATE
        while True:
            if firstrun:
                firstrun = False
            else:
                # sleep a moment to let things settle down between runs
                debug.verbose('sleeping between httperf runs')
                time.sleep(HTTPERF_SLEEPTIME)

            # compute rate and total number of connections for each client
            rate = totalrate / nclients
            nconns = HTTPERF_DURATION * rate

            debug.log('starting httperf: %d clients, %d conns, rate %d (%d per client)' %
                      (nclients, nconns, totalrate, rate))
            self._runtest(target, nclients, nconns, rate)

            # decide whether to keep going...
            results = self._process_run(self.nruns)
            if not results.passed():
                debug.log('previous test failed, stopping')
                break
            elif results.request_rate < (0.9 * results.connect_rate):
                debug.log('request rate below 90% of connect rate, stopping')
                break
            elif results.reply_rate < (0.9 * results.request_rate):
                debug.log('reply rate below 90% of request rate, stopping')
                break
            else:
                totalrate += HTTPERF_RATEINCREMENT
                continue

    def _process_one(self, logfile):
        """Parse one httperf output log into an HTTPerfResults.

        A complete log yields exactly six matching statistics lines:
        connection rate, request rate, reply rate, bandwidth, and the
        client- and server-side error summaries. Fewer matches indicate
        a truncated or invalid log and are reported on stdout.
        """
        ret = HTTPerfResults()
        matches = 0

        for line in logfile:
            # Connection rate
            m = re.match('Connection rate: (\d+\.\d+) conn/s', line)
            if m:
                matches += 1
                ret.connect_rate = float(m.group(1))

            # Request rate
            m = re.match('Request rate: (\d+\.\d+) req/s', line)
            if m:
                matches += 1
                ret.request_rate = float(m.group(1))

            # Reply rate
            m = re.search('Reply rate \[replies/s\]: min .* avg (\d+\.\d+)'
                          ' max .* stddev .*', line)
            if m:
                matches += 1
                ret.reply_rate = float(m.group(1))

            # Bandwidth
            m = re.match('Net I/O: .* KB/s \((\d+\.\d+)\*10\^6 bps\)', line)
            if m:
                matches += 1
                ret.bandwidth = float(m.group(1))

            # client-side errors
            m = re.match('Errors: fd-unavail (\d+) addrunavail (\d+)'
                         ' ftab-full (\d+) other (\d+)', line)
            if m:
                matches += 1
                ret.fd_unavail = int(m.group(1))
                ret.addrunavail = int(m.group(2))
                ret.ftab_full = int(m.group(3))
                ret.other_err = int(m.group(4))

            # server-side errors
            m = re.match('Errors: total \d+ client-timo (\d+) socket-timo (\d+)'
                         ' connrefused (\d+) connreset (\d+)', line)
            if m:
                matches += 1
                ret.client_timo = int(m.group(1))
                ret.socket_timo = int(m.group(2))
                ret.connrefused = int(m.group(3))
                ret.connreset = int(m.group(4))

        if matches != 6 : # otherwise we have an invalid log
            print "Instead of 6, only %d matches found\n" % (matches)

        return ret


    def _process_run(self, nrun):
        """Sum the per-client results of run `nrun` into one HTTPerfResults."""
        nameglob = 'httperf_run%02d_*.txt' % nrun
        results = []
        for filename in glob.iglob(os.path.join(self.testdir, nameglob)):
            with open(filename, 'r') as logfile:
                results.append(self._process_one(logfile))
        # HTTPerfResults() is the zero element, so sum() aggregates fields
        return sum(results, HTTPerfResults())

    def process_data(self, testdir, raw_iter):
        """Build a row-per-run results table from all httperf logs.

        The run fails only on a severe server-side failure; client-side
        errors are totalled per run but do not fail the test (the last
        run is expected to contain some, by design of the ramp-up).
        """
        self.testdir = testdir
        totals = {}
        for filename in glob.iglob(os.path.join(testdir, 'httperf_run*.txt')):
            nrun = int(re.match('.*/httperf_run(\d+)_', filename).group(1))
            result = self._process_run(nrun)
            totals[nrun] = result

        fields = 'run connect_rate request_rate reply_rate bandwidth errors'.split()
        final = RowResults(fields)

        for run in sorted(totals.keys()):
            total = totals[run]
            errsum = sum([getattr(total, f) for f in total._err_fields])
            final.add_row([run, total.connect_rate, total.request_rate,
                           total.reply_rate, total.bandwidth, errsum])
            # XXX: often the last run will have errors in it, due to the control algorithm
            #if errsum:
            #    final.mark_failed()

        # If we saw a severe failure (assertion failure, kernel panic, or user
        # level panic) in the webserver, fail the test
        if not super(HTTPerfTest, self).passed():
            final.mark_failed('\n'.join(self.server_failures))

        return final
410
class HTTPerfResults(ResultsBase):
    """Aggregated statistics of one httperf run, summable across clients."""

    # client-side resource-exhaustion errors: any non-zero value fails the run
    _err_fields = 'fd_unavail addrunavail ftab_full other_err'.split()
    # informational counters and rates reported by httperf
    _result_fields = ('client_timo socket_timo connrefused connreset'
                      ' connect_rate request_rate bandwidth reply_rate').split()
    _fields = _err_fields + _result_fields

    def __init__(self):
        super(HTTPerfResults, self).__init__()
        # all fields start at zero, so a fresh instance is the identity
        # element for __add__ (enabling sum(results, HTTPerfResults()))
        for f in self._fields:
            setattr(self, f, 0)

    def __add__(self, other):
        """Return the field-wise sum of two results (per-client aggregation)."""
        ret = HTTPerfResults()
        for f in self._fields:
            setattr(ret, f, getattr(self, f) + getattr(other, f))
        return ret

    def passed(self):
        """A run passes iff no client-side errors were recorded."""
        return all([getattr(self, field) == 0 for field in self._err_fields])

    def to_file(self, fh):
        """Write a human-readable summary of this result to the open file."""
        errs = [(f, getattr(self,f)) for f in self._err_fields if getattr(self,f)]
        if errs:
            # terminate the line: without the newline the error summary ran
            # straight into the 'Request rate:' line below
            fh.write('Failed run: ' + ' '.join(['%s %d' % e for e in errs]) + '\n')

        fh.write('Request rate:\t%f\n' % self.request_rate)
        fh.write('Bandwidth:\t%f\n' % self.bandwidth)
        fh.write('Reply rate:\t%f\n' % self.reply_rate)
439
440
class HTTPerfClient(object):
    """Runs httperf on a remote load-generator host via SSH.

    Output of the remote process is streamed into `logfile`; fileno() is
    provided so callers can select() on this object directly.
    """

    def __init__(self, logfile, user, host, target, nconns, rate):
        self.user = user
        self.host = host
        self.httperf_path = siteconfig.get('HTTPERF_PATH')
        # assemble the full remote httperf command line
        remote_cmd = '%s %s' % (self.httperf_path, HTTPERF_BASE_ARGS)
        remote_cmd += ' --num-conns %d --rate %d --server %s --uri %s' % (
                        nconns, rate, target, HTTPERF_URI)
        # unbuffered pipe so select()/read(1) see output immediately
        self.proc = self._launchssh(remote_cmd, stdout=subprocess.PIPE, bufsize=0)
        self.logfile = logfile

    def _launchssh(self, remotecmd, **kwargs):
        """Spawn `remotecmd` on the remote host through SSH."""
        destination = '%s@%s' % (self.user, self.host)
        argv = ['ssh'] + siteconfig.get('SSH_ARGS').split()
        argv += [destination, remotecmd]
        return subprocess.Popen(argv, **kwargs)

    # mirror builtin file method so that we can pass this to select()
    def fileno(self):
        return self.proc.stdout.fileno()

    def read(self):
        # read only a single character to avoid blocking!
        ch = self.proc.stdout.read(1)
        if ch == '':
            raise EOFError
        self.logfile.write(ch)

    def cleanup(self):
        """perform cleanup if necessary"""
        self.logfile.close()
        if self.proc is None or self.proc.poll() == 0:
            return # clean exit

        if self.proc.returncode:
            # SSH terminated on its own, but unsuccessfully
            debug.warning('httperf: SSH to %s exited with error %d'
                          % (self.host, self.proc.returncode))
        else: # kill SSH if still up
            debug.warning('httperf: killing SSH child for %s' % self.host)
            self.proc.terminate()
            self.proc.wait()

        # run a remote killall to get rid of any errant httperfs
        debug.verbose('killing any errant httperfs on %s' % self.host)
        killer = self._launchssh('killall -q %s' % self.httperf_path)
        if killer.wait() != 0:
            debug.warning('failed to killall httperf on %s!' % self.host)
488