[llvm] r299560 - [lit] Use process pools for test execution by default

Reid Kleckner via llvm-commits llvm-commits at lists.llvm.org
Wed Apr 5 09:44:57 PDT 2017


Author: rnk
Date: Wed Apr  5 11:44:56 2017
New Revision: 299560

URL: http://llvm.org/viewvc/llvm-project?rev=299560&view=rev
Log:
[lit] Use process pools for test execution by default

Summary:
This drastically reduces lit test execution startup time on Windows. Our
previous strategy was to manually create one Process per job and manage
the worker pool ourselves. Instead, let's use the worker pool provided
by multiprocessing.  multiprocessing.Pool(jobs) returns almost
immediately, and initializes the appropriate number of workers, so they
can all start executing tests immediately. This avoids the ramp-up
period that the old implementation suffers from.  This appears to speed
up small test runs.

Here are some timings of the llvm-readobj tests on Windows using the
various execution strategies:

 # multiprocessing.Pool:
$ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-process-pool |& grep real: ; done
real: 0m1.156s
real: 0m1.078s
real: 0m1.094s

 # multiprocessing.Process:
$ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-processes |& grep real: ; done
real: 0m6.062s
real: 0m5.860s
real: 0m5.984s

 # threading.Thread:
$ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-threads |& grep real: ; done
real: 0m9.438s
real: 0m10.765s
real: 0m11.079s

I kept the old code to launch processes in case this change doesn't work
on all platforms that LLVM supports, but at some point I would like to
remove both the threading and old multiprocessing execution strategies.

Reviewers: modocache, rafael

Subscribers: llvm-commits

Differential Revision: https://reviews.llvm.org/D31677

Modified:
    llvm/trunk/utils/lit/lit/main.py
    llvm/trunk/utils/lit/lit/run.py

Modified: llvm/trunk/utils/lit/lit/main.py
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/utils/lit/lit/main.py?rev=299560&r1=299559&r2=299560&view=diff
==============================================================================
--- llvm/trunk/utils/lit/lit/main.py (original)
+++ llvm/trunk/utils/lit/lit/main.py Wed Apr  5 11:44:56 2017
@@ -278,12 +278,15 @@ def main_with_tmp(builtinParameters):
     debug_group.add_argument("--show-tests", dest="showTests",
                       help="Show all discovered tests",
                       action="store_true", default=False)
-    debug_group.add_argument("--use-processes", dest="useProcesses",
+    debug_group.add_argument("--use-process-pool", dest="executionStrategy",
+                      help="Run tests in parallel with a process pool",
+                      action="store_const", const="PROCESS_POOL")
+    debug_group.add_argument("--use-processes", dest="executionStrategy",
                       help="Run tests in parallel with processes (not threads)",
-                      action="store_true", default=True)
-    debug_group.add_argument("--use-threads", dest="useProcesses",
+                      action="store_const", const="PROCESSES")
+    debug_group.add_argument("--use-threads", dest="executionStrategy",
                       help="Run tests in parallel with threads (not processes)",
-                      action="store_false", default=True)
+                      action="store_const", const="THREADS")
 
     opts = parser.parse_args()
     args = opts.test_paths
@@ -298,6 +301,9 @@ def main_with_tmp(builtinParameters):
     if opts.numThreads is None:
         opts.numThreads = lit.util.detectCPUs()
 
+    if opts.executionStrategy is None:
+        opts.executionStrategy = 'PROCESS_POOL'
+
     if opts.maxFailures == 0:
         parser.error("Setting --max-failures to 0 does not have any effect.")
 
@@ -481,7 +487,7 @@ def main_with_tmp(builtinParameters):
     display = TestingProgressDisplay(opts, len(run.tests), progressBar)
     try:
         run.execute_tests(display, opts.numThreads, opts.maxTime,
-                          opts.useProcesses)
+                          opts.executionStrategy)
     except KeyboardInterrupt:
         sys.exit(2)
     display.finish()

Modified: llvm/trunk/utils/lit/lit/run.py
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/utils/lit/lit/run.py?rev=299560&r1=299559&r2=299560&view=diff
==============================================================================
--- llvm/trunk/utils/lit/lit/run.py (original)
+++ llvm/trunk/utils/lit/lit/run.py Wed Apr  5 11:44:56 2017
@@ -1,4 +1,5 @@
 import os
