Branden Bonaby | c48d8b0 | 2019-10-03 17:02:05 -0400 | [diff] [blame] | 1 | #!/usr/bin/env python3 |
| 2 | # SPDX-License-Identifier: GPL-2.0 |
| 3 | # |
| 4 | # Program to allow users to fuzz test Hyper-V drivers |
| 5 | # by interfacing with Hyper-V debugfs attributes. |
| 6 | # Current test methods available: |
| 7 | # 1. delay testing |
| 8 | # |
| 9 | # Current file/directory structure of hyper-V debugfs: |
| 10 | # /sys/kernel/debug/hyperv/UUID |
| 11 | # /sys/kernel/debug/hyperv/UUID/<test-state filename> |
| 12 | # /sys/kernel/debug/hyperv/UUID/<test-method sub-directory> |
| 13 | # |
| 14 | # author: Branden Bonaby <brandonbonaby94@gmail.com> |
| 15 | |
| 16 | import os |
| 17 | import cmd |
| 18 | import argparse |
| 19 | import glob |
| 20 | from argparse import RawDescriptionHelpFormatter |
| 21 | from argparse import RawTextHelpFormatter |
| 22 | from enum import Enum |
| 23 | |
# Do not change the attribute names used below unless you also change
# the debugfs attributes in /drivers/hv/debugfs.c. All fuzz testing
# attributes start with "fuzz_test".
| 27 | |
# Root of the Hyper-V debugfs tree. Every path handled by this tool lives
# underneath it, and recursive_file_lookup() compares against this value to
# work out which device directory a file belongs to.
#
# The check runs at module level, so the process exits with -1 before any
# argument parsing if the directory is missing or not accessible.
debugfs_hyperv_path = "/sys/kernel/debug/hyperv"
if not os.path.isdir(debugfs_hyperv_path):
    print("{} doesn't exist/check permissions".format(debugfs_hyperv_path))
    exit(-1)
| 33 | |
class dev_state(Enum):
    """Integer values written to a device's debugfs test attributes.

    set_test_state() is called with ``on`` to enable testing, and
    disable_testing_single_device() writes ``off`` to every attribute
    file of a device.
    """
    # Testing disabled
    off = 0
    # Testing enabled
    on = 1
| 37 | |
# File names that correspond to the files created in
# /drivers/hv/debugfs.c
class f_names(Enum):
    """Names of the per-device debugfs attribute files.

    The values are used as keys into the inner dictionaries of the file
    map built by recursive_file_lookup().
    """
    # Test on/off switch for a device (see locate_state)
    state_f = "fuzz_test_state"
    # Buffer interrupt delay attribute (see set_delay_values)
    buff_f = "fuzz_test_buffer_interrupt_delay"
    # Message delay attribute (see set_delay_values)
    mess_f = "fuzz_test_message_delay"
| 44 | |
# Both all_actions and single_actions are used by validate_args_path()
# for error checking: an entry from single_actions requires a device
# path (-p), while an entry from all_actions must NOT be combined with
# one. The one-letter entries are the subparser aliases declared in
# parse_args(), which is what allows those subparser names to be
# abbreviated on the command line.
#
# Do not abbreviate the test method names, as it will become less
# intuitive as to what the user can do. If you do decide to abbreviate
# a test method name, make sure the main function reflects this change.

# Actions that operate on every vmbus device
all_actions = [
    "disable_all",
    "D",
    "enable_all",
    "view_all",
    "V"
]

