1#!/usr/bin/env python3
2# SPDX-License-Identifier: GPL-2.0
3
4"""
5tdc.py - Linux tc (Traffic Control) unit test driver
6
7Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8"""
9
10import re
11import os
12import sys
13import argparse
14import json
15import subprocess
16from collections import OrderedDict
17from string import Template
18
19from tdc_config import *
20from tdc_helper import *
21
22
# When true, exec_cmd() prefixes commands with 'ip netns exec $NS' (unless
# the caller passes nsonly=False), and ns_create()/ns_destroy() create and
# delete that namespace. When false, those two functions do nothing.
USE_NS = True
24
25
def replace_keywords(cmd):
    """
    Expand the $-style placeholders found in a command string using the
    values stored in NAMES. Placeholders with no entry in NAMES are left
    in the string untouched.
    """
    return Template(cmd).safe_substitute(NAMES)
34
35
def exec_cmd(command, nsonly=True):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    If nsonly is true and namespaces are enabled (USE_NS), the command is
    prefixed with 'ip netns exec $NS ' so that it runs inside the test
    namespace. Any command containing '$' is then passed through
    replace_keywords().

    Returns a (proc, output) tuple. proc is the finished subprocess.Popen
    object (callers read proc.returncode). output is the decoded stderr
    when the command exited non-zero and wrote something to stderr,
    otherwise the decoded stdout. A failing command does not raise.
    """
    if nsonly and USE_NS:
        command = 'ip netns exec $NS ' + command

    if '$' in command:
        command = replace_keywords(command)

    # Commands are single command-line strings, so they are handed to the
    # shell. The with-block closes both pipes once the process has finished.
    with subprocess.Popen(command,
                          shell=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
        (rawout, serr) = proc.communicate()

    if proc.returncode != 0 and len(serr) > 0:
        raw = serr
    else:
        raw = rawout

    # Tool output is not guaranteed to be valid UTF-8; substitute undecodable
    # bytes instead of aborting the whole test run with UnicodeDecodeError.
    foutput = raw.decode("utf-8", errors="replace")
    return proc, foutput
61
62
def prepare_env(cmdlist):
    """
    Execute the setup/teardown commands for a test case and abort the test
    run if one of them fails.

    Each entry of cmdlist is either a command string, which must exit with
    status 0, or a list whose first element is the command and whose
    remaining elements are the exit codes to accept. Empty commands are
    skipped.

    On an unexpected exit code the command and its output are printed, the
    test namespace is destroyed and the script exits with status 1.
    """
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            cmd = cmdinfo[0]
            # A list carrying only the command would accept no exit code at
            # all and always abort; fall back to requiring success.
            exit_codes = cmdinfo[1:] or [0]
        else:
            cmd = cmdinfo
            exit_codes = [0]

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(cmd)

        if proc.returncode not in exit_codes:
            # A bare `print` is a no-op expression in Python 3; call it so
            # the separating blank line is actually emitted.
            print()
            print("Could not execute:")
            print(cmd)
            print("\nError message:")
            print(foutput)
            print("\nAborting test run.")
            ns_destroy()
            sys.exit(1)
90
91
92def test_runner(filtered_tests, args):
93    """
94    Driver function for the unit tests.
95
96    Prints information about the tests being run, executes the setup and
97    teardown commands and the command under test itself. Also determines
98    success/failure based on the information in the test case and generates
99    TAP output accordingly.
100    """
101    testlist = filtered_tests
102    tcount = len(testlist)
103    index = 1
104    tap = str(index) + ".." + str(tcount) + "\n"
105
106    for tidx in testlist:
107        result = True
108        tresult = ""
109        if "flower" in tidx["category"] and args.device == None:
110            continue
111        print("Test " + tidx["id"] + ": " + tidx["name"])
112        prepare_env(tidx["setup"])
113        (p, procout) = exec_cmd(tidx["cmdUnderTest"])
114        exit_code = p.returncode
115
116        if (exit_code != int(tidx["expExitCode"])):
117            result = False
118            print("exit:", exit_code, int(tidx["expExitCode"]))
119            print(procout)
120        else:
121            match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
122            (p, procout) = exec_cmd(tidx["verifyCmd"])
123            match_index = re.findall(match_pattern, procout)
124            if len(match_index) != int(tidx["matchCount"]):
125                result = False
126
127        if result == True:
128            tresult += "ok "
129        else:
130            tresult += "not ok "
131        tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
132
133        if result == False:
134            tap += procout
135
136        prepare_env(tidx["teardown"])
137        index += 1
138
139    return tap
140
141
def ns_create():
    """
    Set up the test network namespace and its devices: add the namespace,
    create the veth pair, move $DEV1 and $DEV2 into the namespace and
    bring the links up. Does nothing when USE_NS is false.
    """
    if not USE_NS:
        return

    setup_cmds = (
        'ip netns add $NS',
        'ip link add $DEV0 type veth peer name $DEV1',
        'ip link set $DEV1 netns $NS',
        'ip link set $DEV0 up',
        'ip -n $NS link set $DEV1 up',
        'ip link set $DEV2 netns $NS',
        'ip -n $NS link set $DEV2 up',
    )
    # These run on the host side, so the namespace prefix is disabled.
    for step in setup_cmds:
        exec_cmd(step, False)
162
163
def ns_destroy():
    """
    Delete the network namespace used for testing. Does nothing when
    USE_NS is false.
    """
    if not USE_NS:
        return
    exec_cmd('ip netns delete $NS', False)
172
173
def has_blank_ids(idlist):
    """
    Return True if at least one entry of the ID list is empty (falsy),
    and False otherwise, including for an empty list.
    """
    for entry in idlist:
        if not entry:
            return True
    return False
179
180
def load_from_file(filename):
    """
    Read the test cases stored in a JSON file and return them as a list of
    ordered dictionaries.

    A file that cannot be decoded as JSON is reported and yields an empty
    list. When the file contains at least one blank test ID, every test
    case loaded from it is tagged with a 'filename' key holding the path
    it came from.
    """
    try:
        with open(filename) as json_file:
            cases = json.load(json_file, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE:  {}'.format(filename, jde))
        return list()

    if has_blank_ids(get_id_list(cases)):
        for case in cases:
            case['filename'] = filename
    return cases
198
199
def args_parse():
    """
    Build and return the (still option-less) argument parser for tdc.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
