mirror of
				https://github.com/torvalds/linux.git
				synced 2025-11-04 02:30:34 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			218 lines
		
	
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			218 lines
		
	
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/python
 | 
						|
#
 | 
						|
# rt-mutex tester
 | 
						|
#
 | 
						|
# (C) 2006 Thomas Gleixner <tglx@linutronix.de>
 | 
						|
#
 | 
						|
# This program is free software; you can redistribute it and/or modify
 | 
						|
# it under the terms of the GNU General Public License version 2 as
 | 
						|
# published by the Free Software Foundation.
 | 
						|
#
 | 
						|
import os
 | 
						|
import sys
 | 
						|
import getopt
 | 
						|
import shutil
 | 
						|
import string
 | 
						|
 | 
						|
# Globals
 | 
						|
# Run-time switches. The option loop later in this script sets them to 1
# for -q, -t and -c; "comments" is bumped to 2 by the main loop once the
# first command line has been processed.
quiet = 0
test = 0
comments = 0

# A per-thread sysfs file name is built as <sysfsprefix><tid><file>
sysfsprefix = "/sys/devices/system/rttest/rttest"
statusfile = "/status"
commandfile = "/command"
 | 
						|
 | 
						|
# Command opcodes
 | 
						|
# Maps the opcode name used in a test line to the number (kept as text)
# that is written to the command file as "<number>:<data>". analyse()
# uses the same table to translate an opcode name into its number.
cmd_opcodes = {
    "schedother": "1",
    "schedfifo": "2",
    "lock": "3",
    "locknowait": "4",
    "lockint": "5",
    "lockintnowait": "6",
    "lockcont": "7",
    "unlock": "8",
    "signal": "11",
    "resetevent": "98",
    "reset": "99",
}
 | 
						|
 | 
						|
# Maps a test opcode name to [status field letter, comparison, expected].
#   field letter - prefix of the comma separated status field to inspect
#   comparison   - "eq", "lt" or "gt"
#   expected     - fixed value to compare with for the "M" field; None
#                  when the value comes from the data field of the test line
test_opcodes = {
    "prioeq": ["P", "eq", None],
    "priolt": ["P", "lt", None],
    "priogt": ["P", "gt", None],
    "nprioeq": ["N", "eq", None],
    "npriolt": ["N", "lt", None],
    "npriogt": ["N", "gt", None],
    "unlocked": ["M", "eq", 0],
    "trylock": ["M", "eq", 1],
    "blocked": ["M", "eq", 2],
    "blockedwake": ["M", "eq", 3],
    "locked": ["M", "eq", 4],
    "opcodeeq": ["O", "eq", None],
    "opcodelt": ["O", "lt", None],
    "opcodegt": ["O", "gt", None],
    "eventeq": ["E", "eq", None],
    "eventlt": ["E", "lt", None],
    "eventgt": ["E", "gt", None],
}
 | 
						|
 | 
						|
# Print usage information
 | 
						|
def usage():
    """Print the command line synopsis and the option summary to stdout."""
    # print(...) with a single argument behaves identically on Python 2
    # and Python 3, unlike the print statement.
    print("rt-tester.py <-c -h -q -t> <testfile>")
    print(" -c    display comments after first command")
    print(" -h    help")
    print(" -q    quiet mode")
    print(" -t    test mode (syntax check)")
    print(" testfile: read test specification from testfile")
    print(" otherwise from stdin")
 | 
						|
 | 
						|
# Print progress when not in quiet mode
 | 
						|
def progress(str):
    """Print ``str`` to stdout unless the global ``quiet`` flag is set.

    The parameter keeps its original name so existing callers are not
    affected; it shadows the builtin only inside this function.
    """
    if not quiet:
        # Single-argument print(...) works on Python 2 and Python 3
        print(str)
 | 
						|
 | 
						|
# Analyse a status value
 | 
						|
