1## This file is part of Scapy
2## See http://www.secdev.org/projects/scapy for more informations
3## Copyright (C) Philippe Biondi <phil@secdev.org>
4## This program is published under a GPLv2 license
5
6"""
7Unit testing infrastructure for Scapy
8"""
9
10from __future__ import absolute_import
11from __future__ import print_function
12import sys, getopt, imp, glob, importlib
13import hashlib, copy, bz2, base64, os.path, time, traceback, zlib
14from scapy.consts import WINDOWS
15import scapy.modules.six as six
16from scapy.modules.six.moves import range
17
18
19### Util class ###
20
class Bunch:
    """Simple attribute container: every keyword argument becomes an attribute."""

    def __init__(self, **kw):
        # The keyword dictionary itself is installed as the instance namespace.
        self.__dict__ = kw
23
24#### Import tool ####
25
def import_module(name):
    """Import the module located at path `name` and return it.

    `name` is a path to a module; a trailing ".py" is dropped to get the
    module name, and only the directory holding the file is searched.

    The `imp` module was removed in Python 3.12, so importlib is used when it
    provides the needed helpers; `imp` is only the fallback for interpreters
    that lack them (Python 2).

    Raises ImportError if no such module exists in that directory.
    """
    name = os.path.realpath(name)
    thepath = os.path.dirname(name)
    name = os.path.basename(name)
    if name.endswith(".py"):
        name = name[:-3]

    try:
        from importlib import machinery as _machinery, util as _util
    except ImportError:
        _machinery = _util = None

    if _util is None or not hasattr(_util, "module_from_spec"):
        # Legacy path (Python 2): rely on the `imp` module.
        f, path, desc = imp.find_module(name, [thepath])
        try:
            return imp.load_module(name, f, path, desc)
        finally:
            if f:
                f.close()

    spec = _machinery.PathFinder.find_spec(name, [thepath])
    if spec is None or spec.loader is None:
        raise ImportError("No module named %s" % name)
    module = _util.module_from_spec(spec)
    # Register the module before executing it, as a regular import would.
    sys.modules[name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind.
        sys.modules.pop(name, None)
        raise
    return module
39
40
41#### INTERNAL/EXTERNAL FILE EMBEDDING ####
42
class File:
    """A file available both from a URL and as an embedded local copy.

    `local` is the base64 text of the bzip2-compressed file content.
    """

    def __init__(self, name, URL, local):
        self.name = name
        self.local = local.encode("utf8")
        self.URL = URL

    def get_local(self):
        """Return the decoded and decompressed content as bytes."""
        # base64.decodestring() was removed in Python 3.9; b64decode() exists
        # on Python 2 and 3 and also skips the newlines of the embedded data.
        return bz2.decompress(base64.b64decode(self.local))

    def get_URL(self):
        """Return the URL the file can be fetched from."""
        return self.URL

    def write(self, dir):
        """Write the local copy as `name` into directory `dir`.

        An empty `dir` means the current working directory.
        """
        target = os.path.join(dir, self.name) if dir else self.name
        # The context manager guarantees the handle is flushed and closed.
        with open(target, "wb") as fdesc:
            fdesc.write(self.get_local())
56
57
# Embed base64-encoded, bzip2-compressed copies of the JS and CSS files
# so that HTML reports still work when the Internet cannot be reached.
class External_Files:
    """Registry of the external files used by the HTML report.

    Each File attribute pairs a remote URL with an embedded local copy.
    """

    UTscapy_js = File("UTscapy.js", "http://www.secdev.org/projects/UTscapy/UTscapy.js",
"""QlpoOTFBWSZTWWVijKQAAXxfgERUYOvAChIhBAC/79+qQAH8AFA0poANAMjQAAAG
ABo0NGEZNBo00BhgAaNDRhGTQaNNAYFURJinplGaKbRkJiekzSenqmpA0Gm1LFMp
RUklVQlK9WUTZYpNFI1IiEWEFT09Sfj5uO+qO6S5DQwKIxM92+Zku94wL6V/1KTK
an2c66Ug6SmVKy1ZIrgauxMVLF5xLH0lJRQuKlqLF10iatlTzqvw7S9eS3+h4lu3
GZyMgoOude3NJ1pQy8eo+X96IYZw+ynehsiPj73m0rnvQ3QXZ9BJQiZQYQ5/uNcl
2WOlC5vyQqV/BWsnr2NZYLYXQLDs/Bffk4ZfR4/SH6GfA5Xlek4xHNHqbSsRbREO
gueXo3kcYi94K6hSO3ldD2O/qJXOFqJ8o3TE2aQahxtQpCVUKQMvODHwu2YkaORY
ZC6gihEallcHDIAtRPScBACAJnUggYhLDX6DEko7nC9GvAw5OcEkiyDUbLdiGCzD
aXWMC2DuQ2Y6sGf6NcRuON7QSbhHsPc4KKmZ/xdyRThQkGVijKQ=""")
    UTscapy_css = File("UTscapy.css","http://www.secdev.org/projects/UTscapy/UTscapy.css",
"""QlpoOTFBWSZTWTbBCNEAAE7fgHxwSB//+Cpj2QC//9/6UAR+63dxbNzO3ccmtGEk
pM0m1I9E/Qp6g9Q09TNQ9QDR6gMgAkiBFG9U9TEGRkGgABoABoBmpJkRAaAxD1AN
Gh6gNADQBzAATJgATCYJhDAEYAEiQkwIyJk0n6qenpqeoaMUeo9RgIxp6pX78kfx
Jx4MUhDHKEb2pJAYAelG1cybiZBBDipH8ocxNyHDAqTUxiQmIAEDE3ApIBUUECAT
7Lvlf4xA/sVK0QHkSlYtT0JmErdOjx1v5NONPYSjrIhQnbl1MbG5m+InMYmVAWJp
uklD9cNdmQv2YigxbEtgUrsY2pDDV/qMT2SHnHsViu2rrp2LA01YJIHZqjYCGIQN
sGNobFxAYHLqqMOj9TI2Y4GRpRCUGu82PnMnXUBgDSkTY4EfmygaqvUwbGMbPwyE
220Q4G+sDvw7+6in3CAOS634pcOEAdREUW+QqMjvWvECrGISo1piv3vqubTGOL1c
ssrFnnSfU4T6KSCbPs98HJ2yjWN4i8Bk5WrM/JmELLNeZ4vgMkA4JVQInNnWTUTe
gmMSlJd/b7JuRwiM5RUzXOBTa0e3spO/rsNJiylu0rCxygdRo2koXdSJzmUVjJUm
BOFIkUKq8LrE+oT9h2qUqqUQ25fGV7e7OFkpmZopqUi0WeIBzlXdYY0Zz+WUJUTC
RC+CIPFIYh1RkopswMAop6ZjuZKRqR0WNuV+rfuF5aCXPpxAm0F14tPyhf42zFMT
GJUMxxowJnoauRq4xGQk+2lYFxbQ0FiC43WZSyYLHMuo5NTJ92QLAgs4FgOyZQqQ
xpsGKMA0cIisNeiootpnlWQvkPzNGUTPg8jqkwTvqQLguZLKJudha1hqfBib1IfO
LNChcU6OqF+3wyPKg5Y5oSbSJPAMcRDANwmS2i9oZm6vsD1pLkWtFGbAkEjjCuEU
W1ev1IsF2UVmWYFtJkqLT708ApUBK/ig3rbJWSq7RGQd3sSrOKu3lyKzTBdkXK2a
BGLV5dS1XURdKxaRkMplLLQxsimBYZEAa8KQkYyI+4EagMqycRR7RgwtZFxJSu0T
1q5wS2JG82iETHplbNj8DYo9IkmKzNAiw4FxK8bRfIYvwrbshbEagL11AQJFsqeZ
WeXDoWEx2FMyyZRAB5QyCFnwYtwtWAQmmITY8aIM2SZyRnHH9Wi8+Sr2qyCscFYo
vzM985aHXOHAxQN2UQZbQkUv3D4Vc+lyvalAffv3Tyg4ks3a22kPXiyeCGweviNX
0K8TKasyOhGsVamTUAZBXfQVw1zmdS4rHDnbHgtIjX3DcCt6UIr0BHTYjdV0JbPj
r1APYgXihjQwM2M83AKIhwQQJv/F3JFOFCQNsEI0QA==""")

    @classmethod
    def get_local_dict(cls):
        """Map each File attribute name to its local file name."""
        local_names = {}
        for attr, value in six.iteritems(cls.__dict__):
            if isinstance(value, File):
                local_names[attr] = value.name
        return local_names

    @classmethod
    def get_URL_dict(cls):
        """Map each File attribute name to its URL."""
        urls = {}
        for attr, value in six.iteritems(cls.__dict__):
            if isinstance(value, File):
                urls[attr] = value.URL
        return urls
102
103
104#### HELPER CLASSES FOR PARAMETRING OUTPUT FORMAT ####
105
class EnumClass:
    """Base class for constant holders that can be looked up by name."""

    @classmethod
    def from_string(cls, x):
        """Return the class attribute named `x`, ignoring case.

        Only attributes defined directly on `cls` are considered; an unknown
        name raises KeyError.
        """
        wanted = x.upper()
        return cls.__dict__[wanted]
110
class Format(EnumClass):
    """Identifiers of the supported report output formats."""

    # Looked up by name through EnumClass.from_string (e.g. "html" -> HTML).
    TEXT = 1
    ANSI = 2
    HTML = 3
    LATEX = 4
    XUNIT = 5
117
118
119#### TEST CLASSES ####
120
class TestClass:
    """Common base of campaign objects.

    Item access is mapped to attribute access so that instances can be used
    as the right operand of "%(attr)s" string formatting.
    """

    def __getitem__(self, item):
        return getattr(self, item)

    def add_keywords(self, kws):
        """Add keywords to self.keywords; a "-kw" entry removes `kw` instead.

        `kws` is a single string or an iterable of strings.
        """
        if isinstance(kws, six.string_types):
            kws = [kws]
        for kwd in kws:
            if not kwd.startswith('-'):
                self.keywords.add(kwd)
                continue
            # Removing a keyword that is not present is silently ignored.
            self.keywords.discard(kwd[1:])
135
class TestCampaign(TestClass):
    """A whole test campaign: an ordered list of test sets plus metadata."""

    def __init__(self, title):
        self.title = title
        self.filename = None
        self.headcomments = ""
        self.campaign = []          # the test sets, in file order
        self.keywords = set()
        self.crc = None
        self.sha = None
        self.preexec = None
        self.preexec_output = None
        self.end_pos = 0            # number following the last test, set by startNum()

    def add_testset(self, testset):
        """Append `testset`, which inherits the campaign keywords."""
        self.campaign.append(testset)
        testset.keywords.update(self.keywords)

    def startNum(self, beginpos):
        """Renumber all tests sequentially, starting at `beginpos`."""
        position = beginpos
        for testset in self:
            for test in testset:
                test.num = position
                position += 1
        self.end_pos = position

    def __iter__(self):
        return iter(self.campaign)

    def all_tests(self):
        """Yield every test of every test set, in order."""
        for testset in self:
            for test in testset:
                yield test
163
class TestSet(TestClass):
    """A named group of unit tests."""

    def __init__(self, name):
        self.name = name
        self.tests = []
        self.comments = ""
        self.keywords = set()
        self.crc = None
        self.expand = 1

    def add_test(self, test):
        """Append `test`, which inherits the keywords of the test set."""
        self.tests.append(test)
        test.keywords.update(self.keywords)

    def __iter__(self):
        return iter(self.tests)
177
class UnitTest(TestClass):
    """A single test: its code, comments, captured output and result."""

    def __init__(self, name):
        self.name = name
        self.test = ""
        self.comments = ""
        self.result = ""
        # must be True at init to have a different truth value than None
        self.res = True
        self.output = ""
        self.num = -1
        self.keywords = set()
        self.crc = None
        self.expand = 1

    def decode(self):
        """On Python 2, decode the text attributes from UTF-8, dropping invalid bytes."""
        if not six.PY2:
            return
        for attr in ("test", "output", "comments", "result"):
            setattr(self, attr, getattr(self, attr).decode("utf8", "ignore"))

    def __bool__(self):
        # The truth value of a test is its result.
        return self.res

    __nonzero__ = __bool__
199
200
# Note: any setting missing from the config file falls back to its default,
# and -c overwrites the options parsed before it: pass -c as the first argument!
def parse_config_file(config_path, verb=3):
    """Parse a JSON configuration file and return its settings as a Bunch.

    `config_path` is the path of the JSON file; when `verb` is above 2 a
    notice is printed on stderr once the file is loaded.

    Every key is optional. The values used for missing keys are:
    {
      "testfiles": [],
      "onlyfailed": false,
      "verb": 3,
      "dump": 0,
      "crc": 1,
      "scapy": "scapy",
      "preexec": {},
      "global_preexec": "",
      "outputfile": <sys.stdout>,
      "local": 0,
      "format": "ansi",
      "num": null,
      "modules": [],
      "kw_ok": [],
      "kw_ko": []
    }

    Note that the "outputfile" key is exposed as the `outfile` attribute.
    """
    import io
    import json
    # Decode at open time: the "encoding" argument of json.load() was
    # removed in Python 3.9 and passing it raises TypeError there.
    with io.open(config_path, encoding="utf8") as config_file:
        data = json.load(config_file)
    if verb > 2:
        print("### Loaded config file", config_path, file=sys.stderr)

    def get_if_exist(key, default):
        return data[key] if key in data else default

    return Bunch(testfiles=get_if_exist("testfiles", []), onlyfailed=get_if_exist("onlyfailed", False),
                 verb=get_if_exist("verb", 3), dump=get_if_exist("dump", 0), crc=get_if_exist("crc", 1),
                 scapy=get_if_exist("scapy", "scapy"), preexec=get_if_exist("preexec", {}),
                 global_preexec=get_if_exist("global_preexec", ""), outfile=get_if_exist("outputfile", sys.stdout),
                 local=get_if_exist("local", 0), num=get_if_exist("num", None), modules=get_if_exist("modules", []),
                 kw_ok=get_if_exist("kw_ok", []), kw_ko=get_if_exist("kw_ko", []), format=get_if_exist("format", "ansi"))
238
239#### PARSE CAMPAIGN ####
240
def parse_campaign_file(campaign_file):
    """Parse an open campaign file and return the resulting TestCampaign.

    The first character of each line selects its meaning:
      #  comment line, skipped
      ~  keywords for the current test, else test set, else campaign
      %  campaign title
      +  start of a new test set
      =  start of a new test
      *  comment for the current test, else test set, else campaign
    Any other line is code belonging to the current test.

    Raises ValueError if a test ("=") appears before any test set ("+").
    """
    test_campaign = TestCampaign("Test campaign")
    test_campaign.filename = campaign_file.name
    testset = None
    test = None
    testnb = 0

    for l in campaign_file.readlines():
        marker = l[:1]
        content = l[1:]
        if marker == '#':
            continue
        if marker == "~":
            (test or testset or test_campaign).add_keywords(content.split())
        elif marker == "%":
            test_campaign.title = content.strip()
        elif marker == "+":
            testset = TestSet(content.strip())
            test_campaign.add_testset(testset)
            test = None
        elif marker == "=":
            if testset is None:
                # Fail with an explicit message instead of an AttributeError
                # on None when the file has no '+' line before its first test.
                raise ValueError("Test [%s] is declared before any test set "
                                 "('+' line)" % content.strip())
            test = UnitTest(content.strip())
            test.num = testnb
            testnb += 1
            testset.add_test(test)
        elif marker == "*":
            if test is not None:
                test.comments += content
            elif testset is not None:
                testset.comments += content
            else:
                test_campaign.headcomments += content
        else:
            if test is None:
                if l.strip():
                    print("Unknown content [%s]" % l.strip(), file=sys.stderr)
            else:
                test.test += l
    return test_campaign
278
def dump_campaign(test_campaign):
    """Print an overview of the campaign (test sets, tests, keywords, CRCs) on stdout."""
    banner = "#" * (len(test_campaign.title) + 6)
    lines = [banner, "## %(title)s ##" % test_campaign, banner]
    if test_campaign.sha and test_campaign.crc:
        lines.append("CRC=[%(crc)s] SHA=[%(sha)s]" % test_campaign)
    lines.append("from file %(filename)s" % test_campaign)
    lines.append("")

    for ts in test_campaign:
        if ts.crc:
            dashes = "-" * max(2, 80 - len(ts.name) - 18)
            lines.append("+--[%s]%s(%s)--" % (ts.name, dashes, ts.crc))
        else:
            dashes = "-" * max(2, 80 - len(ts.name) - 6)
            lines.append("+--[%s]%s" % (ts.name, dashes))
        if ts.keywords:
            lines.append("  kw=%s" % ",".join(ts.keywords))
        for t in ts:
            lines.append("%(num)03i %(name)s" % t)
            crc_part = "[%(crc)s] " % t if t.crc else ""
            kw_part = "kw=%s" % ",".join(t.keywords) if t.keywords else ""
            # The detail line is only shown when there is something to show.
            if crc_part or kw_part:
                lines.append("    %s%s" % (crc_part, kw_part))

    print("\n".join(lines))
303
304#### COMPUTE CAMPAIGN DIGESTS ####
if not six.PY2:
    def crc32(x):
        """Return the CRC32 of text `x` (UTF-8 encoded) as 8 uppercase hex digits."""
        checksum = zlib.crc32(bytearray(x, "utf8"))
        return "%08X" % (0xffffffff & checksum)

    def sha1(x):
        """Return the uppercase hexadecimal SHA-1 of text `x` (UTF-8 encoded)."""
        digest = hashlib.sha1(x.encode("utf8"))
        return digest.hexdigest().upper()
else:
    def crc32(x):
        """Return the CRC32 of `x` as 8 uppercase hex digits."""
        checksum = zlib.crc32(x)
        # Masking keeps the value unsigned.
        return "%08X" % (0xffffffff & checksum)

    def sha1(x):
        """Return the uppercase hexadecimal SHA-1 of `x`."""
        digest = hashlib.sha1(x)
        return digest.hexdigest().upper()
317
def compute_campaign_digests(test_campaign):
    """Compute and store the CRCs of the tests, test sets and campaign, and the file SHA.

    Each test CRC covers its stripped code, each test set CRC covers its
    tests joined with NUL separators, and the campaign CRC covers all test
    sets. The SHA is computed over the content of test_campaign.filename.
    """
    dc = ""
    for ts in test_campaign:
        dts = ""
        for t in ts:
            dt = t.test.strip()
            t.crc = crc32(dt)
            dts += "\0"+dt
        ts.crc = crc32(dts)
        dc += "\0\x01"+dts
    test_campaign.crc = crc32(dc)
    # Close the campaign file deterministically instead of leaking the handle.
    with open(test_campaign.filename) as campaign_file:
        test_campaign.sha = sha1(campaign_file.read())
330
331
332#### FILTER CAMPAIGN #####
333
def filter_tests_on_numbers(test_campaign, num):
    """Keep only the tests whose number is in `num`, then drop emptied test sets.

    A false `num` (None or empty) leaves the campaign untouched.
    """
    if not num:
        return
    for ts in test_campaign:
        selected = []
        for t in ts.tests:
            if t.num in num:
                selected.append(t)
        ts.tests = selected
    remaining = []
    for ts in test_campaign.campaign:
        if ts.tests:
            remaining.append(ts)
    test_campaign.campaign = remaining
340
def filter_tests_keep_on_keywords(test_campaign, kw):
    """Keep only the tests having at least one keyword listed in `kw`.

    A false `kw` leaves the campaign untouched; test sets are kept even
    when they end up empty.
    """
    if not kw:
        return

    def is_wanted(test):
        return any(keyword in kw for keyword in test.keywords)

    for ts in test_campaign:
        ts.tests = [t for t in ts.tests if is_wanted(t)]
351
def filter_tests_remove_on_keywords(test_campaign, kw):
    """Drop the tests having at least one keyword listed in `kw`.

    A false `kw` leaves the campaign untouched.
    """
    if not kw:
        return

    def is_excluded(test):
        return any(keyword in test.keywords for keyword in kw)

    for ts in test_campaign:
        ts.tests = [t for t in ts.tests if not is_excluded(t)]
362
363
def remove_empty_testsets(test_campaign):
    """Drop from the campaign the test sets that no longer contain any test."""
    non_empty = []
    for testset in test_campaign.campaign:
        if testset.tests:
            non_empty.append(testset)
    test_campaign.campaign = non_empty
366
367
368#### RUN CAMPAIGN #####
369
def run_campaign(test_campaign, get_interactive_session, verb=3, ignore_globals=None):
    """Run every test of the campaign and return the number of failed tests.

    `get_interactive_session(code, ignore_globals=...)` must return an
    (output, result) pair. A test passes when its result is None or true.
    Each test gets its output, res and result attributes updated, and the
    campaign receives the passed/failed counters. Progress is reported on
    stderr according to `verb`.
    """
    counters = {"passed": 0, "failed": 0}
    if test_campaign.preexec:
        session = get_interactive_session(test_campaign.preexec.strip(), ignore_globals=ignore_globals)
        test_campaign.preexec_output = session[0]

    for testset in test_campaign:
        for t in testset:
            t.output, res = get_interactive_session(t.test.strip(), ignore_globals=ignore_globals)
            try:
                # Evaluating the truth value of an arbitrary result may itself raise.
                succeeded = bool(res is None or res)
            except Exception:
                succeeded = False
                t.output += "UTscapy: Error during result interpretation:\n"
                t.output += "".join(traceback.format_exception(*sys.exc_info()))
            t.res = succeeded
            t.result = "passed" if succeeded else "failed"
            counters[t.result] += 1
            t.decode()
            if verb > 1:
                print("%(result)6s %(crc)s %(name)s" % t, file=sys.stderr)

    passed = counters["passed"]
    failed = counters["failed"]
    test_campaign.passed = passed
    test_campaign.failed = failed
    if verb:
        print("Campaign CRC=%(crc)s  SHA=%(sha)s" % test_campaign, file=sys.stderr)
        print("PASSED=%i FAILED=%i" % (passed, failed), file=sys.stderr)
    return failed
402
403
404#### INFO LINES ####
405
def info_line(test_campaign):
    """Return a one-line text note with the run date and, if known, the campaign file."""
    source = test_campaign.filename
    if source is not None:
        return "Run %s from [%s] by UTscapy" % (time.ctime(), source)
    return "Run %s by UTscapy" % time.ctime()
412
def html_info_line(test_campaign):
    """Return the HTML variant of the run note, linking to the UTscapy page."""
    source = test_campaign.filename
    if source is not None:
        return """Run %s from [%s] by <a href="http://www.secdev.org/projects/UTscapy/">UTscapy</a><br>""" % (time.ctime(), source)
    return """Run %s by <a href="http://www.secdev.org/projects/UTscapy/">UTscapy</a><br>""" % time.ctime()
419
420
421#### CAMPAIGN TO something ####
422
def campaign_to_TEXT(test_campaign):
    """Render the campaign results as a plain text report.

    Only tests with a true `expand` attribute are listed, and a test set
    is shown only if at least one of its tests is.
    """
    chunks = [
        "%(title)s\n" % test_campaign,
        "-- " + info_line(test_campaign) + "\n\n",
        "Passed=%(passed)i\nFailed=%(failed)i\n\n%(headcomments)s\n" % test_campaign,
    ]

    for testset in test_campaign:
        if not any(t.expand for t in testset):
            continue
        chunks.append("######\n## %(name)s\n######\n%(comments)s\n\n" % testset)
        for t in testset:
            if t.expand:
                chunks.append("###(%(num)03i)=[%(result)s] %(name)s\n%(comments)s\n%(output)s\n\n" % t)

    return "".join(chunks)
436
def campaign_to_ANSI(test_campaign):
    """Render the campaign results as a text report for ANSI sessions.

    The layout is the same as the plain text report: only tests with a
    true `expand` attribute are listed, under their test set header.
    """
    header = "%(title)s\n" % test_campaign
    header += "-- " + info_line(test_campaign) + "\n\n"
    header += "Passed=%(passed)i\nFailed=%(failed)i\n\n%(headcomments)s\n" % test_campaign
    report = [header]

    for testset in test_campaign:
        shown = any(t.expand for t in testset)
        if shown:
            report.append("######\n## %(name)s\n######\n%(comments)s\n\n" % testset)
            report.extend(
                "###(%(num)03i)=[%(result)s] %(name)s\n%(comments)s\n%(output)s\n\n" % t
                for t in testset if t.expand
            )

    return "".join(report)
450
def campaign_to_xUNIT(test_campaign):
    """Render the campaign results as an xUnit-style XML document.

    One <testcase> is emitted per test, named after its test set and test;
    failed tests carry their output in an <error> CDATA section.
    """
    from xml.sax.saxutils import escape

    def attribute(value):
        # The "string_escape" codec only exists on Python 2. Escape the XML
        # special characters instead, and keep replacing double quotes by
        # spaces so the value cannot terminate the attribute.
        return escape(value).replace('"', ' ')

    parts = ['<?xml version="1.0" encoding="UTF-8" ?>\n<testsuite>\n']
    for testset in test_campaign:
        classname = attribute(testset.name)
        for t in testset:
            parts.append(' <testcase classname="%s"\n' % classname)
            parts.append('           name="%s"\n' % attribute(t.name))
            parts.append('           duration="0">\n')
            if not t.res:
                # "]]>" would close the CDATA section early: split it.
                safe_output = t.output.replace("]]>", "]]]]><![CDATA[>")
                parts.append('<error><![CDATA[%s]]></error>\n' % safe_output)
            parts.append("</testcase>\n")
    parts.append('</testsuite>')
    return "".join(parts)
463
464
def campaign_to_HTML(test_campaign):
    """Render the campaign results as an HTML fragment.

    Each test becomes a list item with "+"/"-" buttons toggling its detail
    span; the detail is initially visible only when t.expand is 2.
    """
    # Button pair when the detail is initially shown ("-" visible) ...
    expanded_buttons = """
<span id="tst%(num)i+" class="button%(result)s" onClick="show('tst%(num)i')" style="POSITION: absolute; VISIBILITY: hidden;">+%(num)03i+</span>
<span id="tst%(num)i-" class="button%(result)s" onClick="hide('tst%(num)i')">-%(num)03i-</span>
"""
    # ... and when it is initially hidden ("+" visible).
    collapsed_buttons = """
<span id="tst%(num)i+" class="button%(result)s" onClick="show('tst%(num)i')">+%(num)03i+</span>
<span id="tst%(num)i-" class="button%(result)s" onClick="hide('tst%(num)i')" style="POSITION: absolute; VISIBILITY: hidden;">-%(num)03i-</span>
"""

    html = ["""
<h1>%(title)s</h1>

<p>
""" % test_campaign]

    if test_campaign.crc is not None and test_campaign.sha is not None:
        html.append("CRC=<span class=crc>%(crc)s</span> SHA=<span class=crc>%(sha)s</span><br>" % test_campaign)
    html.append("<small><em>"+html_info_line(test_campaign)+"</em></small>")
    html.append(test_campaign.headcomments +  "\n<p>PASSED=%(passed)i FAILED=%(failed)i<p>\n\n" % test_campaign)

    for testset in test_campaign:
        html.append("<h2>" % testset)
        if testset.crc is not None:
            html.append("<span class=crc>%(crc)s</span> " % testset)
        html.append("%(name)s</h2>\n%(comments)s\n<ul>\n" % testset)
        for t in testset:
            html.append("""<li class=%(result)s id="tst%(num)il">\n""" % t)
            buttons = expanded_buttons if t.expand == 2 else collapsed_buttons
            html.append(buttons % t)
            if t.crc is not None:
                html.append("<span class=crc>%(crc)s</span>\n" % t)
            html.append("""%(name)s\n<span class="comment %(result)s" id="tst%(num)i" """ % t)
            if t.expand < 2:
                html.append(""" style="POSITION: absolute; VISIBILITY: hidden;" """)
            html.append("""><br>%(comments)s
<pre>
%(output)s</pre></span>
""" % t)
        html.append("\n</ul>\n\n")
    return "".join(html)
505
def pack_html_campaigns(runned_campaigns, data, local=0, title=None):
    """Wrap the HTML fragments `data` into a complete HTML page.

    A navigation button is added for every test of `runned_campaigns`.
    With a true `local`, the embedded JS and CSS files are written next to
    the output file of the last campaign and referenced by file name;
    otherwise their URLs are referenced. Note that the local mode needs at
    least one campaign, since it reads the output file from the last one.
    """
    page = ["""
<html>
<head>
<title>%(title)s</title>
<h1>UTScapy tests</h1>

<span class=button onClick="hide_all('tst')">Shrink All</span>
<span class=button onClick="show_all('tst')">Expand All</span>
<span class=button onClick="show_passed('tst')">Expand Passed</span>
<span class=button onClick="show_failed('tst')">Expand Failed</span>

<p>
"""]
    for test_campaign in runned_campaigns:
        page.extend(
            """<span class=button%(result)s onClick="goto_id('tst%(num)il')">%(num)03i</span>\n""" % t
            for ts in test_campaign for t in ts
        )

    page.append("""</p>\n\n
<link rel="stylesheet" href="%(UTscapy_css)s" type="text/css">
<script language="JavaScript" src="%(UTscapy_js)s" type="text/javascript"></script>
</head>
<body>
%(data)s
</body></html>
""")

    values = {'data': data, 'title': title if title else "UTScapy tests"}
    if not local:
        values.update(External_Files.get_URL_dict())
    else:
        # `test_campaign` is the last campaign seen by the loop above.
        target_dir = os.path.dirname(test_campaign.output_file.name)
        External_Files.UTscapy_js.write(target_dir)
        External_Files.UTscapy_css.write(target_dir)
        values.update(External_Files.get_local_dict())

    return "".join(page) % values
543
def campaign_to_LATEX(test_campaign):
    """Render the campaign results as a complete LaTeX document.

    One chapter is produced per test set and one section per test having a
    true `expand` attribute.
    """
    # Fill the preamble in a single formatting pass. Formatting it a second
    # time to insert the date would misinterpret any "%" brought in by the
    # title or the head comments.
    fields = dict((key, test_campaign[key])
                  for key in ("title", "passed", "failed", "headcomments"))
    fields["info"] = info_line(test_campaign)
    chunks = [r"""\documentclass{report}
\usepackage{alltt}
\usepackage{xcolor}
\usepackage{a4wide}
\usepackage{hyperref}

\title{%(title)s}
\date{%(info)s}

\begin{document}
\maketitle
\tableofcontents

\begin{description}
\item[Passed:] %(passed)i
\item[Failed:] %(failed)i
\end{description}

%(headcomments)s

""" % fields]

    for testset in test_campaign:
        chunks.append("\\chapter{%(name)s}\n\n%(comments)s\n\n" % testset)
        for t in testset:
            if t.expand:
                chunks.append(r"""\section{%(name)s}

[%(num)03i] [%(result)s]

%(comments)s
\begin{alltt}
%(output)s
\end{alltt}

""" % t)

    chunks.append("\\end{document}\n")
    return "".join(chunks)
585
586
587
588#### USAGE ####
589
590def usage():
591    print("""Usage: UTscapy [-m module] [-f {text|ansi|HTML|LaTeX}] [-o output_file]
592               [-t testfile] [-T testfile] [-k keywords [-k ...]] [-K keywords [-K ...]]
593               [-l] [-d|-D] [-F] [-q[q]] [-P preexecute_python_code]
594               [-s /path/to/scapy] [-c configfile]
595-t\t\t: provide test files (can be used many times)
596-T\t\t: if -t is used with *, remove a specific file (can be used many times)
597-l\t\t: generate local files
598-F\t\t: expand only failed tests
599-d\t\t: dump campaign
600-D\t\t: dump campaign and stop
601-C\t\t: don't calculate CRC and SHA
602-s\t\t: path to scapy.py
603-c\t\t: load a .utsc config file
604-q\t\t: quiet mode
605-qq\t\t: [silent mode]
606-n <testnum>\t: only tests whose numbers are given (eg. 1,3-7,12)
607-m <module>\t: additional module to put in the namespace
608-k <kw1>,<kw2>,...\t: include only tests with one of those keywords (can be used many times)
609-K <kw1>,<kw2>,...\t: remove tests with one of those keywords (can be used many times)
610-P <preexecute_python_code>
611""", file=sys.stderr)
612    raise SystemExit
613
614
615#### MAIN ####
616
def execute_campaign(TESTFILE, OUTPUTFILE, PREEXEC, NUM, KW_OK, KW_KO, DUMP,
                     FORMAT, VERB, ONLYFAILED, CRC, autorun_func, pos_begin=0, ignore_globals=None):
    """Parse, filter, run and render one campaign file.

    TESTFILE is the open campaign file and OUTPUTFILE the report file
    (recorded on the campaign). PREEXEC is code run before the tests.
    NUM, KW_OK and KW_KO select tests by number and by keywords (KW_OK and
    KW_KO are lists of keyword lists). A true DUMP dumps the campaign, and
    exits if above 1. FORMAT is a Format constant selecting both the
    session runner in `autorun_func` and the report renderer. ONLYFAILED
    shrinks passed tests, CRC enables the digests, `pos_begin` is the first
    test number used by HTML reports and `ignore_globals` is handed to the
    session runner.

    Returns (report, True if no test failed, the TestCampaign).
    """
    # Parse test file
    test_campaign = parse_campaign_file(TESTFILE)

    # Report parameters
    if PREEXEC:
        test_campaign.preexec = PREEXEC

    # Compute campaign CRC and SHA
    if CRC:
        compute_campaign_digests(test_campaign)

    # Filter out unwanted tests
    filter_tests_on_numbers(test_campaign, NUM)
    for k in KW_OK:
        filter_tests_keep_on_keywords(test_campaign, k)
    for k in KW_KO:
        filter_tests_remove_on_keywords(test_campaign, k)

    remove_empty_testsets(test_campaign)


    # Dump campaign
    if DUMP:
        dump_campaign(test_campaign)
        if DUMP > 1:
            sys.exit()

    # Run tests
    test_campaign.output_file = OUTPUTFILE
    # Forward the caller's ignore_globals instead of discarding it.
    result = run_campaign(test_campaign, autorun_func[FORMAT], verb=VERB, ignore_globals=ignore_globals)

    # Shrink passed
    if ONLYFAILED:
        for t in test_campaign.all_tests():
            if t:
                t.expand = 0
            else:
                t.expand = 2

    # Generate report
    if FORMAT == Format.TEXT:
        output = campaign_to_TEXT(test_campaign)
    elif FORMAT == Format.ANSI:
        output = campaign_to_ANSI(test_campaign)
    elif FORMAT == Format.HTML:
        # HTML ids must be unique across all campaigns packed in one page.
        test_campaign.startNum(pos_begin)
        output = campaign_to_HTML(test_campaign)
    elif FORMAT == Format.LATEX:
        output = campaign_to_LATEX(test_campaign)
    elif FORMAT == Format.XUNIT:
        output = campaign_to_xUNIT(test_campaign)

    return output, (result == 0), test_campaign
673
def resolve_testfiles(TESTFILES):
    """Expand in place the entries of TESTFILES containing "*" and return the list.

    Plain entries keep their relative order; the files matched by the
    patterns are appended after them.
    """
    plain, matched = [], []
    for entry in TESTFILES:
        if "*" in entry:
            matched.extend(glob.glob(entry))
        else:
            plain.append(entry)
    # Slice assignment keeps the caller's list object.
    TESTFILES[:] = plain + matched
    return TESTFILES
680
def main(argv):
    """Command line entry point: run the selected campaigns and write the report.

    `argv` is the list of command line arguments without the program name.
    Returns 0 if every executed campaign passed, 1 otherwise. Invalid
    arguments are reported on stderr and end with SystemExit.
    """
    ignore_globals = list(six.moves.builtins.__dict__.keys())

    # Parse arguments

    FORMAT = Format.ANSI
    OUTPUTFILE = sys.stdout
    LOCAL = 0
    NUM = None
    KW_OK = []
    KW_KO = []
    DUMP = 0
    CRC = True
    ONLYFAILED = False
    VERB = 3
    GLOB_PREEXEC = ""
    PREEXEC_DICT = {}
    SCAPY = "scapy"
    MODULES = []
    TESTFILES = []
    try:
        opts = getopt.getopt(argv, "o:t:T:c:f:hln:m:k:K:DdCFqP:s:")
        for opt,optarg in opts[0]:
            if opt == "-h":
                usage()
            elif opt == "-F":
                ONLYFAILED = True
            elif opt == "-q":
                VERB -= 1
            elif opt == "-D":
                DUMP = 2
            elif opt == "-d":
                DUMP = 1
            elif opt == "-C":
                CRC = False
            elif opt == "-s":
                SCAPY = optarg
            elif opt == "-P":
                GLOB_PREEXEC += "\n"+optarg
            elif opt == "-f":
                try:
                    FORMAT = Format.from_string(optarg)
                except KeyError as msg:
                    raise getopt.GetoptError("Unknown output format %s" % msg)
            elif opt == "-t":
                TESTFILES.append(optarg)
                TESTFILES = resolve_testfiles(TESTFILES)
            elif opt == "-T":
                try:
                    TESTFILES.remove(optarg)
                except ValueError:
                    # Report a clean error instead of an uncaught traceback.
                    raise getopt.GetoptError("Test file [%s] is not in the selection" % optarg)
            elif opt == "-c":
                data = parse_config_file(optarg, VERB)
                ONLYFAILED = data.onlyfailed
                VERB = data.verb
                DUMP = data.dump
                CRC = data.crc
                SCAPY = data.scapy
                PREEXEC_DICT = data.preexec
                GLOB_PREEXEC = data.global_preexec
                if isinstance(data.outfile, six.string_types):
                    # The JSON config can only hold a path: open it here.
                    OUTPUTFILE = open(data.outfile, "wb")
                elif data.outfile is not None:
                    OUTPUTFILE = data.outfile
                TESTFILES = data.testfiles
                LOCAL = 1 if data.local else 0
                NUM = data.num
                MODULES = data.modules
                KW_OK = [data.kw_ok]
                KW_KO = [data.kw_ko]
                try:
                    FORMAT = Format.from_string(data.format)
                except KeyError as msg:
                    raise getopt.GetoptError("Unknown output format %s" % msg)
                TESTFILES = resolve_testfiles(TESTFILES)
            elif opt == "-o":
                OUTPUTFILE = open(optarg, "wb")
            elif opt == "-l":
                LOCAL = 1
            elif opt == "-n":
                NUM = []
                for v in (x.strip() for x in optarg.split(",")):
                    try:
                        NUM.append(int(v))
                    except ValueError:
                        # Not a single number: expect a "first-last" range.
                        try:
                            v1, v2 = [int(e) for e in v.split('-', 1)]
                        except ValueError:
                            raise getopt.GetoptError("Invalid test number or range [%s]" % v)
                        NUM.extend(range(v1, v2 + 1))
            elif opt == "-m":
                MODULES.append(optarg)
            elif opt == "-k":
                KW_OK.append(optarg.split(","))
            elif opt == "-K":
                KW_KO.append(optarg.split(","))

        if VERB > 2:
            print("### Booting scapy...", file=sys.stderr)
        try:
            from scapy import all as scapy
        except ImportError as e:
            raise getopt.GetoptError("cannot import [%s]: %s" % (SCAPY,e))

        for m in MODULES:
            try:
                mod = import_module(m)
                # Make the names of the extra module visible to the tests.
                six.moves.builtins.__dict__.update(mod.__dict__)
            except ImportError as e:
                raise getopt.GetoptError("cannot import [%s]: %s" % (m,e))

    except getopt.GetoptError as msg:
        print("ERROR:",msg, file=sys.stderr)
        raise SystemExit

    autorun_func = {
        Format.TEXT: scapy.autorun_get_text_interactive_session,
        Format.ANSI: scapy.autorun_get_ansi_interactive_session,
        Format.HTML: scapy.autorun_get_html_interactive_session,
        Format.LATEX: scapy.autorun_get_latex_interactive_session,
        Format.XUNIT: scapy.autorun_get_text_interactive_session,
        }

    if VERB > 2:
        print("### Starting tests...", file=sys.stderr)

    glob_output = ""
    glob_result = 0
    glob_title = None

    UNIQUE = len(TESTFILES) == 1

    # Resolve tags and asterisks
    for prex in six.iterkeys(copy.copy(PREEXEC_DICT)):
        if "*" in prex:
            pycode = PREEXEC_DICT[prex]
            del PREEXEC_DICT[prex]
            for gl in glob.iglob(prex):
                _pycode = pycode.replace("%name%", os.path.splitext(os.path.split(gl)[1])[0])
                PREEXEC_DICT[gl] = _pycode

    pos_begin = 0

    runned_campaigns = []
    # Execute all files
    for TESTFILE in TESTFILES:
        if VERB > 2:
            print("### Loading:", TESTFILE, file=sys.stderr)
        PREEXEC = PREEXEC_DICT[TESTFILE] if TESTFILE in PREEXEC_DICT else GLOB_PREEXEC
        # Close each campaign file as soon as its campaign has been executed.
        with open(TESTFILE) as campaign_file:
            output, result, campaign = execute_campaign(campaign_file, OUTPUTFILE,
                                              PREEXEC, NUM, KW_OK, KW_KO,
                                              DUMP, FORMAT, VERB, ONLYFAILED,
                                              CRC, autorun_func, pos_begin, ignore_globals)
        runned_campaigns.append(campaign)
        pos_begin = campaign.end_pos
        if UNIQUE:
            glob_title = campaign.title
        glob_output += output
        if not result:
            # Stop at the first failing campaign.
            glob_result = 1
            break

    if VERB > 2:
        print("### Writing output...", file=sys.stderr)
    # Concatenate outputs
    if FORMAT == Format.HTML:
        glob_output = pack_html_campaigns(runned_campaigns, glob_output, LOCAL, glob_title)

    # Files opened in binary mode need bytes; text streams get the string.
    binary_output = 'b' in getattr(OUTPUTFILE, "mode", "")
    OUTPUTFILE.write(glob_output.encode("utf8", "ignore")
                     if binary_output else glob_output)
    if OUTPUTFILE is sys.stdout:
        # Leave the standard output usable by the caller.
        OUTPUTFILE.flush()
    else:
        OUTPUTFILE.close()

    # Return state
    return glob_result
848
# Script entry point: run main() on the command line arguments (program name
# excluded) and use its return value as the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))
851