+import sys
 import threading
 import time
 import traceback
@@ -84,11 +85,13 @@ class Tester(object):
     def run_test(self, test_index):
         test = self.run_instance.tests[test_index]
         try:
-            self.run_instance.execute_test(test)
+            execute_test(test, self.run_instance.lit_config,
+                         self.run_instance.parallelism_semaphores)
         except KeyboardInterrupt:
             # This is a sad hack. Unfortunately subprocess goes
             # bonkers with ctrl-c and we start forking merrily.
             print('\nCtrl-C detected, goodbye.')
+            sys.stdout.flush()
             os.kill(0,9)
         self.consumer.update(test_index, test)
 
@@ -167,6 +170,44 @@ class _Display(object):
 def handleFailures(provider, consumer, maxFailures):
     consumer.display = _Display(consumer.display, provider, maxFailures)
 
+def execute_test(test, lit_config, parallelism_semaphores):
+    """Execute one test"""
+    pg = test.config.parallelism_group
+    if callable(pg):
+        pg = pg(test)
+
+    result = None
+    semaphore = None
+    try:
+        if pg:
+            semaphore = parallelism_semaphores[pg]
+        if semaphore:
+            semaphore.acquire()
+        start_time = time.time()
+        result = test.config.test_format.execute(test, lit_config)
+        # Support deprecated result from execute() which returned the result
+        # code and additional output as a tuple.
+        if isinstance(result, tuple):
+            code, output = result
+            result = lit.Test.Result(code, output)
+        elif not isinstance(result, lit.Test.Result):
+            raise ValueError("unexpected result from test execution")
+        result.elapsed = time.time() - start_time
+    except KeyboardInterrupt:
+        raise
+    except:
+        if lit_config.debug:
+            raise
+        output = 'Exception during script execution:\n'
+        output += traceback.format_exc()
+        output += '\n'
+        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+    finally:
+        if semaphore:
+            semaphore.release()
+
+    test.setResult(result)
+
 class Run(object):
     """
     This class represents a concrete, configured testing run.
@@ -177,42 +218,10 @@ class Run(object):
         self.tests = tests
 
     def execute_test(self, test):
-        pg = test.config.parallelism_group
-        if callable(pg): pg = pg(test)
-
-        result = None
-        semaphore = None
-        try:
-            if pg: semaphore = self.parallelism_semaphores[pg]
-            if semaphore: semaphore.acquire()
-            start_time = time.time()
-            result = test.config.test_format.execute(test, self.lit_config)
-
-            # Support deprecated result from execute() which returned the result
-            # code and additional output as a tuple.
-            if isinstance(result, tuple):
-                code, output = result
-                result = lit.Test.Result(code, output)
-            elif not isinstance(result, lit.Test.Result):
-                raise ValueError("unexpected result from test execution")
-
-            result.elapsed = time.time() - start_time
-        except KeyboardInterrupt:
-            raise
-        except:
-            if self.lit_config.debug:
-                raise
-            output = 'Exception during script execution:\n'
-            output += traceback.format_exc()
-            output += '\n'
-            result = lit.Test.Result(lit.Test.UNRESOLVED, output)
-        finally:
-            if semaphore: semaphore.release()
-
-        test.setResult(result)
+        return execute_test(test, self.lit_config, self.parallelism_semaphores)
 
     def execute_tests(self, display, jobs, max_time=None,
-                      use_processes=False):
+                      execution_strategy=None):
         """
         execute_tests(display, jobs, [max_time])
 