def analyse(val, top, arg):
    """Check one status value against a test opcode.

    val -- the status value as decimal text
    top -- a test_opcodes entry: [field letter, "eq"/"lt"/"gt", expected]
    arg -- data field of the test line; its meaning depends on top[0]:
             "M"   decimal digit position inside val; that digit is
                   compared with the expected value top[2]
             "O"   an opcode name found in cmd_opcodes, or a number
             other a number to compare val with

    Returns 1 when the comparison named by top[1] holds, otherwise 0.
    Raises ValueError when val or arg is not valid integer text.
    """
    intval = int(val)

    if top[0] == "M":
        # Floor division: "/" is true division on Python 3 and would turn
        # the extracted digit into a float that never equals top[2].
        intval = (intval // (10 ** int(arg))) % 10
        argval = top[2]
    elif top[0] == "O":
        # Accept either a symbolic opcode name or its numeric value
        argval = int(cmd_opcodes.get(arg, arg))
    else:
        argval = int(arg)

    relation = top[1]
    if relation == "eq":
        return int(intval == argval)
    if relation == "lt":
        return int(intval < argval)
    if relation == "gt":
        return int(intval > argval)
    # Unknown comparison: never matches
    return 0
 | 
						|
 | 
						|
# Parse the commandline
 | 
						|
try:
    (options, arguments) = getopt.getopt(sys.argv[1:], 'chqt')
except getopt.GetoptError:
    # Unknown option: show the synopsis and fail. The exception object is
    # not needed, which also avoids the Python-2-only "except X, name" form.
    usage()
    sys.exit(1)

# Parse commandline options (none of them takes a value)
for option, value in options:
    if option == "-c":
        comments = 1
    elif option == "-q":
        quiet = 1
    elif option == "-t":
        test = 1
    elif option == '-h':
        usage()
        sys.exit(0)
 | 
						|
 | 
						|
# Select the input source
 | 
						|
# Read the test specification from the first positional argument when one
# is given, otherwise from stdin.
if arguments:
    try:
        fd = open(arguments[0])
    except Exception:
        # Any failure to open the file is reported the same way; the
        # exception object itself is not used.
        sys.stderr.write("File not found %s\n" %(arguments[0]))
        sys.exit(1)
else:
    fd = sys.stdin
 | 
						|
 | 
						|
linenr = 0

# Read the test patterns. Every non-comment line has four ':' separated
# fields: command, opcode, thread id and data.
# Needs Python 2.6 or later ("with" and "except ... as"); also valid on
# Python 3.
while True:

    linenr += 1
    line = fd.readline()
    if not line:
        # End of input
        break

    line = line.strip()
    parts = line.split(":")

    # Skip blank lines and lines whose first field is empty
    if not parts[0]:
        continue

    if parts[0].startswith("#"):
        # Comment lines are echoed only after the first command (-c)
        if comments > 1:
            progress(line)
        continue

    if comments == 1:
        comments = 2

    progress(line)

    try:
        # The fields are separated inside the try block so that a line
        # with fewer than four fields is reported as a syntax error
        # instead of ending the script with an IndexError traceback.
        cmd = parts[0].strip().lower()
        opc = parts[1].strip().lower()
        tid = parts[2].strip()
        dat = parts[3].strip()

        # Test or wait for a status value
        if cmd == "t" or cmd == "w":
            testop = test_opcodes[opc]

            fname = "%s%s%s" %(sysfsprefix, tid, statusfile)
            if test:
                print(fname)
                continue

            while True:
                # Stays 1 when the status line has no field starting
                # with the requested letter
                query = 1
                with open(fname, 'r') as fsta:
                    status = fsta.readline().strip()
                stat = status.split(",")
                for s in stat:
                    s = s.strip()
                    if s.startswith(testop[0]):
                        # Separate status value
                        val = s[2:].strip()
                        query = analyse(val, testop, dat)
                        break
                # "t" checks once; "w" re-reads until the test matches
                if query or cmd == "t":
                    break

            progress("   " + status)

            if not query:
                sys.stderr.write("Test failed in line %d\n" %(linenr))
                sys.exit(1)

        # Issue a command to the tester
        elif cmd == "c":
            cmdnr = cmd_opcodes[opc]
            # Build command string and sys filename
            cmdstr = "%s:%s" %(cmdnr, dat)
            fname = "%s%s%s" %(sysfsprefix, tid, commandfile)
            if test:
                print(fname)
                continue
            with open(fname, 'w') as fcmd:
                fcmd.write(cmdstr)

    except Exception as ex:
        sys.stderr.write(str(ex))
        sys.stderr.write("\nSyntax error in line %d\n" %(linenr))
        # In test mode keep going so that all bad lines are reported
        if not test:
            fd.close()
            sys.exit(1)
 | 
						|
 | 
						|
# Normal exit pass
 | 
						|
# Reached only when the whole input was processed without a failed test;
# print(...) with one argument works on Python 2 and Python 3.
print("Pass")
sys.exit(0)
 |