#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
#
# Program to allow users to fuzz test Hyper-V drivers
# by interfacing with Hyper-V debugfs attributes.
# Current test methods available:
#       1. delay testing
#
# Current file/directory structure of hyper-V debugfs:
#       /sys/kernel/debug/hyperv/UUID
#       /sys/kernel/debug/hyperv/UUID/<test-state filename>
#       /sys/kernel/debug/hyperv/UUID/<test-method sub-directory>
#
# author: Branden Bonaby <brandonbonaby94@gmail.com>
15
import argparse
import cmd
import glob
import os
import sys
from argparse import RawDescriptionHelpFormatter
from argparse import RawTextHelpFormatter
from enum import Enum
23
# Do not change unless you also change the debugfs attributes
# in /drivers/hv/debugfs.c. All fuzz testing
# attributes will start with "fuzz_test".
27
# debugfs path for hyperv must exist before proceeding
debugfs_hyperv_path = "/sys/kernel/debug/hyperv"
if not os.path.isdir(debugfs_hyperv_path):
    # Every device lookup below is rooted at this directory, so bail out
    # early when it is missing or not accessible.
    print("{} doesn't exist/check permissions".format(debugfs_hyperv_path))
    sys.exit(-1)
33
class dev_state(Enum):
    """Values written to a device's test files to turn testing off or on."""

    off = 0
    on = 1
37
# File names, that correspond to the files created in
# /drivers/hv/debugfs.c
class f_names(Enum):
    """debugfs attribute file names looked up in the device file map."""

    # Per-device test state file.
    state_f = "fuzz_test_state"
    # Ring buffer interrupt delay file.
    buff_f = "fuzz_test_buffer_interrupt_delay"
    # Ring buffer message delay file.
    mess_f = "fuzz_test_message_delay"
44
# Both single_actions and all_actions are used
# for error checking and to allow for some subparser
# names to be abbreviated. Do not abbreviate the
# test method names, as it will become less intuitive
# as to what the user can do. If you do decide to
# abbreviate the test method name, make sure the main
# function reflects this change.

# Actions that operate on every device; a path must NOT be given with these.
all_actions = [
    "disable_all",
    "D",
    "enable_all",
    "view_all",
    "V",
]

# Actions that operate on one device; a path is REQUIRED with these.
single_actions = [
    "disable_single",
    "d",
    "enable_single",
    "view_single",
    "v",
]
68
def main():
    """Parse the command line and run the requested test action.

    Builds the device/file map from debugfs, validates the arguments and
    then dispatches on the selected sub-command. Exits with status -1 when
    no sub-command was given; the validation helpers exit on their own for
    an invalid path or invalid delay values.
    """
    file_map = recursive_file_lookup(debugfs_hyperv_path, dict())
    args = parse_args()
    if not args.action:
        print("Error, no options selected...exiting")
        sys.exit(-1)

    # Names of every option that was actually supplied, plus the action
    # (sub-command name or alias) itself.
    arg_set = {k for (k, v) in vars(args).items() if v and k != "action"}
    arg_set.add(args.action)

    path = args.path if "path" in arg_set else None
    if path:
        # file_map keys carry no trailing slash.
        path = path.rstrip("/")
    validate_args_path(path, arg_set, file_map)

    # Use subparsers as the key for different actions
    if "delay" in arg_set:
        # Reject bad delay values before any device state is changed.
        validate_delay_values(args.delay_time)
        if args.enable_all:
            set_delay_all_devices(file_map, args.delay_time, args.quiet)
        else:
            if path and "enable_single" in arg_set:
                state_path = locate_state(path, file_map)
                set_test_state(state_path, dev_state.on.value, args.quiet)
            set_delay_values(path, file_map, args.delay_time, args.quiet)
    elif "disable_all" in arg_set or "D" in arg_set:
        disable_all_testing(file_map)
    elif "disable_single" in arg_set or "d" in arg_set:
        disable_testing_single_device(path, file_map)
    elif "view_all" in arg_set or "V" in arg_set:
        get_all_devices_test_status(file_map)
    elif "view_single" in arg_set or "v" in arg_set:
        get_device_test_values(path, file_map)
103
# Get the state location
def locate_state(device, file_map):
    """Return the path of the test-state file recorded for *device*.

    Raises KeyError when the device, or its state file, is not in
    *file_map*.
    """
    return file_map[device][f_names.state_f.value]
