 61a8712286
			
		
	
	
	61a8712286
	
	
	
		
			
			RT-mutex tester: scriptable tester for rt mutexes, which allows userspace scripting of mutex unit-tests (and dynamic tests as well), using the actual rt-mutex implementation of the kernel. [akpm@osdl.org: fixlet] Signed-off-by: Thomas Gleixner <tglx@linutronix.de> Signed-off-by: Ingo Molnar <mingo@elte.hu> Signed-off-by: Arjan van de Ven <arjan@linux.intel.com> Signed-off-by: Andrew Morton <akpm@osdl.org> Signed-off-by: Linus Torvalds <torvalds@osdl.org>
		
			
				
	
	
		
			222 lines
		
	
	
	
		
			5.2 KiB
			
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			222 lines
		
	
	
	
		
			5.2 KiB
			
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/env python
 | |
| #
 | |
| # rt-mutex tester
 | |
| #
 | |
| # (C) 2006 Thomas Gleixner <tglx@linutronix.de>
 | |
| #
 | |
| # This program is free software; you can redistribute it and/or modify
 | |
| # it under the terms of the GNU General Public License version 2 as
 | |
| # published by the Free Software Foundation.
 | |
| #
 | |
| import os
 | |
| import sys
 | |
| import getopt
 | |
| import shutil
 | |
| import string
 | |
| 
 | |
| # Globals
 | |
| quiet = 0
 | |
| test = 0
 | |
| comments = 0
 | |
| 
 | |
| sysfsprefix = "/sys/devices/system/rttest/rttest"
 | |
| statusfile = "/status"
 | |
| commandfile = "/command"
 | |
| 
 | |
| # Command opcodes
 | |
| cmd_opcodes = {
 | |
|     "schedother"    : "1",
 | |
|     "schedfifo"     : "2",
 | |
|     "lock"          : "3",
 | |
|     "locknowait"    : "4",
 | |
|     "lockint"       : "5",
 | |
|     "lockintnowait" : "6",
 | |
|     "lockcont"      : "7",
 | |
|     "unlock"        : "8",
 | |
|     "lockbkl"       : "9",
 | |
|     "unlockbkl"     : "10",
 | |
|     "signal"        : "11",
 | |
|     "resetevent"    : "98",
 | |
|     "reset"         : "99",
 | |
|     }
 | |
| 
 | |
| test_opcodes = {
 | |
|     "prioeq"        : ["P" , "eq" , None],
 | |
|     "priolt"        : ["P" , "lt" , None],
 | |
|     "priogt"        : ["P" , "gt" , None],
 | |
|     "nprioeq"       : ["N" , "eq" , None],
 | |
|     "npriolt"       : ["N" , "lt" , None],
 | |
|     "npriogt"       : ["N" , "gt" , None],
 | |
|     "unlocked"      : ["M" , "eq" , 0],
 | |
|     "trylock"       : ["M" , "eq" , 1],
 | |
|     "blocked"       : ["M" , "eq" , 2],
 | |
|     "blockedwake"   : ["M" , "eq" , 3],
 | |
|     "locked"        : ["M" , "eq" , 4],
 | |
|     "opcodeeq"      : ["O" , "eq" , None],
 | |
|     "opcodelt"      : ["O" , "lt" , None],
 | |
|     "opcodegt"      : ["O" , "gt" , None],
 | |
|     "eventeq"       : ["E" , "eq" , None],
 | |
|     "eventlt"       : ["E" , "lt" , None],
 | |
|     "eventgt"       : ["E" , "gt" , None],
 | |
|     }
 | |
| 
 | |
| # Print usage information
 | |
| def usage():
 | |
|     print "rt-tester.py <-c -h -q -t> <testfile>"
 | |
|     print " -c    display comments after first command"
 | |
|     print " -h    help"
 | |
|     print " -q    quiet mode"
 | |
|     print " -t    test mode (syntax check)"
 | |
|     print " testfile: read test specification from testfile"
 | |
|     print " otherwise from stdin"
 | |
|     return
 | |
| 
 | |
| # Print progress when not in quiet mode
 | |
| def progress(str):
 | |
|     if not quiet:
 | |
|         print str
 | |
| 
 | |
| # Analyse a status value
 | |
| def analyse(val, top, arg):
 | |
| 
 | |
|     intval = int(val)
 | |
| 
 | |
|     if top[0] == "M":
 | |
|         intval = intval / (10 ** int(arg))
 | |
| 	intval = intval % 10
 | |
|         argval = top[2]
 | |
|     elif top[0] == "O":
 | |
|         argval = int(cmd_opcodes.get(arg, arg))
 | |
|     else:
 | |
|         argval = int(arg)
 | |
| 
 | |