206
207
def set_args(parser):
    """
    Register the tdc command line options on the given parser and return
    that same parser.
    """
    # (flags, keyword arguments) for each option, in help-output order.
    option_table = (
        (('-p', '--path'),
         dict(type=str,
              help='The full path to the tc executable to use')),
        (('-c', '--category'),
         dict(type=str, nargs='?', const='+c',
              help='Run tests only from the specified category, or if no category is specified, list known categories.')),
        (('-f', '--file'),
         dict(type=str,
              help='Run tests from the specified file')),
        (('-l', '--list'),
         dict(type=str, nargs='?', const="++", metavar='CATEGORY',
              help='List all test cases, or those only within the specified category')),
        (('-s', '--show'),
         dict(type=str, nargs=1, metavar='ID', dest='showID',
              help='Display the test case with specified id')),
        (('-e', '--execute'),
         dict(type=str, nargs=1, metavar='ID',
              help='Execute the single test case with specified ID')),
        (('-i', '--id'),
         dict(action='store_true', dest='gen_id',
              help='Generate ID numbers for new test cases')),
        (('-d', '--device'),
         dict(help='Execute the test case in flower category')),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser
229
230
def check_default_settings(args):
    """
    Apply command line overrides to the default settings held in NAMES and
    verify that the resulting tc executable path refers to an existing
    file; otherwise print an error and exit with status 1.
    """
    global NAMES

    # Command line values take precedence over the configured defaults.
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device

    if os.path.isfile(NAMES['TC']):
        return
    print("The specified tc path " + NAMES['TC'] + " does not exist.")
    exit(1)
246
247
def get_id_list(alltests):
    """
    Collect the "id" field of every test case, in order, into a list.
    """
    ids = []
    for case in alltests:
        ids.append(case["id"])
    return ids
253
254
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns a list holding every ID that occurs more than once, one entry
    per occurrence and in the original order; the list is empty when all
    IDs are unique.
    """
    from collections import Counter

    idl = get_id_list(alltests)
    # Count each ID once up front instead of rescanning the whole list for
    # every element.
    occurrences = Counter(idl)
    return [x for x in idl if occurrences[x] > 1]
261
262
def does_id_exist(alltests, newid):
    """
    Tell whether any test case already carries the given ID.
    """
    for known_id in get_id_list(alltests):
        if newid == known_id:
            return True
    return False
269
270
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    Each blank ID is replaced by a random four-digit hex string that is not
    yet used by any test case. Every test case carrying a 'filename' key is
    then written back to that file as JSON; the key is removed from the
    test case first so that it does not end up in the file.

    Returns the same alltests list, updated in place.
    """
    import random
    for c in alltests:
        if c["id"] == "":
            while True:
                newid = '%04x' % random.randrange(16**4)
                if not does_id_exist(alltests, newid):
                    c['id'] = newid
                    break

    # Group the tagged test cases by source file in a single pass.
    by_file = {}
    for c in alltests:
        if 'filename' in c:
            by_file.setdefault(c.pop('filename'), []).append(c)

    for fname, testlist in by_file.items():
        # The with-block closes the file even if serialisation fails.
        with open(fname, "w") as outfile:
            json.dump(testlist, outfile, indent=4)
            # json.dump does not terminate the last line.
            outfile.write("\n")

    return alltests
302
303
def get_test_cases(args):
    """
    Return the test cases to work with. When a file was named on the
    command line only that file is loaded (the script exits with status 1
    if it does not exist); otherwise every *.json file found below the
    tc-tests directory is loaded.
    """
    import fnmatch

    if args.file is None:
        flist = []
        for root, dirnames, filenames in os.walk('tc-tests'):
            json_names = fnmatch.filter(filenames, '*.json')
            flist.extend(os.path.join(root, name) for name in json_names)
    else:
        if not os.path.isfile(args.file):
            print("The specified test case file " + args.file + " does not exist.")
            exit(1)
        flist = [args.file]

    alltests = list()
    for casefile in flist:
        alltests.extend(load_from_file(casefile))
    return alltests
325
326
def set_operation_mode(args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    Modes are checked in this order; each of the first four exits when done:
      - gen_id: generate IDs for test cases with a blank ID
      - showID: display one test case
      - category without a value: list the known categories
      - list: list all test cases, or those of one category
      - otherwise: run the tests of the requested category, or all tests
        (optionally restricted to IDs containing the --execute value)

    Duplicate test IDs abort the run (exit status 1) for every mode except
    gen_id. Running tests requires root privileges. The test selection is
    validated before the network namespace is created, and the namespace is
    destroyed again however the run ends.
    """
    alltests = get_test_cases(args)

    if args.gen_id:
        if has_blank_ids(get_id_list(alltests)):
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if duplicate_ids:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    ucat = get_test_categories(alltests)

    if args.showID:
        show_test_case_by_id(alltests, args.showID[0])
        sys.exit(0)

    target_id = args.execute[0] if args.execute else ""

    # '+c' is the argparse const stored when -c is given without a value.
    if args.category == '+c':
        print("Available categories:")
        print_sll(ucat)
        sys.exit(0)
    target_category = args.category or ""

    testcases = get_categorized_testlist(alltests, ucat)

    if args.list:
        # '++' is the argparse const stored when -l is given without a value.
        if args.list == "++":
            list_test_cases(alltests)
            sys.exit(0)
        if args.list not in ucat:
            print("Unknown category " + args.list)
            print("Available categories:")
            print_sll(ucat)
            sys.exit(1)
        list_test_cases(testcases[args.list])
        sys.exit(0)

    if os.geteuid() != 0:
        print("This script must be run with root privileges.\n")
        sys.exit(1)

    # Validate the selection before the namespace exists, so that a bad
    # request cannot exit while leaving the namespace and veth pair behind.
    if target_category:
        if target_category == "flower" and args.device is None:
            print("Please specify a NIC device (-d) to run category flower")
            sys.exit(1)
        if target_category not in ucat:
            print("Specified category is not present in this file.")
            sys.exit(1)
        selected = testcases[target_category]
        heading = "Category " + target_category
    else:
        if target_id:
            alltests = [t for t in alltests if target_id in t['id']]
            if not alltests:
                print("Cannot find a test case with ID matching " + target_id)
                sys.exit(1)
        selected = alltests
        heading = "All test results: "

    ns_create()
    try:
        catresults = test_runner(selected, args)
        print(heading + "\n\n" + catresults)
    finally:
        # Also runs when the run is aborted by an exception or SystemExit.
        # If the namespace has already been removed by then, the delete
        # command merely fails; exec_cmd does not raise on a failing command.
        ns_destroy()
413
414
def main():
    """
    Entry point: build the argument parser, read the command line, apply
    the setting overrides and hand over to the selected operation mode.
    """
    parser = set_args(args_parse())
    # Arguments the parser does not know are tolerated and discarded.
    args, _unknown = parser.parse_known_args()

    check_default_settings(args)
    set_operation_mode(args)

    exit(0)
428
429
# Run the test driver only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
432