107
# Validate delay values to make sure they are acceptable to
# enable delays on a device
def validate_delay_values(delay):
    """Exit with status -1 unless *delay* holds usable delay values.

    Each value must be -1 or satisfy 0 < value <= 1000, and the values
    must not all be -1.
    """
    if all(value == -1 for value in delay):
        print("\nError, At least 1 value must be greater than 0")
        sys.exit(-1)
    for value in delay:
        if value < -1 or value == 0 or value > 1000:
            print("\nError, Values must be equal to -1 "
                  "or be > 0 and <= 1000")
            sys.exit(-1)
120
# Validate argument path
def validate_args_path(path, arg_set, file_map):
    """Exit with status -1 when *path* does not fit the selected action.

    A path is required for the single-device actions, must be absent for
    the all-device actions, and must be a key of *file_map* when used.
    """
    wants_single = any(item in arg_set for item in single_actions)
    wants_all = any(item in arg_set for item in all_actions)

    if not path and wants_single:
        print("Error, path (-p) REQUIRED for the specified option. "
              "Use (-h) to check usage.")
        sys.exit(-1)
    elif path and wants_all:
        print("Error, path (-p) NOT REQUIRED for the specified option. "
              "Use (-h) to check usage.")
        sys.exit(-1)
    elif path not in file_map and wants_single:
        print("Error, path '{}' not a valid vmbus device".format(path))
        sys.exit(-1)
136
# display Testing status of single device
def get_device_test_values(path, file_map):
    """Print "<file name> = <value>" for every file recorded for *path*."""
    for name, file_location in file_map[path].items():
        print(name + " = " + str(read_test_files(file_location)))
143
# Create a map of the vmbus devices and their associated files
# [key=device, value = [key = filename, value = file path]]
def recursive_file_lookup(path, file_map):
    """Walk *path* and record every file under its device directory.

    *file_map* is updated in place and also returned. Each key is a device
    directory and each value maps a file name to that file's full path.
    """
    # Match only the direct children of *path*; sub-directories are
    # handled by the recursive call below.
    for f_path in glob.iglob(os.path.join(glob.escape(path), "*")):
        if os.path.isfile(f_path):
            parent = f_path.rsplit("/", 1)[0]
            grandparent = f_path.rsplit("/", 2)[0]
            # A file directly inside the device directory is keyed by its
            # parent; a file inside a test-method sub-directory is keyed by
            # the directory one level further up.
            if grandparent == debugfs_hyperv_path:
                directory = parent
            else:
                directory = grandparent
            f_name = f_path.split("/")[-1]
            file_map.setdefault(directory, {})[f_name] = f_path
        elif os.path.isdir(f_path):
            recursive_file_lookup(f_path, file_map)
    return file_map
162
# display Testing state of devices
def get_all_devices_test_status(file_map):
    """Print whether testing is ON or OFF for every device in *file_map*."""
    for device in file_map:
        # Compare by value; identity checks against int literals are not
        # reliable.
        if get_test_state(locate_state(device, file_map)) == 1:
            state = "ON"
        else:
            state = "OFF"
        # Component 5 of /sys/kernel/debug/hyperv/UUID is the UUID.
        print("Testing = {} for: {}".format(state, device.split("/")[5]))
173
# read the vmbus device files, path must be absolute path before calling
def read_test_files(path):
    """Return the first line of the file at *path* as an int.

    Prints an error and exits with status -1 when the file cannot be read
    or its first line is not an integer.
    """
    try:
        with open(path, "r") as f:
            return int(f.readline().strip())
    except IOError as e:
        # Use the attributes rather than unpacking e.args, whose length is
        # not guaranteed to be two.
        print("I/O error({0}): {1} on file {2}"
              .format(e.errno, e.strerror, path))
        sys.exit(-1)
    except ValueError:
        print("Element to int conversion error in: \n{}".format(path))
        sys.exit(-1)
189
# writing to vmbus device files, path must be absolute path before calling
def write_test_files(path, value):
    """Write *value*, formatted as a string, to the file at *path*.

    Prints an error and exits with status -1 when the write fails.
    """
    try:
        with open(path, "w") as f:
            f.write("{}".format(value))
    except IOError as e:
        # Use the attributes rather than unpacking e.args, whose length is
        # not guaranteed to be two.
        print("I/O error({0}): {1} on file {2}"
              .format(e.errno, e.strerror, path))
        sys.exit(-1)