|     # progress("%d %s %d" %(intval, top[1], argval))
 | |
| 
 | |
|     if top[1] == "eq" and intval == argval:
 | |
| 	return 1
 | |
|     if top[1] == "lt" and intval < argval:
 | |
|         return 1
 | |
|     if top[1] == "gt" and intval > argval:
 | |
| 	return 1
 | |
|     return 0
 | |
| 
 | |
| # Parse the commandline
 | |
| try:
 | |
|     (options, arguments) = getopt.getopt(sys.argv[1:],'chqt')
 | |
| except getopt.GetoptError, ex:
 | |
|     usage()
 | |
|     sys.exit(1)
 | |
| 
 | |
| # Parse commandline options
 | |
| for option, value in options:
 | |
|     if option == "-c":
 | |
|         comments = 1
 | |
|     elif option == "-q":
 | |
|         quiet = 1
 | |
|     elif option == "-t":
 | |
|         test = 1
 | |
|     elif option == '-h':
 | |
|         usage()
 | |
|         sys.exit(0)
 | |
| 
 | |
| # Select the input source
 | |
| if arguments:
 | |
|     try:
 | |
|         fd = open(arguments[0])
 | |
|     except Exception,ex:
 | |
|         sys.stderr.write("File not found %s\n" %(arguments[0]))
 | |
|         sys.exit(1)
 | |
| else:
 | |
|     fd = sys.stdin
 | |
| 
 | |
| linenr = 0
 | |
| 
 | |
| # Read the test patterns
 | |
| while 1:
 | |
| 
 | |
|     linenr = linenr + 1
 | |
|     line = fd.readline()
 | |
|     if not len(line):
 | |
|         break
 | |
| 
 | |
|     line = line.strip()
 | |
|     parts = line.split(":")
 | |
| 
 | |
|     if not parts or len(parts) < 1:
 | |
|         continue
 | |
| 
 | |
|     if len(parts[0]) == 0:
 | |
|         continue
 | |
| 
 | |
|     if parts[0].startswith("#"):
 | |
| 	if comments > 1:
 | |
| 	    progress(line)
 | |
| 	continue
 | |
| 
 | |
|     if comments == 1:
 | |
| 	comments = 2
 | |
| 
 | |
|     progress(line)
 | |
| 
 | |
|     cmd = parts[0].strip().lower()
 | |
|     opc = parts[1].strip().lower()
 | |
|     tid = parts[2].strip()
 | |
|     dat = parts[3].strip()
 | |
| 
 | |
|     try:
 | |
|         # Test or wait for a status value
 | |
|         if cmd == "t" or cmd == "w":
 | |
|             testop = test_opcodes[opc]
 | |
| 
 | |
|             fname = "%s%s%s" %(sysfsprefix, tid, statusfile)
 | |
|             if test:
 | |
| 		print fname
 | |
|                 continue
 | |
| 
 | |
|             while 1:
 | |
|                 query = 1
 | |
|                 fsta = open(fname, 'r')
 | |
|                 status = fsta.readline().strip()
 | |
|                 fsta.close()
 | |
|                 stat = status.split(",")
 | |
|                 for s in stat:
 | |
| 		    s = s.strip()
 | |
|                     if s.startswith(testop[0]):
 | |
|                         # Seperate status value
 | |
|                         val = s[2:].strip()
 | |
|                         query = analyse(val, testop, dat)
 | |
|                         break
 | |
|                 if query or cmd == "t":
 | |
|                     break
 | |
| 
 | |
|             progress("   " + status)
 | |
| 
 | |
|             if not query:
 | |
|                 sys.stderr.write("Test failed in line %d\n" %(linenr))
 | |
| 		sys.exit(1)
 | |
| 
 | |
|         # Issue a command to the tester
 | |
|         elif cmd == "c":
 | |
|             cmdnr = cmd_opcodes[opc]
 | |
|             # Build command string and sys filename
 | |
|             cmdstr = "%s:%s" %(cmdnr, dat)
 | |
|             fname = "%s%s%s" %(sysfsprefix, tid, commandfile)
 | |
|             if test:
 | |
| 		print fname
 | |
|                 continue
 | |
|             fcmd = open(fname, 'w')
 | |
|             fcmd.write(cmdstr)
 | |
|             fcmd.close()
 | |
| 
 | |
|     except Exception,ex:
 | |
|     	sys.stderr.write(str(ex))
 | |
|         sys.stderr.write("\nSyntax error in line %d\n" %(linenr))
 | |
|         if not test:
 | |
|             fd.close()
 | |
|             sys.exit(1)
 | |
| 
 | |
| # Normal exit pass
 | |
| print "Pass"
 | |
| sys.exit(0)
 | |
| 
 | |
| 
 |