@@ -234,6 +243,14 @@ class Run(object):
         be given an UNRESOLVED result.
         """
 
+        if execution_strategy == 'PROCESS_POOL':
+            self.execute_tests_with_mp_pool(display, jobs, max_time)
+            return
+        # FIXME: Standardize on the PROCESS_POOL execution strategy and remove
+        # the other two strategies.
+
+        use_processes = execution_strategy == 'PROCESSES'
+
         # Choose the appropriate parallel execution implementation.
         consumer = None
         if jobs != 1 and use_processes and multiprocessing:
@@ -263,8 +280,8 @@ class Run(object):
         provider = TestProvider(queue_impl, canceled_flag)
         handleFailures(provider, consumer, self.lit_config.maxFailures)
 
-        # Queue the tests outside the main thread because we can't guarantee
-        # that we can put() all the tests without blocking:
+        # Putting tasks into the threading or multiprocessing Queue may block,
+        # so do it in a separate thread.
         # https://docs.python.org/2/library/multiprocessing.html
         # e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
         # without taking any out.
@@ -317,3 +334,111 @@ class Run(object):
         # Wait for all the tasks to complete.
         for t in tasks:
             t.join()
+
+    def execute_tests_with_mp_pool(self, display, jobs, max_time=None):
+        # Don't do anything if we aren't going to run any tests.
+        if not self.tests or jobs == 0:
+            return
+
+        # Set up semaphores to limit parallelism of certain classes of tests.
+        # For example, some ASan tests require lots of virtual memory and run
+        # faster with less parallelism on OS X.
+        self.parallelism_semaphores = \
+                {k: multiprocessing.Semaphore(v) for k, v in
+                 self.lit_config.parallelism_groups.items()}
+
+        # Save the display object on the runner so that we can update it from
+        # our task completion callback.
+        self.display = display
+
+        # Start a process pool. Copy over the data shared between all test runs.
+        pool = multiprocessing.Pool(jobs, worker_initializer,
+                                    (self.lit_config,
+                                     self.parallelism_semaphores))
+
+        # Install a console-control signal handler on Windows.
+        if win32api is not None:
+            def console_ctrl_handler(type):
+                print "Ctr-C received, terminating"
+                pool.terminate()
+                pool.join()
+                os.kill(0,9)
+                return True
+            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+
+        # FIXME: Implement max_time using .wait() timeout argument and a
+        # deadline.
+
+        try:
+            async_results = [pool.apply_async(worker_run_one_test,
+                                              args=(test_index, test),
+                                              callback=self.consume_test_result)
+                             for test_index, test in enumerate(self.tests)]
+
+            # Wait for all results to come in. The callback that runs in the
+            # parent process will update the display.
+            for a in async_results:
+                a.wait()
+                if not a.successful():
+                    a.get() # Exceptions raised here come from the worker.
+        finally:
+            pool.terminate()
+            pool.join()
+
+        # Mark any tests that weren't run as UNRESOLVED.
+        for test in self.tests:
+            if test.result is None:
+                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
+
+    def consume_test_result(self, pool_result):
+        """Test completion callback for worker_run_one_test
+
+        Updates the test result status in the parent process. Each task in the
+        pool returns the test index and the result, and we use the index to look
+        up the original test object. Also updates the progress bar as tasks
+        complete.
+        """
+        (test_index, test_with_result) = pool_result
+        # Update the parent process copy of the test. This includes the result,
+        # XFAILS, REQUIRES, and UNSUPPORTED statuses.
+        assert self.tests[test_index].file_path == test_with_result.file_path, \
+                "parent and child disagree on test path"
+        self.tests[test_index] = test_with_result
+        self.display.update(test_with_result)
+
+child_lit_config = None
+child_parallelism_semaphores = None
+
+def worker_initializer(lit_config, parallelism_semaphores):
+    """Copy expensive repeated data into worker processes"""
+    global child_lit_config
+    child_lit_config = lit_config
+    global child_parallelism_semaphores
+    child_parallelism_semaphores = parallelism_semaphores
+
+def worker_run_one_test(test_index, test):
+    """Run one test in a multiprocessing.Pool
+
+    Side effects in this function and functions it calls are not visible in the
+    main lit process.
+
+    Arguments and results of this function are pickled, so they should be cheap
+    to copy. For efficiency, we copy all data needed to execute all tests into
+    each worker and store it in the child_* global variables. This reduces the
+    cost of each task.
+
+    Returns the test index and the test object (with its result set), which
+    the parent process uses to update its copy of the test and the display.
+    """
+    try:
+        execute_test(test, child_lit_config, child_parallelism_semaphores)
+        return (test_index, test)
+    except KeyboardInterrupt as e:
+        # This is a sad hack. Unfortunately subprocess goes
+        # bonkers with ctrl-c and we start forking merrily.
+        print('\nCtrl-C detected, goodbye.')
+        traceback.print_exc()
+        sys.stdout.flush()
+        os.kill(0,9)
+    except:
+        traceback.print_exc()




More information about the llvm-commits mailing list