201
# set testing state of device
def set_test_state(state_path, state_value, quiet):
    """Write *state_value* to the state file and report the resulting state.

    The state is read back after the write; the ON/OFF message is only
    printed when *quiet* is false.
    """
    write_test_files(state_path, state_value)
    # Compare by value; identity checks against int literals are not
    # reliable.
    state = "ON" if get_test_state(state_path) == 1 else "OFF"
    if not quiet:
        # Component 5 of /sys/kernel/debug/hyperv/UUID/<file> is the UUID.
        print("Testing = {} for device: {}"
              .format(state, state_path.split("/")[5]))
214
# get testing state of device
def get_test_state(state_path):
    """Return the integer stored in the state file at *state_path*.

    state == 1 - test = ON
    state == 0 - test = OFF
    """
    return read_test_files(state_path)
220
# write 1 - 1000 microseconds, into a single device using the
# fuzz_test_buffer_interrupt_delay and fuzz_test_message_delay
# debugfs attributes
def set_delay_values(device, file_map, delay_length, quiet):
    """Write the requested delays for *device* and report the new values.

    delay_length[0] is the buffer interrupt delay and delay_length[1] the
    message delay. A value outside 0..1000 (such as -1) leaves that file
    untouched. Raises KeyError when a delay file is missing from
    *file_map*.
    """
    # Resolve both paths before the try block so the error handler below
    # can always refer to them.
    interrupt = file_map[device][f_names.buff_f.value]
    message = file_map[device][f_names.mess_f.value]

    try:
        if 0 <= delay_length[0] <= 1000:
            write_test_files(interrupt, delay_length[0])
        if 0 <= delay_length[1] <= 1000:
            write_test_files(message, delay_length[1])
        if not quiet:
            # Component 5 of the attribute path is the device UUID.
            print("Buffer delay testing = {} for: {}"
                  .format(read_test_files(interrupt),
                          interrupt.split("/")[5]))
            print("Message delay testing = {} for: {}"
                  .format(read_test_files(message),
                          message.split("/")[5]))
    except IOError as e:
        print("I/O error({0}): {1} on files {2}{3}"
              .format(e.errno, e.strerror, interrupt, message))
        sys.exit(-1)
247
# enabling delay testing on all devices
def set_delay_all_devices(file_map, delay, quiet):
    """Turn testing on and apply *delay* for every device in *file_map*."""
    for device in file_map:
        set_test_state(locate_state(device, file_map),
                       dev_state.on.value,
                       quiet)
        set_delay_values(device, file_map, delay, quiet)
256
# disable all testing on a SINGLE device.
def disable_testing_single_device(device, file_map):
    """Write the "off" value to every file recorded for *device*."""
    for file_location in file_map[device].values():
        write_test_files(file_location, dev_state.off.value)
    print("ALL testing now OFF for {}".format(device.split("/")[-1]))
264
# disable all testing on ALL devices
def disable_all_testing(file_map):
    """Disable testing on each device in *file_map*, one at a time."""
    for device in file_map:
        disable_testing_single_device(device, file_map)