# Actions that operate on one vmbus device
single_actions = [
    "disable_single",
    "d",
    "enable_single",
    "view_single",
    "v"
]
| 68 | |
def main():
    """Parse the command line and run the requested vmbus test action.

    The set of supplied options (every argparse attribute with a truthy
    value, plus the chosen subcommand or alias) drives both validation
    and dispatch.

    All argument validation, including the delay values, is completed
    before anything is written to debugfs, so an invalid invocation
    cannot leave a device with testing switched on.

    Exits with -1 when no subcommand is given; the validation helpers
    exit with -1 on invalid input.
    """
    file_map = recursive_file_lookup(debugfs_hyperv_path, dict())
    args = parse_args()
    if not args.action:
        print ("Error, no options selected...exiting")
        exit(-1)

    # Names of all options that were actually supplied, plus the action
    arg_set = {k for (k, v) in vars(args).items() if v and k != "action"}
    arg_set.add(args.action)

    path = args.path if "path" in arg_set else None
    # The file map keys carry no trailing slash, so drop one if present
    if path and path[-1] == "/":
        path = path[:-1]
    validate_args_path(path, arg_set, file_map)

    # Check the delay values BEFORE touching the device state; previously
    # the single-device path enabled testing first and validated second.
    delay_requested = "delay" in arg_set
    if delay_requested:
        validate_delay_values(args.delay_time)

    if path and "enable_single" in arg_set:
        state_path = locate_state(path, file_map)
        set_test_state(state_path, dev_state.on.value, args.quiet)

    # Use subparsers as the key for different actions
    if delay_requested:
        if args.enable_all:
            set_delay_all_devices(file_map, args.delay_time,
                                  args.quiet)
        else:
            set_delay_values(path, file_map, args.delay_time,
                             args.quiet)
    elif "disable_all" in arg_set or "D" in arg_set:
        disable_all_testing(file_map)
    elif "disable_single" in arg_set or "d" in arg_set:
        disable_testing_single_device(path, file_map)
    elif "view_all" in arg_set or "V" in arg_set:
        get_all_devices_test_status(file_map)
    elif "view_single" in arg_set or "v" in arg_set:
        get_device_test_values(path, file_map)
| 103 | |
| 104 | # Get the state location |
def locate_state(device, file_map):
    """Return the path of the test-state attribute file of `device`.

    `device` is a key of `file_map`; a KeyError propagates if the device
    or its state file is not present in the map.
    """
    device_files = file_map[device]
    state_file_name = f_names.state_f.value
    return device_files[state_file_name]
| 107 | |
| 108 | # Validate delay values to make sure they are acceptable to |
| 109 | # enable delays on a device |
def validate_delay_values(delay):
    """Exit with -1 unless `delay` is an acceptable pair of delay values.

    `delay` holds the buffer delay at index 0 and the message delay at
    index 1. Both being -1 is rejected, and every value must be either
    -1 or in the range 1..1000.
    """
    nothing_to_set = all(delay[index] == -1 for index in (0, 1))
    if nothing_to_set:
        print("\nError, At least 1 value must be greater than 0")
        exit(-1)

    if any(value < -1 or value == 0 or value > 1000 for value in delay):
        print("\nError, Values must be equal to -1 "
              "or be > 0 and <= 1000")
        exit(-1)
| 120 | |
| 121 | # Validate argument path |
def validate_args_path(path, arg_set, file_map):
    """Exit with -1 if `path` does not fit the requested action.

    A path is required for any entry of single_actions, must be absent
    for any entry of all_actions, and for single-device actions it must
    be a key of `file_map`.
    """
    single_requested = any(action in arg_set for action in single_actions)

    if single_requested and not path:
        print("Error, path (-p) REQUIRED for the specified option. "
              "Use (-h) to check usage.")
        exit(-1)

    if path and any(action in arg_set for action in all_actions):
        print("Error, path (-p) NOT REQUIRED for the specified option. "
              "Use (-h) to check usage." )
        exit(-1)

    if single_requested and path not in file_map:
        print("Error, path '{}' not a valid vmbus device".format(path))
        exit(-1)
| 136 | |
| 137 | # display Testing status of single device |
def get_device_test_values(path, file_map):
    """Print `name = value` for every attribute file of the device `path`."""
    for attr_name, attr_path in file_map[path].items():
        current_value = read_test_files(attr_path)
        print( attr_name + " = " + str(current_value))
| 143 | |
| 144 | # Create a map of the vmbus devices and their associated files |
| 145 | # [key=device, value = [key = filename, value = file path]] |
def recursive_file_lookup(path, file_map):
    """Fill `file_map` with the files found below `path` and return it.

    The map has the shape {device directory: {file name: file path}}.
    A file whose grandparent directory is the debugfs root is filed
    under its parent directory; any other file is filed under its
    grandparent, so files in a test-method sub-directory end up under
    their device. Directories are descended into recursively.
    """
    # glob is called without recursive=True, so the descent is done here
    for entry in glob.iglob(path + '**/*'):
        if os.path.isfile(entry):
            parent, f_name = entry.rsplit("/", 1)
            grandparent = parent.rsplit("/", 1)[0]
            if grandparent == debugfs_hyperv_path:
                directory = parent
            else:
                directory = grandparent
            if not file_map.get(directory):
                file_map[directory] = {f_name: entry}
            else:
                file_map[directory].update({f_name: entry})
        elif os.path.isdir(entry):
            recursive_file_lookup(entry, file_map)
    return file_map
