1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""
5tdc.py - Linux tc (Traffic Control) unit test driver
6
7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8"""
9
10import re
11import os
12import sys
13import argparse
14import importlib
15import json
16import subprocess
17import time
18import traceback
19from collections import OrderedDict
20from string import Template
21
22from tdc_config import *
23from tdc_helper import *
24
25import TdcPlugin
26
27
28class PluginMgrTestFail(Exception):
29    def __init__(self, stage, output, message):
30        self.stage = stage
31        self.output = output
32        self.message = message
33
34class PluginMgr:
35    def __init__(self, argparser):
36        super().__init__()
37        self.plugins = {}
38        self.plugin_instances = []
39        self.args = []
40        self.argparser = argparser
41
42        # TODO, put plugins in order
43        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
44        for dirpath, dirnames, filenames in os.walk(plugindir):
45            for fn in filenames:
46                if (fn.endswith('.py') and
47                    not fn == '__init__.py' and
48                    not fn.startswith('#') and
49                    not fn.startswith('.#')):
50                    mn = fn[0:-3]
51                    foo = importlib.import_module('plugins.' + mn)
52                    self.plugins[mn] = foo
53                    self.plugin_instances.append(foo.SubPlugin())
54
55    def call_pre_suite(self, testcount, testidlist):
56        for pgn_inst in self.plugin_instances:
57            pgn_inst.pre_suite(testcount, testidlist)
58
59    def call_post_suite(self, index):
60        for pgn_inst in reversed(self.plugin_instances):
61            pgn_inst.post_suite(index)
62
63    def call_pre_case(self, test_ordinal, testid):
64        for pgn_inst in self.plugin_instances:
65            try:
66                pgn_inst.pre_case(test_ordinal, testid)
67            except Exception as ee:
68                print('exception {} in call to pre_case for {} plugin'.
69                      format(ee, pgn_inst.__class__))
70                print('test_ordinal is {}'.format(test_ordinal))
71                print('testid is {}'.format(testid))
72                raise
73
74    def call_post_case(self):
75        for pgn_inst in reversed(self.plugin_instances):
76            pgn_inst.post_case()
77
78    def call_pre_execute(self):
79        for pgn_inst in self.plugin_instances:
80            pgn_inst.pre_execute()
81
82    def call_post_execute(self):
83        for pgn_inst in reversed(self.plugin_instances):
84            pgn_inst.post_execute()
85
86    def call_add_args(self, parser):
87        for pgn_inst in self.plugin_instances:
88            parser = pgn_inst.add_args(parser)
89        return parser
90
91    def call_check_args(self, args, remaining):
92        for pgn_inst in self.plugin_instances:
93            pgn_inst.check_args(args, remaining)
94
95    def call_adjust_command(self, stage, command):
96        for pgn_inst in self.plugin_instances:
97            command = pgn_inst.adjust_command(stage, command)
98        return command
99
100    @staticmethod
101    def _make_argparser(args):
102        self.argparser = argparse.ArgumentParser(
103            description='Linux TC unit tests')
104
105
106def replace_keywords(cmd):
107    """
108    For a given executable command, substitute any known
109    variables contained within NAMES with the correct values
110    """
111    tcmd = Template(cmd)
112    subcmd = tcmd.safe_substitute(NAMES)
113    return subcmd
114
115
116def exec_cmd(args, pm, stage, command):
117    """
118    Perform any required modifications on an executable command, then run
119    it in a subprocess and return the results.
120    """
121    if len(command.strip()) == 0:
122        return None, None
123    if '$' in command:
124        command = replace_keywords(command)
125
126    command = pm.call_adjust_command(stage, command)
127    if args.verbose > 0:
128        print('command "{}"'.format(command))
129    proc = subprocess.Popen(command,
130        shell=True,
131        stdout=subprocess.PIPE,
132        stderr=subprocess.PIPE,
133        env=ENVIR)
134    (rawout, serr) = proc.communicate()
135
136    if proc.returncode != 0 and len(serr) > 0:
137        foutput = serr.decode("utf-8", errors="ignore")
138    else:
139        foutput = rawout.decode("utf-8", errors="ignore")
140
141    proc.stdout.close()
142    proc.stderr.close()
143    return proc, foutput
144
145
146def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
147    """
148    Execute the setup/teardown commands for a test case.
149    Optionally terminate test execution if the command fails.
150    """
151    if args.verbose > 0:
152        print('{}'.format(prefix))
153    for cmdinfo in cmdlist:
154        if isinstance(cmdinfo, list):
155            exit_codes = cmdinfo[1:]
156            cmd = cmdinfo[0]
157        else:
158            exit_codes = [0]
159            cmd = cmdinfo
160
161        if not cmd:
162            continue
163
164        (proc, foutput) = exec_cmd(args, pm, stage, cmd)
165
166        if proc and (proc.returncode not in exit_codes):
167            print('', file=sys.stderr)
168            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
169                  file=sys.stderr)
170            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
171                  file=sys.stderr)
172            print("returncode {}; expected {}".format(proc.returncode,
173                                                      exit_codes))
174            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
175            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
176            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
177            raise PluginMgrTestFail(
178                stage, output,
179                '"{}" did not complete successfully'.format(prefix))
180
181def run_one_test(pm, args, index, tidx):
182    global NAMES
183    result = True
184    tresult = ""
185    tap = ""
186    if args.verbose > 0:
187        print("\t====================\n=====> ", end="")
188    print("Test " + tidx["id"] + ": " + tidx["name"])
189
190    # populate NAMES with TESTID for this test
191    NAMES['TESTID'] = tidx['id']
192
193    pm.call_pre_case(index, tidx['id'])
194    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])
195
196    if (args.verbose > 0):
197        print('-----> execute stage')
198    pm.call_pre_execute()
199    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
200    if p:
201        exit_code = p.returncode
202    else:
203        exit_code = None
204
205    pm.call_post_execute()
206
207    if (exit_code is None or exit_code != int(tidx["expExitCode"])):
208        result = False
209        print("exit: {!r}".format(exit_code))
210        print("exit: {}".format(int(tidx["expExitCode"])))
211        #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
212        print(procout)
213    else:
214        if args.verbose > 0:
215            print('-----> verify stage')
216        match_pattern = re.compile(
217            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
218        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
219        if procout:
220            match_index = re.findall(match_pattern, procout)
221            if len(match_index) != int(tidx["matchCount"]):
222                result = False
223        elif int(tidx["matchCount"]) != 0:
224            result = False
225
226    if not result:
227        tresult += 'not '
228    tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
229    tap += tresult
230
231    if result == False:
232        if procout:
233            tap += procout
234        else:
235            tap += 'No output!\n'
236
237    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
238    pm.call_post_case()
239
240    index += 1
241
242    # remove TESTID from NAMES
243    del(NAMES['TESTID'])
244    return tap
245
246def test_runner(pm, args, filtered_tests):
247    """
248    Driver function for the unit tests.
249
250    Prints information about the tests being run, executes the setup and
251    teardown commands and the command under test itself. Also determines
252    success/failure based on the information in the test case and generates
253    TAP output accordingly.
254    """
255    testlist = filtered_tests
256    tcount = len(testlist)
257    index = 1
258    tap = ''
259    badtest = None
260    stage = None
261    emergency_exit = False
262    emergency_exit_message = ''
263
264    if args.notap:
265        if args.verbose:
266            tap = 'notap requested:  omitting test plan\n'
267    else:
268        tap = str(index) + ".." + str(tcount) + "\n"
269    try:
270        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
271    except Exception as ee:
272        ex_type, ex, ex_tb = sys.exc_info()
273        print('Exception {} {} (caught in pre_suite).'.
274              format(ex_type, ex))
275        # when the extra print statements are uncommented,
276        # the traceback does not appear between them
277        # (it appears way earlier in the tdc.py output)
278        # so don't bother ...
279        # print('--------------------(')
280        # print('traceback')
281        traceback.print_tb(ex_tb)
282        # print('--------------------)')
283        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
284        emergency_exit = True
285        stage = 'pre-SUITE'
286
287    if emergency_exit:
288        pm.call_post_suite(index)
289        return emergency_exit_message
290    if args.verbose > 1:
291        print('give test rig 2 seconds to stabilize')
292    time.sleep(2)
293    for tidx in testlist:
294        if "flower" in tidx["category"] and args.device == None:
295            if args.verbose > 1:
296                print('Not executing test {} {} because DEV2 not defined'.
297                      format(tidx['id'], tidx['name']))
298            continue
299        try:
300            badtest = tidx  # in case it goes bad
301            tap += run_one_test(pm, args, index, tidx)
302        except PluginMgrTestFail as pmtf:
303            ex_type, ex, ex_tb = sys.exc_info()
304            stage = pmtf.stage
305            message = pmtf.message
306            output = pmtf.output
307            print(message)
308            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
309                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
310            print('---------------')
311            print('traceback')
312            traceback.print_tb(ex_tb)
313            print('---------------')
314            if stage == 'teardown':
315                print('accumulated output for this test:')
316                if pmtf.output:
317                    print(pmtf.output)
318            print('---------------')
319            break
320        index += 1
321
322    # if we failed in setup or teardown,
323    # fill in the remaining tests with ok-skipped
324    count = index
325    if not args.notap:
326        tap += 'about to flush the tap output if tests need to be skipped\n'
327        if tcount + 1 != index:
328            for tidx in testlist[index - 1:]:
329                msg = 'skipped - previous {} failed'.format(stage)
330                tap += 'ok {} - {} # {} {} {}\n'.format(
331                    count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
332                count += 1
333
334        tap += 'done flushing skipped test tap output\n'
335
336    if args.pause:
337        print('Want to pause\nPress enter to continue ...')
338        if input(sys.stdin):
339            print('got something on stdin')
340
341    pm.call_post_suite(index)
342
343    return tap
344
345def has_blank_ids(idlist):
346    """
347    Search the list for empty ID fields and return true/false accordingly.
348    """
349    return not(all(k for k in idlist))
350
351
352def load_from_file(filename):
353    """
354    Open the JSON file containing the test cases and return them
355    as list of ordered dictionary objects.
356    """
357    try:
358        with open(filename) as test_data:
359            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
360    except json.JSONDecodeError as jde:
361        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
362        testlist = list()
363    else:
364        idlist = get_id_list(testlist)
365        if (has_blank_ids(idlist)):
366            for k in testlist:
367                k['filename'] = filename
368    return testlist
369
370
371def args_parse():
372    """
373    Create the argument parser.
374    """
375    parser = argparse.ArgumentParser(description='Linux TC unit tests')
376    return parser
377
378
379def set_args(parser):
380    """
381    Set the command line arguments for tdc.
382    """
383    parser.add_argument(
384        '-p', '--path', type=str,
385        help='The full path to the tc executable to use')
386    sg = parser.add_argument_group(
387        'selection', 'select which test cases: ' +
388        'files plus directories; filtered by categories plus testids')
389    ag = parser.add_argument_group(
390        'action', 'select action to perform on selected test cases')
391
392    sg.add_argument(
393        '-D', '--directory', nargs='+', metavar='DIR',
394        help='Collect tests from the specified directory(ies) ' +
395        '(default [tc-tests])')
396    sg.add_argument(
397        '-f', '--file', nargs='+', metavar='FILE',
398        help='Run tests from the specified file(s)')
399    sg.add_argument(
400        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
401        help='Run tests only from the specified category/ies, ' +
402        'or if no category/ies is/are specified, list known categories.')
403    sg.add_argument(
404        '-e', '--execute', nargs='+', metavar='ID',
405        help='Execute the specified test cases with specified IDs')
406    ag.add_argument(
407        '-l', '--list', action='store_true',
408        help='List all test cases, or those only within the specified category')
409    ag.add_argument(
410        '-s', '--show', action='store_true', dest='showID',
411        help='Display the selected test cases')
412    ag.add_argument(
413        '-i', '--id', action='store_true', dest='gen_id',
414        help='Generate ID numbers for new test cases')
415    parser.add_argument(
416        '-v', '--verbose', action='count', default=0,
417        help='Show the commands that are being run')
418    parser.add_argument(
419        '-N', '--notap', action='store_true',
420        help='Suppress tap results for command under test')
421    parser.add_argument('-d', '--device',
422                        help='Execute the test case in flower category')
423    parser.add_argument(
424        '-P', '--pause', action='store_true',
425        help='Pause execution just before post-suite stage')
426    return parser
427
428
429def check_default_settings(args, remaining, pm):
430    """
431    Process any arguments overriding the default settings,
432    and ensure the settings are correct.
433    """
434    # Allow for overriding specific settings
435    global NAMES
436
437    if args.path != None:
438        NAMES['TC'] = args.path
439    if args.device != None:
440        NAMES['DEV2'] = args.device
441    if not os.path.isfile(NAMES['TC']):
442        print("The specified tc path " + NAMES['TC'] + " does not exist.")
443        exit(1)
444
445    pm.call_check_args(args, remaining)
446
447
448def get_id_list(alltests):
449    """
450    Generate a list of all IDs in the test cases.
451    """
452    return [x["id"] for x in alltests]
453
454
455def check_case_id(alltests):
456    """
457    Check for duplicate test case IDs.
458    """
459    idl = get_id_list(alltests)
460    return [x for x in idl if idl.count(x) > 1]
461
462
463def does_id_exist(alltests, newid):
464    """
465    Check if a given ID already exists in the list of test cases.
466    """
467    idl = get_id_list(alltests)
468    return (any(newid == x for x in idl))
469
470
471def generate_case_ids(alltests):
472    """
473    If a test case has a blank ID field, generate a random hex ID for it
474    and then write the test cases back to disk.
475    """
476    import random
477    for c in alltests:
478        if (c["id"] == ""):
479            while True:
480                newid = str('{:04x}'.format(random.randrange(16**4)))
481                if (does_id_exist(alltests, newid)):
482                    continue
483                else:
484                    c['id'] = newid
485                    break
486
487    ufilename = []
488    for c in alltests:
489        if ('filename' in c):
490            ufilename.append(c['filename'])
491    ufilename = get_unique_item(ufilename)
492    for f in ufilename:
493        testlist = []
494        for t in alltests:
495            if 'filename' in t:
496                if t['filename'] == f:
497                    del t['filename']
498                    testlist.append(t)
499        outfile = open(f, "w")
500        json.dump(testlist, outfile, indent=4)
501        outfile.write("\n")
502        outfile.close()
503
504def filter_tests_by_id(args, testlist):
505    '''
506    Remove tests from testlist that are not in the named id list.
507    If id list is empty, return empty list.
508    '''
509    newlist = list()
510    if testlist and args.execute:
511        target_ids = args.execute
512
513        if isinstance(target_ids, list) and (len(target_ids) > 0):
514            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
515    return newlist
516
517def filter_tests_by_category(args, testlist):
518    '''
519    Remove tests from testlist that are not in a named category.
520    '''
521    answer = list()
522    if args.category and testlist:
523        test_ids = list()
524        for catg in set(args.category):
525            if catg == '+c':
526                continue
527            print('considering category {}'.format(catg))
528            for tc in testlist:
529                if catg in tc['category'] and tc['id'] not in test_ids:
530                    answer.append(tc)
531                    test_ids.append(tc['id'])
532
533    return answer
534
535def get_test_cases(args):
536    """
537    If a test case file is specified, retrieve tests from that file.
538    Otherwise, glob for all json files in subdirectories and load from
539    each one.
540    Also, if requested, filter by category, and add tests matching
541    certain ids.
542    """
543    import fnmatch
544
545    flist = []
546    testdirs = ['tc-tests']
547
548    if args.file:
549        # at least one file was specified - remove the default directory
550        testdirs = []
551
552        for ff in args.file:
553            if not os.path.isfile(ff):
554                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
555            else:
556                flist.append(os.path.abspath(ff))
557
558    if args.directory:
559        testdirs = args.directory
560
561    for testdir in testdirs:
562        for root, dirnames, filenames in os.walk(testdir):
563            for filename in fnmatch.filter(filenames, '*.json'):
564                candidate = os.path.abspath(os.path.join(root, filename))
565                if candidate not in testdirs:
566                    flist.append(candidate)
567
568    alltestcases = list()
569    for casefile in flist:
570        alltestcases = alltestcases + (load_from_file(casefile))
571
572    allcatlist = get_test_categories(alltestcases)
573    allidlist = get_id_list(alltestcases)
574
575    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
576    idtestcases = filter_tests_by_id(args, alltestcases)
577    cattestcases = filter_tests_by_category(args, alltestcases)
578
579    cat_ids = [x['id'] for x in cattestcases]
580    if args.execute:
581        if args.category:
582            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
583        else:
584            alltestcases = idtestcases
585    else:
586        if cat_ids:
587            alltestcases = cattestcases
588        else:
589            # just accept the existing value of alltestcases,
590            # which has been filtered by file/directory
591            pass
592
593    return allcatlist, allidlist, testcases_by_cats, alltestcases
594
595
596def set_operation_mode(pm, args):
597    """
598    Load the test case data and process remaining arguments to determine
599    what the script should do for this run, and call the appropriate
600    function.
601    """
602    ucat, idlist, testcases, alltests = get_test_cases(args)
603
604    if args.gen_id:
605        if (has_blank_ids(idlist)):
606            alltests = generate_case_ids(alltests)
607        else:
608            print("No empty ID fields found in test files.")
609        exit(0)
610
611    duplicate_ids = check_case_id(alltests)
612    if (len(duplicate_ids) > 0):
613        print("The following test case IDs are not unique:")
614        print(str(set(duplicate_ids)))
615        print("Please correct them before continuing.")
616        exit(1)
617
618    if args.showID:
619        for atest in alltests:
620            print_test_case(atest)
621        exit(0)
622
623    if isinstance(args.category, list) and (len(args.category) == 0):
624        print("Available categories:")
625        print_sll(ucat)
626        exit(0)
627
628    if args.list:
629        if args.list:
630            list_test_cases(alltests)
631            exit(0)
632
633    if len(alltests):
634        catresults = test_runner(pm, args, alltests)
635    else:
636        catresults = 'No tests found\n'
637    if args.notap:
638        print('Tap output suppression requested\n')
639    else:
640        print('All test results: \n\n{}'.format(catresults))
641
642def main():
643    """
644    Start of execution; set up argument parser and get the arguments,
645    and start operations.
646    """
647    parser = args_parse()
648    parser = set_args(parser)
649    pm = PluginMgr(parser)
650    parser = pm.call_add_args(parser)
651    (args, remaining) = parser.parse_known_args()
652    args.NAMES = NAMES
653    check_default_settings(args, remaining, pm)
654    if args.verbose > 2:
655        print('args is {}'.format(args))
656
657    set_operation_mode(pm, args)
658
659    exit(0)
660
661
662if __name__ == "__main__":
663    main()
664