[PATCH] Add per-test timeouts to lit
Jonathan Roelofs
jonathan at codesourcery.com
Wed Dec 10 10:54:47 PST 2014
Address more of ddunbar's comments, including adding some tests.
(doing this over email because phabricator is in the middle of some maintenance)
Jon
On 12/9/14 8:37 PM, Daniel Dunbar wrote:
> Looks good, some nits inline. I would like to see a test or two before this goes in, though.
>
> ================
> Comment at: utils/lit/lit/util.py:11
> @@ -9,1 +10,3 @@
>
> +import Test
> +
> ----------------
> This import isn't needed anymore.
>
> ================
> Comment at: utils/lit/lit/util.py:156
> @@ +155,3 @@
> +
> + class Watchdog(object):
> + def __init__(self, popen, timeout):
> ----------------
> jroelofs wrote:
>> thanks for the suggestion... I like this a lot more than what I had before.
> Can this be lifted to the top-level of this file just to avoid the break in the code?
>
> ================
> Comment at: utils/lit/lit/util.py:160
> @@ +159,3 @@
> + self.timer = None
> + if timeout is not None:
> + def timeout_handler():
> ----------------
> I would prefer if this class assumed timeout was non-None and the caller assumed the responsibility of handling the optionality of timeout (and just didn't create an instance)...
>
> With this change the watchdog implementation gets a tad simpler.
>
> ================
> Comment at: utils/lit/lit/util.py:161
> @@ +160,3 @@
> + if timeout is not None:
> + def timeout_handler():
> + try:
> ----------------
> This might be more readable as a regular method on the class instead of a nested function.
>
> ================
> Comment at: utils/lit/lit/util.py:166
> @@ +165,3 @@
> + # The Popen already terminated before the watchdog
> + # got acqauinted with it. Too bad they couldn't be
> + # friends
> ----------------
> Sic.
>
> ================
> Comment at: utils/lit/lit/util.py:171
> @@ +170,3 @@
> + timer = threading.Timer(timeout, timeout_handler)
> + timer.start()
> +
> ----------------
> This is currently broken, this should be self.timer, but with the change above there should only be a single assignment anyway.
>
> ================
> Comment at: utils/lit/lit/util.py:178
> @@ +177,3 @@
> + def timed_out(self):
> + return self.timeout_tripped
> +
> ----------------
> There isn't a good reason to use a getter method here, clients can just use the property directly.
>
> ================
> Comment at: utils/lit/lit/util.py:186
> @@ +185,3 @@
> + if wd.timed_out():
> + err += 'Test timed out after %d seconds\n' % (timeout)
> + wd.cancel()
> ----------------
> I'd add a newline (or two) at the start of this string, as there isn't much guarantee the existing stderr data has one.
>
> http://reviews.llvm.org/D6584
>
>
--
Jon Roelofs
jonathan at codesourcery.com
CodeSourcery / Mentor Embedded
-------------- next part --------------
Index: utils/lit/lit/LitConfig.py
===================================================================
--- utils/lit/lit/LitConfig.py (revision 223618)
+++ utils/lit/lit/LitConfig.py (working copy)
@@ -1,123 +1,124 @@
from __future__ import absolute_import
import inspect
import os
import sys
import lit.Test
import lit.formats
import lit.TestingConfig
import lit.util
class LitConfig:
"""LitConfig - Configuration data for a 'lit' test runner instance, shared
across all tests.
The LitConfig object is also used to communicate with client configuration
files, it is always passed in as the global variable 'lit' so that
configuration files can access common functionality and internal components
easily.
"""
def __init__(self, progname, path, quiet,
useValgrind, valgrindLeakCheck, valgrindArgs,
noExecute, debug, isWindows,
- params, config_prefix = None):
+ params, config_prefix = None, timeout = None):
# The name of the test runner.
self.progname = progname
# The items to add to the PATH environment variable.
self.path = [str(p) for p in path]
self.quiet = bool(quiet)
self.useValgrind = bool(useValgrind)
self.valgrindLeakCheck = bool(valgrindLeakCheck)
self.valgrindUserArgs = list(valgrindArgs)
self.noExecute = noExecute
self.debug = debug
self.isWindows = bool(isWindows)
self.params = dict(params)
self.bashPath = None
# Configuration files to look for when discovering test suites.
self.config_prefix = config_prefix or 'lit'
self.config_name = '%s.cfg' % (self.config_prefix,)
self.site_config_name = '%s.site.cfg' % (self.config_prefix,)
self.local_config_name = '%s.local.cfg' % (self.config_prefix,)
self.numErrors = 0
self.numWarnings = 0
self.valgrindArgs = []
if self.useValgrind:
self.valgrindArgs = ['valgrind', '-q', '--run-libc-freeres=no',
'--tool=memcheck', '--trace-children=yes',
'--error-exitcode=123']
if self.valgrindLeakCheck:
self.valgrindArgs.append('--leak-check=full')
else:
# The default is 'summary'.
self.valgrindArgs.append('--leak-check=no')
self.valgrindArgs.extend(self.valgrindUserArgs)
+ self.timeout = timeout
def load_config(self, config, path):
"""load_config(config, path) - Load a config object from an alternate
path."""
if self.debug:
self.note('load_config from %r' % path)
config.load_from_path(path, self)
return config
def getBashPath(self):
"""getBashPath - Get the path to 'bash'"""
if self.bashPath is not None:
return self.bashPath
self.bashPath = lit.util.which('bash', os.pathsep.join(self.path))
if self.bashPath is None:
self.bashPath = lit.util.which('bash')
if self.bashPath is None:
self.warning("Unable to find 'bash'.")
self.bashPath = ''
return self.bashPath
def getToolsPath(self, dir, paths, tools):
if dir is not None and os.path.isabs(dir) and os.path.isdir(dir):
if not lit.util.checkToolsPath(dir, tools):
return None
else:
dir = lit.util.whichTools(tools, paths)
# bash
self.bashPath = lit.util.which('bash', dir)
if self.bashPath is None:
self.note("Unable to find 'bash.exe'.")
self.bashPath = ''
return dir
def _write_message(self, kind, message):
# Get the file/line where this message was generated.
f = inspect.currentframe()
# Step out of _write_message, and then out of wrapper.
f = f.f_back.f_back
file,line,_,_,_ = inspect.getframeinfo(f)
location = '%s:%d' % (os.path.basename(file), line)
sys.stderr.write('%s: %s: %s: %s\n' % (self.progname, location,
kind, message))
def note(self, message):
self._write_message('note', message)
def warning(self, message):
self._write_message('warning', message)
self.numWarnings += 1
def error(self, message):
self._write_message('error', message)
self.numErrors += 1
def fatal(self, message):
self._write_message('fatal', message)
sys.exit(2)
Index: utils/lit/lit/TestRunner.py
===================================================================
--- utils/lit/lit/TestRunner.py (revision 223618)
+++ utils/lit/lit/TestRunner.py (working copy)
@@ -1,525 +1,537 @@
from __future__ import absolute_import
import os, signal, subprocess, sys
import re
import platform
import tempfile
import lit.ShUtil as ShUtil
import lit.Test as Test
import lit.util
class InternalShellError(Exception):
def __init__(self, command, message):
self.command = command
self.message = message
kIsWindows = platform.system() == 'Windows'
# Don't use close_fds on Windows.
kUseCloseFDs = not kIsWindows
# Use temporary files to replace /dev/null on Windows.
kAvoidDevNull = kIsWindows
-def executeShCmd(cmd, cfg, cwd, results):
+def executeShCmd(cmd, cfg, cwd, results, watchdog=None):
if isinstance(cmd, ShUtil.Seq):
if cmd.op == ';':
- res = executeShCmd(cmd.lhs, cfg, cwd, results)
- return executeShCmd(cmd.rhs, cfg, cwd, results)
+ res = executeShCmd(cmd.lhs, cfg, cwd, results, watchdog)
+ return executeShCmd(cmd.rhs, cfg, cwd, results, watchdog)
if cmd.op == '&':
raise InternalShellError(cmd,"unsupported shell operator: '&'")
if cmd.op == '||':
- res = executeShCmd(cmd.lhs, cfg, cwd, results)
+ res = executeShCmd(cmd.lhs, cfg, cwd, results, watchdog)
if res != 0:
- res = executeShCmd(cmd.rhs, cfg, cwd, results)
+ res = executeShCmd(cmd.rhs, cfg, cwd, results, watchdog)
return res
if cmd.op == '&&':
- res = executeShCmd(cmd.lhs, cfg, cwd, results)
+ res = executeShCmd(cmd.lhs, cfg, cwd, results, watchdog)
if res is None:
return res
if res == 0:
- res = executeShCmd(cmd.rhs, cfg, cwd, results)
+ res = executeShCmd(cmd.rhs, cfg, cwd, results, watchdog)
return res
raise ValueError('Unknown shell command: %r' % cmd.op)
assert isinstance(cmd, ShUtil.Pipeline)
procs = []
input = subprocess.PIPE
stderrTempFiles = []
opened_files = []
named_temp_files = []
# To avoid deadlock, we use a single stderr stream for piped
# output. This is null until we have seen some output using
# stderr.
for i,j in enumerate(cmd.commands):
# Apply the redirections, we use (N,) as a sentinel to indicate stdin,
# stdout, stderr for N equal to 0, 1, or 2 respectively. Redirects to or
# from a file are represented with a list [file, mode, file-object]
# where file-object is initially None.
redirects = [(0,), (1,), (2,)]
for r in j.redirects:
if r[0] == ('>',2):
redirects[2] = [r[1], 'w', None]
elif r[0] == ('>>',2):
redirects[2] = [r[1], 'a', None]
elif r[0] == ('>&',2) and r[1] in '012':
redirects[2] = redirects[int(r[1])]
elif r[0] == ('>&',) or r[0] == ('&>',):
redirects[1] = redirects[2] = [r[1], 'w', None]
elif r[0] == ('>',):
redirects[1] = [r[1], 'w', None]
elif r[0] == ('>>',):
redirects[1] = [r[1], 'a', None]
elif r[0] == ('<',):
redirects[0] = [r[1], 'r', None]
else:
raise InternalShellError(j,"Unsupported redirect: %r" % (r,))
# Map from the final redirections to something subprocess can handle.
final_redirects = []
for index,r in enumerate(redirects):
if r == (0,):
result = input
elif r == (1,):
if index == 0:
raise InternalShellError(j,"Unsupported redirect for stdin")
elif index == 1:
result = subprocess.PIPE
else:
result = subprocess.STDOUT
elif r == (2,):
if index != 2:
raise InternalShellError(j,"Unsupported redirect on stdout")
result = subprocess.PIPE
else:
if r[2] is None:
if kAvoidDevNull and r[0] == '/dev/null':
r[2] = tempfile.TemporaryFile(mode=r[1])
else:
r[2] = open(r[0], r[1])
# Workaround a Win32 and/or subprocess bug when appending.
#
# FIXME: Actually, this is probably an instance of PR6753.
if r[1] == 'a':
r[2].seek(0, 2)
opened_files.append(r[2])
result = r[2]
final_redirects.append(result)
stdin, stdout, stderr = final_redirects
# If stderr wants to come from stdout, but stdout isn't a pipe, then put
# stderr on a pipe and treat it as stdout.
if (stderr == subprocess.STDOUT and stdout != subprocess.PIPE):
stderr = subprocess.PIPE
stderrIsStdout = True
else:
stderrIsStdout = False
# Don't allow stderr on a PIPE except for the last
# process, this could deadlock.
#
# FIXME: This is slow, but so is deadlock.
if stderr == subprocess.PIPE and j != cmd.commands[-1]:
stderr = tempfile.TemporaryFile(mode='w+b')
stderrTempFiles.append((i, stderr))
# Resolve the executable path ourselves.
args = list(j.args)
executable = lit.util.which(args[0], cfg.environment['PATH'])
if not executable:
raise InternalShellError(j, '%r: command not found' % j.args[0])
# Replace uses of /dev/null with temporary files.
if kAvoidDevNull:
for i,arg in enumerate(args):
if arg == "/dev/null":
f = tempfile.NamedTemporaryFile(delete=False)
f.close()
named_temp_files.append(f.name)
args[i] = f.name
try:
- procs.append(subprocess.Popen(args, cwd=cwd,
- executable = executable,
- stdin = stdin,
- stdout = stdout,
- stderr = stderr,
- env = cfg.environment,
- close_fds = kUseCloseFDs))
+ p = subprocess.Popen(args, cwd=cwd,
+ executable = executable,
+ stdin = stdin,
+ stdout = stdout,
+ stderr = stderr,
+ env = cfg.environment,
+ close_fds = kUseCloseFDs)
+ procs.append(p)
+ if watchdog is not None:
+ watchdog.watch(p)
except OSError as e:
raise InternalShellError(j, 'Could not create process due to {}'.format(e))
# Immediately close stdin for any process taking stdin from us.
if stdin == subprocess.PIPE:
procs[-1].stdin.close()
procs[-1].stdin = None
# Update the current stdin source.
if stdout == subprocess.PIPE:
input = procs[-1].stdout
elif stderrIsStdout:
input = procs[-1].stderr
else:
input = subprocess.PIPE
# Explicitly close any redirected files. We need to do this now because we
# need to release any handles we may have on the temporary files (important
# on Win32, for example). Since we have already spawned the subprocess, our
# handles have already been transferred so we do not need them anymore.
for f in opened_files:
f.close()
# FIXME: There is probably still deadlock potential here. Yawn.
procData = [None] * len(procs)
procData[-1] = procs[-1].communicate()
for i in range(len(procs) - 1):
if procs[i].stdout is not None:
out = procs[i].stdout.read()
else:
out = ''
if procs[i].stderr is not None:
err = procs[i].stderr.read()
else:
err = ''
procData[i] = (out,err)
# Read stderr out of the temp files.
for i,f in stderrTempFiles:
f.seek(0, 0)
procData[i] = (procData[i][0], f.read())
def to_string(bytes):
if isinstance(bytes, str):
return bytes
return bytes.encode('utf-8')
exitCode = None
for i,(out,err) in enumerate(procData):
res = procs[i].wait()
# Detect Ctrl-C in subprocess.
if res == -signal.SIGINT:
raise KeyboardInterrupt
# Ensure the resulting output is always of string type.
try:
out = to_string(out.decode('utf-8'))
except:
out = str(out)
try:
err = to_string(err.decode('utf-8'))
except:
err = str(err)
results.append((cmd.commands[i], out, err, res))
if cmd.pipe_err:
# Python treats the exit code as a signed char.
if exitCode is None:
exitCode = res
elif res < 0:
exitCode = min(exitCode, res)
else:
exitCode = max(exitCode, res)
else:
exitCode = res
# Remove any named temporary files we created.
for f in named_temp_files:
try:
os.remove(f)
except OSError:
pass
if cmd.negate:
exitCode = not exitCode
return exitCode
def executeScriptInternal(test, litConfig, tmpBase, commands, cwd):
cmds = []
for ln in commands:
try:
cmds.append(ShUtil.ShParser(ln, litConfig.isWindows,
test.config.pipefail).parse())
except:
return lit.Test.Result(Test.FAIL, "shell parser error on: %r" % ln)
cmd = cmds[0]
for c in cmds[1:]:
cmd = ShUtil.Seq(cmd, '&&', c)
results = []
+ wd = None
try:
- exitCode = executeShCmd(cmd, test.config, cwd, results)
+ if litConfig.timeout is not None:
+ wd = lit.util.Watchdog(litConfig.timeout)
+ exitCode = executeShCmd(cmd, test.config, cwd, results, wd)
+ if wd is not None:
+ wd.cancel()
except InternalShellError:
e = sys.exc_info()[1]
exitCode = 127
results.append((e.command, '', e.message, exitCode))
out = err = ''
for i,(cmd, cmd_out,cmd_err,res) in enumerate(results):
out += 'Command %d: %s\n' % (i, ' '.join('"%s"' % s for s in cmd.args))
out += 'Command %d Result: %r\n' % (i, res)
out += 'Command %d Output:\n%s\n\n' % (i, cmd_out)
out += 'Command %d Stderr:\n%s\n\n' % (i, cmd_err)
+ if wd is not None and wd.timed_out:
+ out += "\n\nTimed out after %.2f seconds" % (litConfig.timeout)
+
return out, err, exitCode
def executeScript(test, litConfig, tmpBase, commands, cwd):
bashPath = litConfig.getBashPath();
isWin32CMDEXE = (litConfig.isWindows and not bashPath)
script = tmpBase + '.script'
if isWin32CMDEXE:
script += '.bat'
# Write script file
mode = 'w'
if litConfig.isWindows and not isWin32CMDEXE:
mode += 'b' # Avoid CRLFs when writing bash scripts.
f = open(script, mode)
if isWin32CMDEXE:
f.write('\nif %ERRORLEVEL% NEQ 0 EXIT\n'.join(commands))
else:
if test.config.pipefail:
f.write('set -o pipefail;')
f.write('{ ' + '; } &&\n{ '.join(commands) + '; }')
f.write('\n')
f.close()
if isWin32CMDEXE:
command = ['cmd','/c', script]
else:
if bashPath:
command = [bashPath, script]
else:
command = ['/bin/sh', script]
if litConfig.useValgrind:
# FIXME: Running valgrind on sh is overkill. We probably could just
# run on clang with no real loss.
command = litConfig.valgrindArgs + command
return lit.util.executeCommand(command, cwd=cwd,
- env=test.config.environment)
+ env=test.config.environment,
+ timeout=litConfig.timeout)
def parseIntegratedTestScriptCommands(source_path):
"""
parseIntegratedTestScriptCommands(source_path) -> commands
Parse the commands in an integrated test script file into a list of
(line_number, command_type, line).
"""
# This code is carefully written to be dual compatible with Python 2.5+ and
# Python 3 without requiring input files to always have valid codings. The
# trick we use is to open the file in binary mode and use the regular
# expression library to find the commands, with it scanning strings in
# Python2 and bytes in Python3.
#
# Once we find a match, we do require each script line to be decodable to
# UTF-8, so we convert the outputs to UTF-8 before returning. This way the
# remaining code can work with "strings" agnostic of the executing Python
# version.
def to_bytes(str):
# Encode to UTF-8 to get binary data.
return str.encode('utf-8')
def to_string(bytes):
if isinstance(bytes, str):
return bytes
return to_bytes(bytes)
keywords = ('RUN:', 'XFAIL:', 'REQUIRES:', 'END.')
keywords_re = re.compile(
to_bytes("(%s)(.*)\n" % ("|".join(k for k in keywords),)))
f = open(source_path, 'rb')
try:
# Read the entire file contents.
data = f.read()
# Ensure the data ends with a newline.
if not data.endswith(to_bytes('\n')):
data = data + to_bytes('\n')
# Iterate over the matches.
line_number = 1
last_match_position = 0
for match in keywords_re.finditer(data):
# Compute the updated line number by counting the intervening
# newlines.
match_position = match.start()
line_number += data.count(to_bytes('\n'), last_match_position,
match_position)
last_match_position = match_position
# Convert the keyword and line to UTF-8 strings and yield the
# command. Note that we take care to return regular strings in
# Python 2, to avoid other code having to differentiate between the
# str and unicode types.
keyword,ln = match.groups()
yield (line_number, to_string(keyword[:-1].decode('utf-8')),
to_string(ln.decode('utf-8')))
finally:
f.close()
def parseIntegratedTestScript(test, normalize_slashes=False,
extra_substitutions=[]):
"""parseIntegratedTestScript - Scan an LLVM/Clang style integrated test
script and extract the lines to 'RUN' as well as 'XFAIL' and 'REQUIRES'
information. The RUN lines also will have variable substitution performed.
"""
# Get the temporary location, this is always relative to the test suite
# root, not test source root.
#
# FIXME: This should not be here?
sourcepath = test.getSourcePath()
sourcedir = os.path.dirname(sourcepath)
execpath = test.getExecPath()
execdir,execbase = os.path.split(execpath)
tmpDir = os.path.join(execdir, 'Output')
tmpBase = os.path.join(tmpDir, execbase)
# Normalize slashes, if requested.
if normalize_slashes:
sourcepath = sourcepath.replace('\\', '/')
sourcedir = sourcedir.replace('\\', '/')
tmpDir = tmpDir.replace('\\', '/')
tmpBase = tmpBase.replace('\\', '/')
# We use #_MARKER_# to hide %% while we do the other substitutions.
substitutions = list(extra_substitutions)
substitutions.extend([('%%', '#_MARKER_#')])
substitutions.extend(test.config.substitutions)
substitutions.extend([('%s', sourcepath),
('%S', sourcedir),
('%p', sourcedir),
('%{pathsep}', os.pathsep),
('%t', tmpBase + '.tmp'),
('%T', tmpDir),
('#_MARKER_#', '%')])
# "%/[STpst]" should be normalized.
substitutions.extend([
('%/s', sourcepath.replace('\\', '/')),
('%/S', sourcedir.replace('\\', '/')),
('%/p', sourcedir.replace('\\', '/')),
('%/t', tmpBase.replace('\\', '/') + '.tmp'),
('%/T', tmpDir.replace('\\', '/')),
])
# Collect the test lines from the script.
script = []
requires = []
for line_number, command_type, ln in \
parseIntegratedTestScriptCommands(sourcepath):
if command_type == 'RUN':
# Trim trailing whitespace.
ln = ln.rstrip()
# Substitute line number expressions
ln = re.sub('%\(line\)', str(line_number), ln)
def replace_line_number(match):
if match.group(1) == '+':
return str(line_number + int(match.group(2)))
if match.group(1) == '-':
return str(line_number - int(match.group(2)))
ln = re.sub('%\(line *([\+-]) *(\d+)\)', replace_line_number, ln)
# Collapse lines with trailing '\\'.
if script and script[-1][-1] == '\\':
script[-1] = script[-1][:-1] + ln
else:
script.append(ln)
elif command_type == 'XFAIL':
test.xfails.extend([s.strip() for s in ln.split(',')])
elif command_type == 'REQUIRES':
requires.extend([s.strip() for s in ln.split(',')])
elif command_type == 'END':
# END commands are only honored if the rest of the line is empty.
if not ln.strip():
break
else:
raise ValueError("unknown script command type: %r" % (
command_type,))
# Apply substitutions to the script. Allow full regular
# expression syntax. Replace each matching occurrence of regular
# expression pattern a with substitution b in line ln.
def processLine(ln):
# Apply substitutions
for a,b in substitutions:
if kIsWindows:
b = b.replace("\\","\\\\")
ln = re.sub(a, b, ln)
# Strip the trailing newline and any extra whitespace.
return ln.strip()
script = [processLine(ln)
for ln in script]
# Verify the script contains a run line.
if not script:
return lit.Test.Result(Test.UNRESOLVED, "Test has no run line!")
# Check for unterminated run lines.
if script[-1][-1] == '\\':
return lit.Test.Result(Test.UNRESOLVED,
"Test has unterminated run lines (with '\\')")
# Check that we have the required features:
missing_required_features = [f for f in requires
if f not in test.config.available_features]
if missing_required_features:
msg = ', '.join(missing_required_features)
return lit.Test.Result(Test.UNSUPPORTED,
"Test requires the following features: %s" % msg)
return script,tmpBase,execdir
def executeShTest(test, litConfig, useExternalSh,
extra_substitutions=[]):
if test.config.unsupported:
return (Test.UNSUPPORTED, 'Test is unsupported')
res = parseIntegratedTestScript(test, useExternalSh, extra_substitutions)
if isinstance(res, lit.Test.Result):
return res
if litConfig.noExecute:
return lit.Test.Result(Test.PASS)
script, tmpBase, execdir = res
# Create the output directory if it does not already exist.
lit.util.mkdir_p(os.path.dirname(tmpBase))
if useExternalSh:
res = executeScript(test, litConfig, tmpBase, script, execdir)
else:
res = executeScriptInternal(test, litConfig, tmpBase, script, execdir)
if isinstance(res, lit.Test.Result):
return res
out,err,exitCode = res
if exitCode == 0:
status = Test.PASS
else:
status = Test.FAIL
# Form the output log.
output = """Script:\n--\n%s\n--\nExit Code: %d\n\n""" % (
'\n'.join(script), exitCode)
# Append the outputs, if present.
if out:
output += """Command Output (stdout):\n--\n%s\n--\n""" % (out,)
if err:
output += """Command Output (stderr):\n--\n%s\n--\n""" % (err,)
return lit.Test.Result(status, output)
Index: utils/lit/lit/formats/base.py
===================================================================
--- utils/lit/lit/formats/base.py (revision 223618)
+++ utils/lit/lit/formats/base.py (working copy)
@@ -1,118 +1,118 @@
from __future__ import absolute_import
import os
import sys
import lit.Test
import lit.util
class TestFormat(object):
pass
###
class FileBasedTest(TestFormat):
def getTestsInDirectory(self, testSuite, path_in_suite,
litConfig, localConfig):
source_path = testSuite.getSourcePath(path_in_suite)
for filename in os.listdir(source_path):
# Ignore dot files and excluded tests.
if (filename.startswith('.') or
filename in localConfig.excludes):
continue
filepath = os.path.join(source_path, filename)
if not os.path.isdir(filepath):
base,ext = os.path.splitext(filename)
if ext in localConfig.suffixes:
yield lit.Test.Test(testSuite, path_in_suite + (filename,),
localConfig)
###
import re
import tempfile
class OneCommandPerFileTest(TestFormat):
# FIXME: Refactor into generic test for running some command on a directory
# of inputs.
def __init__(self, command, dir, recursive=False,
pattern=".*", useTempInput=False):
if isinstance(command, str):
self.command = [command]
else:
self.command = list(command)
if dir is not None:
dir = str(dir)
self.dir = dir
self.recursive = bool(recursive)
self.pattern = re.compile(pattern)
self.useTempInput = useTempInput
def getTestsInDirectory(self, testSuite, path_in_suite,
litConfig, localConfig):
dir = self.dir
if dir is None:
dir = testSuite.getSourcePath(path_in_suite)
for dirname,subdirs,filenames in os.walk(dir):
if not self.recursive:
subdirs[:] = []
subdirs[:] = [d for d in subdirs
if (d != '.svn' and
d not in localConfig.excludes)]
for filename in filenames:
if (filename.startswith('.') or
not self.pattern.match(filename) or
filename in localConfig.excludes):
continue
path = os.path.join(dirname,filename)
suffix = path[len(dir):]
if suffix.startswith(os.sep):
suffix = suffix[1:]
test = lit.Test.Test(
testSuite, path_in_suite + tuple(suffix.split(os.sep)),
localConfig)
# FIXME: Hack?
test.source_path = path
yield test
def createTempInput(self, tmp, test):
abstract
def execute(self, test, litConfig):
if test.config.unsupported:
return (lit.Test.UNSUPPORTED, 'Test is unsupported')
cmd = list(self.command)
# If using temp input, create a temporary file and hand it to the
# subclass.
if self.useTempInput:
tmp = tempfile.NamedTemporaryFile(suffix='.cpp')
self.createTempInput(tmp, test)
tmp.flush()
cmd.append(tmp.name)
elif hasattr(test, 'source_path'):
cmd.append(test.source_path)
else:
cmd.append(test.getSourcePath())
- out, err, exitCode = lit.util.executeCommand(cmd)
+ out, err, exitCode = lit.util.executeCommand(cmd, timeout=litConfig.timeout)
diags = out + err
if not exitCode and not diags.strip():
return lit.Test.PASS,''
# Try to include some useful information.
report = """Command: %s\n""" % ' '.join(["'%s'" % a
for a in cmd])
if self.useTempInput:
report += """Temporary File: %s\n""" % tmp.name
report += "--\n%s--\n""" % open(tmp.name).read()
report += """Output:\n--\n%s--""" % diags
return lit.Test.FAIL, report
Index: utils/lit/lit/main.py
===================================================================
--- utils/lit/lit/main.py (revision 223618)
+++ utils/lit/lit/main.py (working copy)
@@ -1,473 +1,477 @@
#!/usr/bin/env python
"""
lit - LLVM Integrated Tester.
See lit.pod for more information.
"""
from __future__ import absolute_import
import math, os, platform, random, re, sys, time
import lit.ProgressBar
import lit.LitConfig
import lit.Test
import lit.run
import lit.util
import lit.discovery
class TestingProgressDisplay(object):
def __init__(self, opts, numTests, progressBar=None):
self.opts = opts
self.numTests = numTests
self.current = None
self.progressBar = progressBar
self.completed = 0
def finish(self):
if self.progressBar:
self.progressBar.clear()
elif self.opts.quiet:
pass
elif self.opts.succinct:
sys.stdout.write('\n')
def update(self, test):
self.completed += 1
if self.opts.incremental:
update_incremental_cache(test)
if self.progressBar:
self.progressBar.update(float(self.completed)/self.numTests,
test.getFullName())
shouldShow = test.result.code.isFailure or \
(not self.opts.quiet and not self.opts.succinct)
if not shouldShow:
return
if self.progressBar:
self.progressBar.clear()
# Show the test result line.
test_name = test.getFullName()
print('%s: %s (%d of %d)' % (test.result.code.name, test_name,
self.completed, self.numTests))
# Show the test failure output, if requested.
if test.result.code.isFailure and self.opts.showOutput:
print("%s TEST '%s' FAILED %s" % ('*'*20, test.getFullName(),
'*'*20))
print(test.result.output)
print("*" * 20)
# Report test metrics, if present.
if test.result.metrics:
print("%s TEST '%s' RESULTS %s" % ('*'*10, test.getFullName(),
'*'*10))
items = sorted(test.result.metrics.items())
for metric_name, value in items:
print('%s: %s ' % (metric_name, value.format()))
print("*" * 10)
# Ensure the output is flushed.
sys.stdout.flush()
def write_test_results(run, lit_config, testing_time, output_path):
try:
import json
except ImportError:
lit_config.fatal('test output unsupported with Python 2.5')
# Construct the data we will write.
data = {}
# Encode the current lit version as a schema version.
data['__version__'] = lit.__versioninfo__
data['elapsed'] = testing_time
# FIXME: Record some information on the lit configuration used?
# FIXME: Record information from the individual test suites?
# Encode the tests.
data['tests'] = tests_data = []
for test in run.tests:
test_data = {
'name' : test.getFullName(),
'code' : test.result.code.name,
'output' : test.result.output,
'elapsed' : test.result.elapsed }
# Add test metrics, if present.
if test.result.metrics:
test_data['metrics'] = metrics_data = {}
for key, value in test.result.metrics.items():
metrics_data[key] = value.todata()
tests_data.append(test_data)
# Write the output.
f = open(output_path, 'w')
try:
json.dump(data, f, indent=2, sort_keys=True)
f.write('\n')
finally:
f.close()
def update_incremental_cache(test):
if not test.result.code.isFailure:
return
fname = test.getFilePath()
os.utime(fname, None)
def sort_by_incremental_cache(run):
def sortIndex(test):
fname = test.getFilePath()
try:
return -os.path.getmtime(fname)
except:
return 0
run.tests.sort(key = lambda t: sortIndex(t))
def main(builtinParameters = {}):
# Use processes by default on Unix platforms.
isWindows = platform.system() == 'Windows'
useProcessesIsDefault = not isWindows
global options
from optparse import OptionParser, OptionGroup
parser = OptionParser("usage: %prog [options] {file-or-path}")
parser.add_option("", "--version", dest="show_version",
help="Show version and exit",
action="store_true", default=False)
parser.add_option("-j", "--threads", dest="numThreads", metavar="N",
help="Number of testing threads",
type=int, action="store", default=None)
parser.add_option("", "--config-prefix", dest="configPrefix",
metavar="NAME", help="Prefix for 'lit' config files",
action="store", default=None)
parser.add_option("", "--param", dest="userParameters",
metavar="NAME=VAL",
help="Add 'NAME' = 'VAL' to the user defined parameters",
type=str, action="append", default=[])
group = OptionGroup(parser, "Output Format")
# FIXME: I find these names very confusing, although I like the
# functionality.
group.add_option("-q", "--quiet", dest="quiet",
help="Suppress no error output",
action="store_true", default=False)
group.add_option("-s", "--succinct", dest="succinct",
help="Reduce amount of output",
action="store_true", default=False)
group.add_option("-v", "--verbose", dest="showOutput",
help="Show all test output",
action="store_true", default=False)
group.add_option("-o", "--output", dest="output_path",
help="Write test results to the provided path",
action="store", type=str, metavar="PATH")
group.add_option("", "--no-progress-bar", dest="useProgressBar",
help="Do not use curses based progress bar",
action="store_false", default=True)
group.add_option("", "--show-unsupported", dest="show_unsupported",
help="Show unsupported tests",
action="store_true", default=False)
group.add_option("", "--show-xfail", dest="show_xfail",
help="Show tests that were expected to fail",
action="store_true", default=False)
parser.add_option_group(group)
group = OptionGroup(parser, "Test Execution")
group.add_option("", "--path", dest="path",
help="Additional paths to add to testing environment",
action="append", type=str, default=[])
group.add_option("", "--vg", dest="useValgrind",
help="Run tests under valgrind",
action="store_true", default=False)
group.add_option("", "--vg-leak", dest="valgrindLeakCheck",
help="Check for memory leaks under valgrind",
action="store_true", default=False)
group.add_option("", "--vg-arg", dest="valgrindArgs", metavar="ARG",
help="Specify an extra argument for valgrind",
type=str, action="append", default=[])
group.add_option("", "--time-tests", dest="timeTests",
help="Track elapsed wall time for each test",
action="store_true", default=False)
group.add_option("", "--no-execute", dest="noExecute",
help="Don't execute any tests (assume PASS)",
action="store_true", default=False)
group.add_option("", "--xunit-xml-output", dest="xunit_output_file",
help=("Write XUnit-compatible XML test reports to the"
" specified file"), default=None)
parser.add_option_group(group)
group = OptionGroup(parser, "Test Selection")
group.add_option("", "--max-tests", dest="maxTests", metavar="N",
help="Maximum number of tests to run",
action="store", type=int, default=None)
group.add_option("", "--max-time", dest="maxTime", metavar="N",
help="Maximum time to spend testing (in seconds)",
action="store", type=float, default=None)
+ group.add_option("", "--timeout", dest="timeout", metavar="N",
+ help="Maximum time to spend in any given test (in seconds)",
+ action="store", type=float, default=None)
group.add_option("", "--shuffle", dest="shuffle",
help="Run tests in random order",
action="store_true", default=False)
group.add_option("-i", "--incremental", dest="incremental",
help="Run modified and failing tests first (updates "
"mtimes)",
action="store_true", default=False)
group.add_option("", "--filter", dest="filter", metavar="REGEX",
help=("Only run tests with paths matching the given "
"regular expression"),
action="store", default=None)
parser.add_option_group(group)
group = OptionGroup(parser, "Debug and Experimental Options")
group.add_option("", "--debug", dest="debug",
help="Enable debugging (for 'lit' development)",
action="store_true", default=False)
group.add_option("", "--show-suites", dest="showSuites",
help="Show discovered test suites",
action="store_true", default=False)
group.add_option("", "--show-tests", dest="showTests",
help="Show all discovered tests",
action="store_true", default=False)
group.add_option("", "--use-processes", dest="useProcesses",
help="Run tests in parallel with processes (not threads)",
action="store_true", default=useProcessesIsDefault)
group.add_option("", "--use-threads", dest="useProcesses",
help="Run tests in parallel with threads (not processes)",
action="store_false", default=useProcessesIsDefault)
parser.add_option_group(group)
(opts, args) = parser.parse_args()
if opts.show_version:
print("lit %s" % (lit.__version__,))
return
if not args:
parser.error('No inputs specified')
if opts.numThreads is None:
# Python <2.5 has a race condition causing lit to always fail with numThreads>1
# http://bugs.python.org/issue1731717
# I haven't seen this bug occur with 2.5.2 and later, so only enable multiple
# threads by default there.
if sys.hexversion >= 0x2050200:
opts.numThreads = lit.util.detectCPUs()
else:
opts.numThreads = 1
inputs = args
# Create the user defined parameters.
userParams = dict(builtinParameters)
for entry in opts.userParameters:
if '=' not in entry:
name,val = entry,''
else:
name,val = entry.split('=', 1)
userParams[name] = val
# Create the global config object.
litConfig = lit.LitConfig.LitConfig(
progname = os.path.basename(sys.argv[0]),
path = opts.path,
quiet = opts.quiet,
useValgrind = opts.useValgrind,
valgrindLeakCheck = opts.valgrindLeakCheck,
valgrindArgs = opts.valgrindArgs,
noExecute = opts.noExecute,
debug = opts.debug,
isWindows = isWindows,
params = userParams,
- config_prefix = opts.configPrefix)
+ config_prefix = opts.configPrefix,
+ timeout = opts.timeout)
# Perform test discovery.
run = lit.run.Run(litConfig,
lit.discovery.find_tests_for_inputs(litConfig, inputs))
if opts.showSuites or opts.showTests:
# Aggregate the tests by suite.
suitesAndTests = {}
for result_test in run.tests:
if result_test.suite not in suitesAndTests:
suitesAndTests[result_test.suite] = []
suitesAndTests[result_test.suite].append(result_test)
suitesAndTests = list(suitesAndTests.items())
suitesAndTests.sort(key = lambda item: item[0].name)
# Show the suites, if requested.
if opts.showSuites:
print('-- Test Suites --')
for ts,ts_tests in suitesAndTests:
print(' %s - %d tests' %(ts.name, len(ts_tests)))
print(' Source Root: %s' % ts.source_root)
print(' Exec Root : %s' % ts.exec_root)
# Show the tests, if requested.
if opts.showTests:
print('-- Available Tests --')
for ts,ts_tests in suitesAndTests:
ts_tests.sort(key = lambda test: test.path_in_suite)
for test in ts_tests:
print(' %s' % (test.getFullName(),))
# Exit.
sys.exit(0)
# Select and order the tests.
numTotalTests = len(run.tests)
# First, select based on the filter expression if given.
if opts.filter:
try:
rex = re.compile(opts.filter)
except:
parser.error("invalid regular expression for --filter: %r" % (
opts.filter))
run.tests = [result_test for result_test in run.tests
if rex.search(result_test.getFullName())]
# Then select the order.
if opts.shuffle:
random.shuffle(run.tests)
elif opts.incremental:
sort_by_incremental_cache(run)
else:
run.tests.sort(key = lambda result_test: result_test.getFullName())
# Finally limit the number of tests, if desired.
if opts.maxTests is not None:
run.tests = run.tests[:opts.maxTests]
# Don't create more threads than tests.
opts.numThreads = min(len(run.tests), opts.numThreads)
extra = ''
if len(run.tests) != numTotalTests:
extra = ' of %d' % numTotalTests
header = '-- Testing: %d%s tests, %d threads --'%(len(run.tests), extra,
opts.numThreads)
progressBar = None
if not opts.quiet:
if opts.succinct and opts.useProgressBar:
try:
tc = lit.ProgressBar.TerminalController()
progressBar = lit.ProgressBar.ProgressBar(tc, header)
except ValueError:
print(header)
progressBar = lit.ProgressBar.SimpleProgressBar('Testing: ')
else:
print(header)
startTime = time.time()
display = TestingProgressDisplay(opts, len(run.tests), progressBar)
try:
run.execute_tests(display, opts.numThreads, opts.maxTime,
opts.useProcesses)
except KeyboardInterrupt:
sys.exit(2)
display.finish()
testing_time = time.time() - startTime
if not opts.quiet:
print('Testing Time: %.2fs' % (testing_time,))
# Write out the test data, if requested.
if opts.output_path is not None:
write_test_results(run, litConfig, testing_time, opts.output_path)
# List test results organized by kind.
hasFailures = False
byCode = {}
for test in run.tests:
if test.result.code not in byCode:
byCode[test.result.code] = []
byCode[test.result.code].append(test)
if test.result.code.isFailure:
hasFailures = True
# Print each test in any of the failing groups.
for title,code in (('Unexpected Passing Tests', lit.Test.XPASS),
('Failing Tests', lit.Test.FAIL),
('Unresolved Tests', lit.Test.UNRESOLVED),
('Unsupported Tests', lit.Test.UNSUPPORTED),
('Expected Failing Tests', lit.Test.XFAIL)):
if (lit.Test.XFAIL == code and not opts.show_xfail) or \
(lit.Test.UNSUPPORTED == code and not opts.show_unsupported):
continue
elts = byCode.get(code)
if not elts:
continue
print('*'*20)
print('%s (%d):' % (title, len(elts)))
for test in elts:
print(' %s' % test.getFullName())
sys.stdout.write('\n')
if opts.timeTests and run.tests:
# Order by time.
test_times = [(test.getFullName(), test.result.elapsed)
for test in run.tests]
lit.util.printHistogram(test_times, title='Tests')
for name,code in (('Expected Passes ', lit.Test.PASS),
('Expected Failures ', lit.Test.XFAIL),
('Unsupported Tests ', lit.Test.UNSUPPORTED),
('Unresolved Tests ', lit.Test.UNRESOLVED),
('Unexpected Passes ', lit.Test.XPASS),
('Unexpected Failures', lit.Test.FAIL)):
if opts.quiet and not code.isFailure:
continue
N = len(byCode.get(code,[]))
if N:
print(' %s: %d' % (name,N))
if opts.xunit_output_file:
# Collect the tests, indexed by test suite
by_suite = {}
for result_test in run.tests:
suite = result_test.suite.config.name
if suite not in by_suite:
by_suite[suite] = {
'passes' : 0,
'failures' : 0,
'tests' : [] }
by_suite[suite]['tests'].append(result_test)
if result_test.result.code.isFailure:
by_suite[suite]['failures'] += 1
else:
by_suite[suite]['passes'] += 1
xunit_output_file = open(opts.xunit_output_file, "w")
xunit_output_file.write("<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n")
xunit_output_file.write("<testsuites>\n")
for suite_name, suite in by_suite.items():
safe_suite_name = suite_name.replace(".", "-")
xunit_output_file.write("<testsuite name='" + safe_suite_name + "'")
xunit_output_file.write(" tests='" + str(suite['passes'] +
suite['failures']) + "'")
xunit_output_file.write(" failures='" + str(suite['failures']) +
"'>\n")
for result_test in suite['tests']:
xunit_output_file.write(result_test.getJUnitXML() + "\n")
xunit_output_file.write("</testsuite>\n")
xunit_output_file.write("</testsuites>")
xunit_output_file.close()
# If we encountered any additional errors, exit abnormally.
if litConfig.numErrors:
sys.stderr.write('\n%d error(s), exiting.\n' % litConfig.numErrors)
sys.exit(2)
# Warn about warnings.
if litConfig.numWarnings:
sys.stderr.write('\n%d warning(s) in tests.\n' % litConfig.numWarnings)
if hasFailures:
sys.exit(1)
sys.exit(0)
if __name__=='__main__':
main()
Index: utils/lit/lit/util.py
===================================================================
--- utils/lit/lit/util.py (revision 223618)
+++ utils/lit/lit/util.py (working copy)
@@ -1,191 +1,232 @@
import errno
import itertools
import math
import os
import platform
import signal
import subprocess
import sys
+import threading
+
+class Watchdog(object):
+ """
+ Watches a Popen call, and kills it after a timeout.
+ """
+ def __init__(self, timeout):
+ self.timed_out = False
+ self.timeout = timeout
+ self.popens = []
+ self.timer = threading.Timer(timeout, self.handler)
+ self.timer.start()
+
+ def handler(self):
+ self.timed_out = True
+ for p in self.popens:
+ try:
+ p.kill()
+ except OSError, e:
+ # The Popen already terminated before the watchdog
+ # got a chance to kill it. Too bad they couldn't be
+ # friends.
+ pass
+
+ def watch(self, popen):
+ self.popens.append(popen)
+
+ def cancel(self):
+ if self.timer is not None:
+ self.timer.cancel()
+
def detectCPUs():
"""
Detects the number of CPUs on a system. Cribbed from pp.
"""
# Linux, Unix and MacOS:
if hasattr(os, "sysconf"):
if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
# Linux & Unix:
ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
if isinstance(ncpus, int) and ncpus > 0:
return ncpus
else: # OSX:
return int(capture(['sysctl', '-n', 'hw.ncpu']))
# Windows:
if "NUMBER_OF_PROCESSORS" in os.environ:
ncpus = int(os.environ["NUMBER_OF_PROCESSORS"])
if ncpus > 0:
return ncpus
return 1 # Default
def mkdir_p(path):
"""mkdir_p(path) - Make the "path" directory, if it does not exist; this
will also make directories for any missing parent directories."""
if not path or os.path.exists(path):
return
parent = os.path.dirname(path)
if parent != path:
mkdir_p(parent)
try:
os.mkdir(path)
except OSError:
e = sys.exc_info()[1]
# Ignore EEXIST, which may occur during a race condition.
if e.errno != errno.EEXIST:
raise
def capture(args, env=None):
"""capture(command) - Run the given command (or argv list) in a shell and
return the standard output."""
p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
env=env)
out,_ = p.communicate()
return out
def which(command, paths = None):
"""which(command, [paths]) - Look up the given command in the paths string
(or the PATH environment variable, if unspecified)."""
if paths is None:
paths = os.environ.get('PATH','')
# Check for absolute match first.
if os.path.isfile(command):
return command
# Would be nice if Python had a lib function for this.
if not paths:
paths = os.defpath
# Get suffixes to search.
# On Cygwin, 'PATHEXT' may exist but it should not be used.
if os.pathsep == ';':
pathext = os.environ.get('PATHEXT', '').split(';')
else:
pathext = ['']
# Search the paths...
for path in paths.split(os.pathsep):
for ext in pathext:
p = os.path.join(path, command + ext)
if os.path.exists(p):
return p
return None
def checkToolsPath(dir, tools):
for tool in tools:
if not os.path.exists(os.path.join(dir, tool)):
return False;
return True;
def whichTools(tools, paths):
for path in paths.split(os.pathsep):
if checkToolsPath(path, tools):
return path
return None
def printHistogram(items, title = 'Items'):
items.sort(key = lambda item: item[1])
maxValue = max([v for _,v in items])
# Select first "nice" bar height that produces more than 10 bars.
power = int(math.ceil(math.log(maxValue, 10)))
for inc in itertools.cycle((5, 2, 2.5, 1)):
barH = inc * 10**power
N = int(math.ceil(maxValue / barH))
if N > 10:
break
elif inc == 1:
power -= 1
histo = [set() for i in range(N)]
for name,v in items:
bin = min(int(N * v/maxValue), N-1)
histo[bin].add(name)
barW = 40
hr = '-' * (barW + 34)
print('\nSlowest %s:' % title)
print(hr)
for name,value in items[-20:]:
print('%.2fs: %s' % (value, name))
print('\n%s Times:' % title)
print(hr)
pDigits = int(math.ceil(math.log(maxValue, 10)))
pfDigits = max(0, 3-pDigits)
if pfDigits:
pDigits += pfDigits + 1
cDigits = int(math.ceil(math.log(len(items), 10)))
print("[%s] :: [%s] :: [%s]" % ('Range'.center((pDigits+1)*2 + 3),
'Percentage'.center(barW),
'Count'.center(cDigits*2 + 1)))
print(hr)
for i,row in enumerate(histo):
pct = float(len(row)) / len(items)
w = int(barW * pct)
print("[%*.*fs,%*.*fs) :: [%s%s] :: [%*d/%*d]" % (
pDigits, pfDigits, i*barH, pDigits, pfDigits, (i+1)*barH,
'*'*w, ' '*(barW-w), cDigits, len(row), cDigits, len(items)))
# Close extra file handles on UNIX (on Windows this cannot be done while
# also redirecting input).
kUseCloseFDs = not (platform.system() == 'Windows')
-def executeCommand(command, cwd=None, env=None):
+def executeCommand(command, cwd=None, env=None, timeout=None):
p = subprocess.Popen(command, cwd=cwd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env, close_fds=kUseCloseFDs)
+
+ if timeout is not None:
+ wd = Watchdog(timeout)
+ wd.watch(p)
+
out,err = p.communicate()
exitCode = p.wait()
+ if timeout is not None:
+ wd.cancel()
+ if wd.timed_out:
+ err += "\n\nTimed out after %.2f seconds" % (timeout)
+
# Detect Ctrl-C in subprocess.
if exitCode == -signal.SIGINT:
raise KeyboardInterrupt
def to_string(bytes):
if isinstance(bytes, str):
return bytes
return bytes.encode('utf-8')
# Ensure the resulting output is always of string type.
try:
out = to_string(out.decode('utf-8'))
except:
out = str(out)
try:
err = to_string(err.decode('utf-8'))
except:
err = str(err)
return out, err, exitCode
def usePlatformSdkOnDarwin(config, lit_config):
# On Darwin, support relocatable SDKs by providing Clang with a
# default system root path.
if 'darwin' in config.target_triple:
try:
cmd = subprocess.Popen(['xcrun', '--show-sdk-path'],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = cmd.communicate()
out = out.strip()
res = cmd.wait()
except OSError:
res = -1
if res == 0 and out:
sdk_path = out
lit_config.note('using SDKROOT: %r' % sdk_path)
config.environment['SDKROOT'] = sdk_path
Index: utils/lit/tests/Inputs/timeout/infloop.py
===================================================================
--- utils/lit/tests/Inputs/timeout/infloop.py (revision 0)
+++ utils/lit/tests/Inputs/timeout/infloop.py (working copy)
@@ -0,0 +1,8 @@
+# RUN: %{python} %s
+# XFAIL: *
+
+import sys
+
+print "infinite loop"
+while True:
+ pass
Index: utils/lit/tests/Inputs/timeout/lit.cfg
===================================================================
--- utils/lit/tests/Inputs/timeout/lit.cfg (revision 0)
+++ utils/lit/tests/Inputs/timeout/lit.cfg (working copy)
@@ -0,0 +1,16 @@
+# -*- Python -*-
+
+import os
+import sys
+
+import lit.formats
+
+config.name = 'timeout'
+config.test_format = lit.formats.ShTest(execute_external=False)
+config.suffixes = ['.py']
+config.test_source_root = os.path.dirname(__file__)
+config.test_exec_root = config.test_source_root
+config.target_triple = '(unused)'
+src_root = os.path.join(config.test_source_root, '..')
+config.environment['PYTHONPATH'] = src_root
+config.substitutions.append(('%{python}', sys.executable))
Index: utils/lit/tests/Inputs/timeout/short.py
===================================================================
--- utils/lit/tests/Inputs/timeout/short.py (revision 0)
+++ utils/lit/tests/Inputs/timeout/short.py (working copy)
@@ -0,0 +1,5 @@
+# RUN: %{python} %s
+
+import sys
+
+print "short program"
Index: utils/lit/tests/timeout.py
===================================================================
--- utils/lit/tests/timeout.py (revision 0)
+++ utils/lit/tests/timeout.py (working copy)
@@ -0,0 +1,9 @@
+# RUN: %{lit} \
+# RUN: %{inputs}/timeout/infloop.py \
+# RUN: %{inputs}/timeout/short.py \
+# RUN: -j 1 -v --debug --timeout 0.1 > %t.out 2> %t.err
+# RUN: FileCheck --check-prefix=CHECK-OUT < %t.out %s
+#
+
+# CHECK-OUT: XFAIL: timeout :: infloop.py
+# CHECK-OUT: PASS: timeout :: short.py
More information about the llvm-commits
mailing list