| 162 | |
| 163 | # display Testing state of devices |
def get_all_devices_test_status(file_map):
    """Print whether testing is ON or OFF for every device in `file_map`.

    The device name printed is the sixth "/"-separated component of the
    device path, i.e. the directory directly below
    /sys/kernel/debug/hyperv.
    """
    for device in file_map:
        state = get_test_state(locate_state(device, file_map))
        # Compare by value: `is 1` tests object identity, which only
        # works by accident and raises a SyntaxWarning on Python >= 3.8.
        if state == 1:
            print("Testing = ON for: {}"
                  .format(device.split("/")[5]))
        else:
            print("Testing = OFF for: {}"
                  .format(device.split("/")[5]))
| 173 | |
| 174 | # read the vmbus device files, path must be absolute path before calling |
def read_test_files(path):
    """Return the integer stored on the first line of the file `path`.

    `path` must be an absolute path. Surrounding whitespace on the first
    line is ignored. On an I/O error, or if the line is not an integer,
    an error message is printed and the process exits with -1.
    """
    try:
        with open(path,"r") as f:
            file_value = f.readline().strip()
        return int(file_value)

    except IOError as e:
        # Read the attributes instead of unpacking e.args: args is not
        # guaranteed to be a 2-tuple, and a failed unpack here would
        # replace the error report with an unrelated traceback.
        print("I/O error({0}): {1} on file {2}"
              .format(e.errno, e.strerror or e, path))
        exit(-1)
    except ValueError:
        print ("Element to int conversion error in: \n{}".format(path))
        exit(-1)
| 189 | |
| 190 | # writing to vmbus device files, path must be absolute path before calling |
def write_test_files(path, value):
    """Write `value`, formatted as text, to the file `path`.

    `path` must be an absolute path. On an I/O error an error message is
    printed and the process exits with -1.
    """
    try:
        with open(path,"w") as f:
            f.write("{}".format(value))
    except IOError as e:
        # Read the attributes instead of unpacking e.args: args is not
        # guaranteed to be a 2-tuple, and a failed unpack here would
        # replace the error report with an unrelated traceback.
        print("I/O error({0}): {1} on file {2}"
              .format(e.errno, e.strerror or e, path))
        exit(-1)
| 201 | |
| 202 | # set testing state of device |
def set_test_state(state_path, state_value, quiet):
    """Write `state_value` to the state file and report the result.

    The state is read back after the write, and unless `quiet` is set
    the resulting ON/OFF status is printed together with the device
    name, taken as the sixth "/"-separated component of `state_path`.
    """
    write_test_files(state_path, state_value)
    # Compare by value: `is 1` tests object identity, which only works
    # by accident and raises a SyntaxWarning on Python >= 3.8.
    testing_on = get_test_state(state_path) == 1
    if quiet:
        return
    if testing_on:
        print("Testing = ON for device: {}"
              .format(state_path.split("/")[5]))
    else:
        print("Testing = OFF for device: {}"
              .format(state_path.split("/")[5]))
| 214 | |
| 215 | # get testing state of device |
def get_test_state(state_path):
    """Return the integer test state stored in `state_path`.

    1 means testing is ON, 0 means testing is OFF.
    """
    current_state = read_test_files(state_path)
    return current_state
