tools: tc-testing: Introduce plugin architecture
This should be a general test architecture, and yet allow specific
tests to be done. Introduce a plugin architecture.
An individual test has 4 stages, setup/execute/verify/teardown. Each
plugin gets a chance to run a function at each stage, plus one call
before all the tests are called ("pre" suite) and one after all the
tests are called ("post" suite). In addition, just before each
command is executed, the plugin gets a chance to modify the command
using the "adjust_command" hook. This makes the test suite quite
flexible.
Future patches will take some functionality out of the tdc.py script and
place it in plugins.
To use the plugins, place the implementation in the plugins directory
and run tdc.py. It will notice the plugins and use them.
Signed-off-by: Brenda J. Butler <bjb@mojatatu.com>
Acked-by: Lucas Bates <lucasb@mojatatu.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/tools/testing/selftests/tc-testing/tdc.py b/tools/testing/selftests/tc-testing/tdc.py
index a2624ed..3e6f9f2 100755
--- a/tools/testing/selftests/tc-testing/tdc.py
+++ b/tools/testing/selftests/tc-testing/tdc.py
@@ -11,17 +11,91 @@
import os
import sys
import argparse
+import importlib
import json
import subprocess
+import time
from collections import OrderedDict
from string import Template
from tdc_config import *
from tdc_helper import *
+import TdcPlugin
USE_NS = True
+class PluginMgr:
+ def __init__(self, argparser):
+ super().__init__()
+ self.plugins = {}
+ self.plugin_instances = []
+ self.args = []
+ self.argparser = argparser
+
+ # TODO, put plugins in order
+ plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
+ for dirpath, dirnames, filenames in os.walk(plugindir):
+ for fn in filenames:
+ if (fn.endswith('.py') and
+ not fn == '__init__.py' and
+ not fn.startswith('#') and
+ not fn.startswith('.#')):
+ mn = fn[0:-3]
+ foo = importlib.import_module('plugins.' + mn)
+ self.plugins[mn] = foo
+ self.plugin_instances.append(foo.SubPlugin())
+
+ def call_pre_suite(self, testcount, testidlist):
+ for pgn_inst in self.plugin_instances:
+ pgn_inst.pre_suite(testcount, testidlist)
+
+ def call_post_suite(self, index):
+ for pgn_inst in reversed(self.plugin_instances):
+ pgn_inst.post_suite(index)
+
+ def call_pre_case(self, test_ordinal, testid):
+ for pgn_inst in self.plugin_instances:
+ try:
+ pgn_inst.pre_case(test_ordinal, testid)
+ except Exception as ee:
+ print('exception {} in call to pre_case for {} plugin'.
+ format(ee, pgn_inst.__class__))
+ print('test_ordinal is {}'.format(test_ordinal))
+ print('testid is {}'.format(testid))
+ raise
+
+ def call_post_case(self):
+ for pgn_inst in reversed(self.plugin_instances):
+ pgn_inst.post_case()
+
+ def call_pre_execute(self):
+ for pgn_inst in self.plugin_instances:
+ pgn_inst.pre_execute()
+
+ def call_post_execute(self):
+ for pgn_inst in reversed(self.plugin_instances):
+ pgn_inst.post_execute()
+
+ def call_add_args(self, parser):
+ for pgn_inst in self.plugin_instances:
+ parser = pgn_inst.add_args(parser)
+ return parser
+
+ def call_check_args(self, args, remaining):
+ for pgn_inst in self.plugin_instances:
+ pgn_inst.check_args(args, remaining)
+
+ def call_adjust_command(self, stage, command):
+ for pgn_inst in self.plugin_instances:
+ command = pgn_inst.adjust_command(stage, command)
+ return command
+
+ @staticmethod
+ def _make_argparser(args):
+ self.argparser = argparse.ArgumentParser(
+ description='Linux TC unit tests')
+
def replace_keywords(cmd):
"""
@@ -33,21 +107,27 @@
return subcmd
-def exec_cmd(command, nsonly=True):
+def exec_cmd(args, pm, stage, command, nsonly=True):
"""
Perform any required modifications on an executable command, then run
it in a subprocess and return the results.
"""
+ if len(command.strip()) == 0:
+ return None, None
if (USE_NS and nsonly):
command = 'ip netns exec $NS ' + command
if '$' in command:
command = replace_keywords(command)
+ command = pm.call_adjust_command(stage, command)
+ if args.verbose > 0:
+ print('command "{}"'.format(command))
proc = subprocess.Popen(command,
shell=True,
stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stderr=subprocess.PIPE,
+ env=ENVIR)
(rawout, serr) = proc.communicate()
if proc.returncode != 0 and len(serr) > 0:
@@ -60,69 +140,85 @@
return proc, foutput
-def prepare_env(cmdlist):
+def prepare_env(args, pm, stage, prefix, cmdlist):
"""
- Execute the setup/teardown commands for a test case. Optionally
- terminate test execution if the command fails.
+ Execute the setup/teardown commands for a test case.
+ Optionally terminate test execution if the command fails.
"""
+ if args.verbose > 0:
+ print('{}'.format(prefix))
for cmdinfo in cmdlist:
- if (type(cmdinfo) == list):
+ if isinstance(cmdinfo, list):
exit_codes = cmdinfo[1:]
cmd = cmdinfo[0]
else:
exit_codes = [0]
cmd = cmdinfo
- if (len(cmd) == 0):
+ if not cmd:
continue
- (proc, foutput) = exec_cmd(cmd)
+ (proc, foutput) = exec_cmd(args, pm, stage, cmd)
- if proc.returncode not in exit_codes:
- print
- print("Could not execute:")
- print(cmd)
- print("\nError message:")
- print(foutput)
- print("\nAborting test run.")
- # ns_destroy()
- raise Exception('prepare_env did not complete successfully')
+ if proc and (proc.returncode not in exit_codes):
+ print('', file=sys.stderr)
+ print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
+ file=sys.stderr)
+ print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
+ file=sys.stderr)
+ print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
+ print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
+ print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
+ raise Exception('"{}" did not complete successfully'.format(prefix))
-def run_one_test(index, tidx):
+def run_one_test(pm, args, index, tidx):
result = True
tresult = ""
tap = ""
+ if args.verbose > 0:
+ print("\t====================\n=====> ", end="")
print("Test " + tidx["id"] + ": " + tidx["name"])
- prepare_env(tidx["setup"])
- (p, procout) = exec_cmd(tidx["cmdUnderTest"])
+
+ pm.call_pre_case(index, tidx['id'])
+ prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])
+
+ if (args.verbose > 0):
+ print('-----> execute stage')
+ pm.call_pre_execute()
+ (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
exit_code = p.returncode
+ pm.call_post_execute()
if (exit_code != int(tidx["expExitCode"])):
result = False
print("exit:", exit_code, int(tidx["expExitCode"]))
print(procout)
else:
- match_pattern = re.compile(str(tidx["matchPattern"]),
- re.DOTALL | re.MULTILINE)
- (p, procout) = exec_cmd(tidx["verifyCmd"])
+ if args.verbose > 0:
+ print('-----> verify stage')
+ match_pattern = re.compile(
+ str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
+ (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
match_index = re.findall(match_pattern, procout)
if len(match_index) != int(tidx["matchCount"]):
result = False
if not result:
- tresult += "not "
- tresult += "ok {} - {} # {}\n".format(str(index), tidx['id'], tidx["name"])
+ tresult += 'not '
+ tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
tap += tresult
if result == False:
tap += procout
- prepare_env(tidx["teardown"])
+ prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'])
+ pm.call_post_case()
+
index += 1
return tap
-def test_runner(filtered_tests, args):
+def test_runner(pm, args, filtered_tests):
"""
Driver function for the unit tests.
@@ -135,63 +231,71 @@
tcount = len(testlist)
index = 1
tap = str(index) + ".." + str(tcount) + "\n"
+ badtest = None
+ pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
+
+ if args.verbose > 1:
+ print('Run tests here')
for tidx in testlist:
if "flower" in tidx["category"] and args.device == None:
continue
try:
badtest = tidx # in case it goes bad
- tap += run_one_test(index, tidx)
+ tap += run_one_test(pm, args, index, tidx)
except Exception as ee:
print('Exception {} (caught in test_runner, running test {} {} {})'.
format(ee, index, tidx['id'], tidx['name']))
break
index += 1
+ # if we failed in setup or teardown,
+ # fill in the remaining tests with not ok
count = index
tap += 'about to flush the tap output if tests need to be skipped\n'
if tcount + 1 != index:
for tidx in testlist[index - 1:]:
msg = 'skipped - previous setup or teardown failed'
- tap += 'ok {} - {} # {} {} {} \n'.format(
+ tap += 'ok {} - {} # {} {} {}\n'.format(
count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
count += 1
tap += 'done flushing skipped test tap output\n'
+ pm.call_post_suite(index)
return tap
-def ns_create():
+def ns_create(args, pm):
"""
Create the network namespace in which the tests will be run and set up
the required network devices for it.
"""
if (USE_NS):
cmd = 'ip netns add $NS'
- exec_cmd(cmd, False)
+ exec_cmd(args, pm, 'pre', cmd, False)
cmd = 'ip link add $DEV0 type veth peer name $DEV1'
- exec_cmd(cmd, False)
+ exec_cmd(args, pm, 'pre', cmd, False)
cmd = 'ip link set $DEV1 netns $NS'
- exec_cmd(cmd, False)
+ exec_cmd(args, pm, 'pre', cmd, False)
cmd = 'ip link set $DEV0 up'
- exec_cmd(cmd, False)
+ exec_cmd(args, pm, 'pre', cmd, False)
cmd = 'ip -n $NS link set $DEV1 up'
- exec_cmd(cmd, False)
+ exec_cmd(args, pm, 'pre', cmd, False)
cmd = 'ip link set $DEV2 netns $NS'
- exec_cmd(cmd, False)
+ exec_cmd(args, pm, 'pre', cmd, False)
cmd = 'ip -n $NS link set $DEV2 up'
- exec_cmd(cmd, False)
+ exec_cmd(args, pm, 'pre', cmd, False)
-def ns_destroy():
+def ns_destroy(args, pm):
"""
Destroy the network namespace for testing (and any associated network
devices as well)
"""
if (USE_NS):
cmd = 'ip netns delete $NS'
- exec_cmd(cmd, False)
+ exec_cmd(args, pm, 'post', cmd, False)
def has_blank_ids(idlist):
@@ -272,10 +376,10 @@
return parser
-def check_default_settings(args):
+def check_default_settings(args, remaining, pm):
"""
- Process any arguments overriding the default settings, and ensure the
- settings are correct.
+ Process any arguments overriding the default settings,
+ and ensure the settings are correct.
"""
# Allow for overriding specific settings
global NAMES
@@ -288,6 +392,8 @@
print("The specified tc path " + NAMES['TC'] + " does not exist.")
exit(1)
+ pm.call_check_args(args, remaining)
+
def get_id_list(alltests):
"""
@@ -301,16 +407,7 @@
Check for duplicate test case IDs.
"""
idl = get_id_list(alltests)
- # print('check_case_id: idl is {}'.format(idl))
- # answer = list()
- # for x in idl:
- # print('Looking at {}'.format(x))
- # print('what the heck is idl.count(x)??? {}'.format(idl.count(x)))
- # if idl.count(x) > 1:
- # answer.append(x)
- # print(' ... append it {}'.format(x))
return [x for x in idl if idl.count(x) > 1]
- return answer
def does_id_exist(alltests, newid):
@@ -403,7 +500,7 @@
for ff in args.file:
if not os.path.isfile(ff):
- print("IGNORING file " + ff + " \n\tBECAUSE does not exist.")
+ print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
else:
flist.append(os.path.abspath(ff))
@@ -445,7 +542,7 @@
return allcatlist, allidlist, testcases_by_cats, alltestcases
-def set_operation_mode(args):
+def set_operation_mode(pm, args):
"""
Load the test case data and process remaining arguments to determine
what the script should do for this run, and call the appropriate
@@ -486,12 +583,15 @@
print("This script must be run with root privileges.\n")
exit(1)
- ns_create()
+ ns_create(args, pm)
- catresults = test_runner(alltests, args)
+ if len(alltests):
+ catresults = test_runner(pm, args, alltests)
+ else:
+ catresults = 'No tests found\n'
print('All test results: \n\n{}'.format(catresults))
- ns_destroy()
+ ns_destroy(args, pm)
def main():
@@ -501,10 +601,15 @@
"""
parser = args_parse()
parser = set_args(parser)
+ pm = PluginMgr(parser)
+ parser = pm.call_add_args(parser)
(args, remaining) = parser.parse_known_args()
- check_default_settings(args)
+ args.NAMES = NAMES
+ check_default_settings(args, remaining, pm)
+ if args.verbose > 2:
+ print('args is {}'.format(args))
- set_operation_mode(args)
+ set_operation_mode(pm, args)
exit(0)