270
def parse_args():
    """Build the vmbus_testing command-line parser and parse sys.argv.

    Returns the argparse namespace. The chosen sub-command (or the alias
    that was typed) is stored under "action".
    """
    parser = argparse.ArgumentParser(
        prog="vmbus_testing",
        usage="\n"
              "%(prog)s [delay] [-h] [-e|-E] -t [-p]\n"
              "%(prog)s [view_all | V] [-h]\n"
              "%(prog)s [disable_all | D] [-h]\n"
              "%(prog)s [disable_single | d] [-h|-p]\n"
              "%(prog)s [view_single | v] [-h|-p]\n"
              "%(prog)s --version\n",
        description="\nUse lsvmbus to get vmbus device type "
                    "information.\n" "\nThe debugfs root path is "
                    "/sys/kernel/debug/hyperv",
        formatter_class=RawDescriptionHelpFormatter)
    subparsers = parser.add_subparsers(dest="action")
    parser.add_argument("--version", action="version",
                        version='%(prog)s 0.1.0')
    parser.add_argument("-q", "--quiet", action="store_true",
                        help="silence none important test messages."
                             " This will only work when enabling testing"
                             " on a device.")

    # Use the path parser to hold the --path attribute so it can
    # be shared between subparsers. Also do the same for the state
    # parser, as all testing methods will use --enable_all and
    # enable_single.
    path_parser = argparse.ArgumentParser(add_help=False)
    path_parser.add_argument("-p", "--path", metavar="",
                             help="Debugfs path to a vmbus device. The path "
                                  "must be the absolute path to the device.")
    state_parser = argparse.ArgumentParser(add_help=False)
    state_group = state_parser.add_mutually_exclusive_group(required=True)
    state_group.add_argument("-E", "--enable_all", action="store_const",
                             const="enable_all",
                             help="Enable the specified test type "
                                  "on ALL vmbus devices.")
    state_group.add_argument("-e", "--enable_single",
                             action="store_const",
                             const="enable_single",
                             help="Enable the specified test type on a "
                                  "SINGLE vmbus device.")

    parser_delay = subparsers.add_parser(
        "delay",
        parents=[state_parser, path_parser],
        help="Delay the ring buffer interrupt or the "
             "ring buffer message reads in microseconds.",
        prog="vmbus_testing",
        usage="%(prog)s [-h]\n"
              "%(prog)s -E -t [value] [value]\n"
              "%(prog)s -e -t [value] [value] -p",
        description="Delay the ring buffer interrupt for "
                    "vmbus devices, or delay the ring buffer message "
                    "reads for vmbus devices (both in microseconds). This "
                    "is only on the host to guest channel.")
    parser_delay.add_argument(
        "-t", "--delay_time", metavar="", nargs=2,
        type=check_range, default=[0, 0], required=True,
        help="Set [buffer] & [message] delay time. "
             "Value constraints: -1 == value "
             "or 0 < value <= 1000.\n"
             "Use -1 to keep the previous value for that delay "
             "type, or a value > 0 <= 1000 to change the delay "
             "time.")

    subparsers.add_parser(
        "disable_all",
        aliases=['D'], prog="vmbus_testing",
        usage="%(prog)s [disable_all | D] -h\n"
              "%(prog)s [disable_all | D]\n",
        help="Disable ALL testing on ALL vmbus devices.",
        description="Disable ALL testing on ALL vmbus "
                    "devices.")
    subparsers.add_parser(
        "disable_single",
        aliases=['d'],
        parents=[path_parser], prog="vmbus_testing",
        usage="%(prog)s [disable_single | d] -h\n"
              "%(prog)s [disable_single | d] -p\n",
        help="Disable ALL testing on a SINGLE vmbus device.",
        description="Disable ALL testing on a SINGLE vmbus "
                    "device.")
    subparsers.add_parser(
        "view_all", aliases=['V'],
        help="View the test state for ALL vmbus devices.",
        prog="vmbus_testing",
        usage="%(prog)s [view_all | V] -h\n"
              "%(prog)s [view_all | V]\n",
        description="This shows the test state for ALL the "
                    "vmbus devices.")
    subparsers.add_parser(
        "view_single",
        aliases=['v'], parents=[path_parser],
        help="View the test values for a SINGLE vmbus "
             "device.",
        description="This shows the test values for a SINGLE "
                    "vmbus device.", prog="vmbus_testing",
        usage="%(prog)s [view_single | v] -h\n"
              "%(prog)s [view_single | v] -p")

    return parser.parse_args()
361
# value checking for range checking input in parser
def check_range(arg1):
    """Convert a command-line delay argument to int and range-check it.

    Returns the value when it is -1 or satisfies 0 < value <= 1000.
    Raises argparse.ArgumentTypeError when the text is not an integer or
    the value is out of range.
    """
    try:
        val = int(arg1)
    except ValueError as err:
        raise argparse.ArgumentTypeError(str(err)) from err
    # 0 is rejected here as well, matching the message below.
    if val < -1 or val == 0 or val > 1000:
        message = ("\n\nvalue must be -1 or 0 < value <= 1000. "
                   "Value program received: {}\n").format(val)
        raise argparse.ArgumentTypeError(message)
    return val
374
# Run the tool only when executed as a script, not when imported.
if __name__ == "__main__":
    main()