| 220 | |
| 221 | # write 1 - 1000 microseconds, into a single device using the |
| 222 | # fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay |
| 223 | # debugfs attributes |
def set_delay_values(device, file_map, delay_length, quiet):
    """Write the requested delays to the delay attributes of `device`.

    `delay_length[0]` is the buffer interrupt delay and
    `delay_length[1]` the message delay. A value is written only when it
    lies in 0..1000, so -1 leaves that attribute untouched. Unless
    `quiet` is set, both attributes are read back and printed.
    """
    try:
        interrupt = file_map[device][f_names.buff_f.value]
        message = file_map[device][f_names.mess_f.value]

        # index 0 - buffer interrupt delay, index 1 - message delay
        for target, index in ((interrupt, 0), (message, 1)):
            requested = delay_length[index]
            if 0 <= requested <= 1000:
                write_test_files(target, requested)

        if not quiet:
            buffer_now = read_test_files(interrupt)
            print("Buffer delay testing = {} for: {}"
                  .format(buffer_now, interrupt.split("/")[5]))
            message_now = read_test_files(message)
            print("Message delay testing = {} for: {}"
                  .format(message_now, message.split("/")[5]))
    except IOError as e:
        errno, strerror = e.args
        print("I/O error({0}): {1} on files {2}{3}"
              .format(errno, strerror, interrupt, message))
        exit(-1)
| 247 | |
| 248 | # enabling delay testing on all devices |
def set_delay_all_devices(file_map, delay, quiet):
    """Enable testing and apply `delay` on every device in `file_map`."""
    for vmbus_device in file_map:
        state_path = locate_state(vmbus_device, file_map)
        set_test_state(state_path, dev_state.on.value, quiet)
        set_delay_values(vmbus_device, file_map, delay, quiet)
| 256 | |
| 257 | # disable all testing on a SINGLE device. |
def disable_testing_single_device(device, file_map):
    """Write the "off" value to every attribute file of `device`.

    Prints a confirmation naming the last path component of `device`.
    """
    for file_location in file_map[device].values():
        write_test_files(file_location, dev_state.off.value)
    device_name = device.split("/")[-1]
    print("ALL testing now OFF for {}".format(device_name))
| 264 | |
| 265 | # disable all testing on ALL devices |
def disable_all_testing(file_map):
    """Disable all testing on each device present in `file_map`."""
    for vmbus_device in file_map.keys():
        disable_testing_single_device(vmbus_device, file_map)
| 270 | |
def parse_args():
    """Build the vmbus_testing argument parser and parse the command line.

    Sub-commands (stored in the `action` attribute of the result):
      delay               -- requires -t and exactly one of -E / -e;
                             also accepts -p
      disable_all    | D  -- no further options
      disable_single | d  -- accepts -p
      view_all       | V  -- no further options
      view_single    | v  -- accepts -p

    Top-level options: --version and -q/--quiet.

    Returns the namespace produced by argparse. Which of `path`,
    `enable_all`, `enable_single` and `delay_time` exist on it depends
    on the sub-command that was selected. main() treats a falsy `action`
    as "no sub-command given".
    """
    parser = argparse.ArgumentParser(prog = "vmbus_testing",usage ="\n"
            "%(prog)s [delay] [-h] [-e|-E] -t [-p]\n"
            "%(prog)s [view_all | V] [-h]\n"
            "%(prog)s [disable_all | D] [-h]\n"
            "%(prog)s [disable_single | d] [-h|-p]\n"
            "%(prog)s [view_single | v] [-h|-p]\n"
            "%(prog)s --version\n",
            description = "\nUse lsvmbus to get vmbus device type "
            "information.\n" "\nThe debugfs root path is "
            "/sys/kernel/debug/hyperv",
            formatter_class = RawDescriptionHelpFormatter)
    # The chosen sub-command (or its alias) is stored as args.action
    subparsers = parser.add_subparsers(dest = "action")
    parser.add_argument("--version", action = "version",
            version = '%(prog)s 0.1.0')
    parser.add_argument("-q","--quiet", action = "store_true",
            help = "silence none important test messages."
            " This will only work when enabling testing"
            " on a device.")
    # Use the path parser to hold the --path attribute so it can
    # be shared between subparsers. Also do the same for the state
    # parser, as all testing methods will use --enable_all and
    # enable_single.
    path_parser = argparse.ArgumentParser(add_help=False)
    path_parser.add_argument("-p","--path", metavar = "",
            help = "Debugfs path to a vmbus device. The path "
            "must be the absolute path to the device.")
    state_parser = argparse.ArgumentParser(add_help=False)
    # Exactly one of -E / -e must be given. Each stores its own name as
    # the value, and main() looks for the attribute names "enable_all" /
    # "enable_single" among the supplied options.
    state_group = state_parser.add_mutually_exclusive_group(required = True)
    state_group.add_argument("-E", "--enable_all", action = "store_const",
            const = "enable_all",
            help = "Enable the specified test type "
            "on ALL vmbus devices.")
    state_group.add_argument("-e", "--enable_single",
            action = "store_const",
            const = "enable_single",
            help = "Enable the specified test type on a "
            "SINGLE vmbus device.")
    parser_delay = subparsers.add_parser("delay",
            parents = [state_parser, path_parser],
            help = "Delay the ring buffer interrupt or the "
            "ring buffer message reads in microseconds.",
            prog = "vmbus_testing",
            usage = "%(prog)s [-h]\n"
            "%(prog)s -E -t [value] [value]\n"
            "%(prog)s -e -t [value] [value] -p",
            description = "Delay the ring buffer interrupt for "
            "vmbus devices, or delay the ring buffer message "
            "reads for vmbus devices (both in microseconds). This "
            "is only on the host to guest channel.")
    # Two values, each converted by check_range(). The option is
    # required, so the default should never be used.
    # main() additionally runs validate_delay_values() on the result.
    parser_delay.add_argument("-t", "--delay_time", metavar = "", nargs = 2,
            type = check_range, default =[0,0], required = (True),
            help = "Set [buffer] & [message] delay time. "
            "Value constraints: -1 == value "
            "or 0 < value <= 1000.\n"
            "Use -1 to keep the previous value for that delay "
            "type, or a value > 0 <= 1000 to change the delay "
            "time.")
    # The variables bound to the sub-parsers below are not used
    # afterwards; the add_parser() calls are made for their registration
    # side effect on `subparsers`.
    parser_dis_all = subparsers.add_parser("disable_all",
            aliases = ['D'], prog = "vmbus_testing",
            usage = "%(prog)s [disable_all | D] -h\n"
            "%(prog)s [disable_all | D]\n",
            help = "Disable ALL testing on ALL vmbus devices.",
            description = "Disable ALL testing on ALL vmbus "
            "devices.")
    parser_dis_single = subparsers.add_parser("disable_single",
            aliases = ['d'],
            parents = [path_parser], prog = "vmbus_testing",
            usage = "%(prog)s [disable_single | d] -h\n"
            "%(prog)s [disable_single | d] -p\n",
            help = "Disable ALL testing on a SINGLE vmbus device.",
            description = "Disable ALL testing on a SINGLE vmbus "
            "device.")
    parser_view_all = subparsers.add_parser("view_all", aliases = ['V'],
            help = "View the test state for ALL vmbus devices.",
            prog = "vmbus_testing",
            usage = "%(prog)s [view_all | V] -h\n"
            "%(prog)s [view_all | V]\n",
            description = "This shows the test state for ALL the "
            "vmbus devices.")
    parser_view_single = subparsers.add_parser("view_single",
            aliases = ['v'],parents = [path_parser],
            help = "View the test values for a SINGLE vmbus "
            "device.",
            description = "This shows the test values for a SINGLE "
            "vmbus device.", prog = "vmbus_testing",
            usage = "%(prog)s [view_single | v] -h\n"
            "%(prog)s [view_single | v] -p")

    return parser.parse_args()
| 361 | |
| 362 | # value checking for range checking input in parser |
def check_range(arg1):
    """argparse `type=` callable for a single delay value.

    Converts `arg1` to an int and returns it when it is -1 or lies in
    1..1000. Raises argparse.ArgumentTypeError for anything else,
    including non-integer input.

    0 is rejected here as well, matching both the error message below
    and the -t help text; it used to slip through the parser.
    """
    try:
        val = int(arg1)
    except ValueError as err:
        raise argparse.ArgumentTypeError(str(err))
    if val < -1 or val == 0 or val > 1000:
        message = ("\n\nvalue must be -1 or 0 < value <= 1000. "
                   "Value program received: {}\n").format(val)
        raise argparse.ArgumentTypeError(message)
    return val
| 374 | |
# Run the tool only when this file is executed as a script.
if __name__ == "__main__":
    main()