[llvm] r354187 - [lit][NFC] Cleanup lit worker process handling

Julian Lettner via llvm-commits llvm-commits at lists.llvm.org
Fri Feb 15 16:40:40 PST 2019


Author: yln
Date: Fri Feb 15 16:40:40 2019
New Revision: 354187

URL: http://llvm.org/viewvc/llvm-project?rev=354187&view=rev
Log:
[lit][NFC] Cleanup lit worker process handling

Move code that is executed on worker process to separate file. This
makes the use of the pickled arguments stored in global variables in the
worker a bit clearer. (Still not pretty though.)

Extract handling of parallelism groups to it's own function.

Use BoundedSemaphore instead of Semaphore. BoundedSemaphore raises for
unmatched release() calls.

Cleanup imports.

Differential Revision: https://reviews.llvm.org/D58196

Added:
    llvm/trunk/utils/lit/lit/worker.py
Modified:
    llvm/trunk/utils/lit/lit/run.py
    llvm/trunk/utils/lit/lit/util.py

Modified: llvm/trunk/utils/lit/lit/run.py
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/utils/lit/lit/run.py?rev=354187&r1=354186&r2=354187&view=diff
==============================================================================
--- llvm/trunk/utils/lit/lit/run.py (original)
+++ llvm/trunk/utils/lit/lit/run.py Fri Feb 15 16:40:40 2019
@@ -1,28 +1,9 @@
-import os
-import sys
-import threading
+import multiprocessing
 import time
-import traceback
-try:
-    import Queue as queue
-except ImportError:
-    import queue
-
-try:
-    import win32api
-except ImportError:
-    win32api = None
 
-import multiprocessing
 import lit.Test
-
-def abort_now():
-    """Abort the current process without doing any exception teardown"""
-    sys.stdout.flush()
-    if win32api:
-        win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
-    else:
-        os.kill(0, 9)
+import lit.util
+import lit.worker
 
 class _Display(object):
     def __init__(self, display, provider, maxFailures):
@@ -48,12 +29,11 @@ class Run(object):
         # For example, some ASan tests require lots of virtual memory and run
         # faster with less parallelism on OS X.
         self.parallelism_semaphores = \
-                {k: multiprocessing.Semaphore(v) for k, v in
+                {k: multiprocessing.BoundedSemaphore(v) for k, v in
                  self.lit_config.parallelism_groups.items()}
 
     def execute_test(self, test):
-        return _execute_test_impl(test, self.lit_config,
-                                  self.parallelism_semaphores)
+        return lit.worker._execute_test(test, self.lit_config)
 
     def execute_tests_in_pool(self, jobs, max_time):
         # We need to issue many wait calls, so compute the final deadline and
@@ -67,22 +47,22 @@ class Run(object):
         # interrupts the workers before we make it into our task callback, they
         # will each raise a KeyboardInterrupt exception and print to stderr at
         # the same time.
-        pool = multiprocessing.Pool(jobs, worker_initializer,
+        pool = multiprocessing.Pool(jobs, lit.worker.initializer,
                                     (self.lit_config,
                                      self.parallelism_semaphores))
 
         # Install a console-control signal handler on Windows.
-        if win32api is not None:
+        if lit.util.win32api is not None:
             def console_ctrl_handler(type):
                 print('\nCtrl-C detected, terminating.')
                 pool.terminate()
                 pool.join()
-                abort_now()
+                lit.util.abort_now()
                 return True
-            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+            lit.util.win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
 
         try:
-            async_results = [pool.apply_async(worker_run_one_test,
+            async_results = [pool.apply_async(lit.worker.run_one_test,
                                               args=(test_index, test),
                                               callback=self.consume_test_result)
                              for test_index, test in enumerate(self.tests)]
@@ -143,11 +123,9 @@ class Run(object):
         self.failure_count = 0
         self.hit_max_failures = False
         if jobs == 1:
-            global child_lit_config
-            child_lit_config = self.lit_config
             for test_index, test in enumerate(self.tests):
-                result = worker_run_one_test(test_index, test)
-                self.consume_test_result(result)
+                lit.worker._execute_test(test, self.lit_config)
+                self.consume_test_result((test_index, test))
                 if self.hit_max_failures:
                     break
         else:
@@ -159,7 +137,7 @@ class Run(object):
                 test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
 
     def consume_test_result(self, pool_result):
-        """Test completion callback for worker_run_one_test
+        """Test completion callback for lit.worker.run_one_test
 
         Updates the test result status in the parent process. Each task in the
         pool returns the test index and the result, and we use the index to look
@@ -186,74 +164,3 @@ class Run(object):
         if self.lit_config.maxFailures and \
                 self.failure_count == self.lit_config.maxFailures:
             self.hit_max_failures = True
-
-def _execute_test_impl(test, lit_config, parallelism_semaphores):
-    """Execute one test"""
-    pg = test.config.parallelism_group
-    if callable(pg):
-        pg = pg(test)
-
-    result = None
-    semaphore = None
-    try:
-        if pg:
-            semaphore = parallelism_semaphores[pg]
-        if semaphore:
-            semaphore.acquire()
-        start_time = time.time()
-        result = test.config.test_format.execute(test, lit_config)
-        # Support deprecated result from execute() which returned the result
-        # code and additional output as a tuple.
-        if isinstance(result, tuple):
-            code, output = result
-            result = lit.Test.Result(code, output)
-        elif not isinstance(result, lit.Test.Result):
-            raise ValueError("unexpected result from test execution")
-        result.elapsed = time.time() - start_time
-    except KeyboardInterrupt:
-        raise
-    except:
-        if lit_config.debug:
-            raise
-        output = 'Exception during script execution:\n'
-        output += traceback.format_exc()
-        output += '\n'
-        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
-    finally:
-        if semaphore:
-            semaphore.release()
-
-    test.setResult(result)
-
-child_lit_config = None
-child_parallelism_semaphores = None
-
-def worker_initializer(lit_config, parallelism_semaphores):
-    """Copy expensive repeated data into worker processes"""
-    global child_lit_config
-    child_lit_config = lit_config
-    global child_parallelism_semaphores
-    child_parallelism_semaphores = parallelism_semaphores
-
-def worker_run_one_test(test_index, test):
-    """Run one test in a multiprocessing.Pool
-
-    Side effects in this function and functions it calls are not visible in the
-    main lit process.
-
-    Arguments and results of this function are pickled, so they should be cheap
-    to copy. For efficiency, we copy all data needed to execute all tests into
-    each worker and store it in the child_* global variables. This reduces the
-    cost of each task.
-
-    Returns an index and a Result, which the parent process uses to update
-    the display.
-    """
-    try:
-        _execute_test_impl(test, child_lit_config, child_parallelism_semaphores)
-        return (test_index, test)
-    except KeyboardInterrupt as e:
-        # If a worker process gets an interrupt, abort it immediately.
-        abort_now()
-    except:
-        traceback.print_exc()

Modified: llvm/trunk/utils/lit/lit/util.py
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/utils/lit/lit/util.py?rev=354187&r1=354186&r2=354187&view=diff
==============================================================================
--- llvm/trunk/utils/lit/lit/util.py (original)
+++ llvm/trunk/utils/lit/lit/util.py Fri Feb 15 16:40:40 2019
@@ -424,3 +424,17 @@ def killProcessAndChildren(pid):
         psutilProc.kill()
     except psutil.NoSuchProcess:
         pass
+
+
+try:
+    import win32api
+except ImportError:
+    win32api = None
+
+def abort_now():
+    """Abort the current process without doing any exception teardown"""
+    sys.stdout.flush()
+    if win32api:
+        win32api.TerminateProcess(win32api.GetCurrentProcess(), 3)
+    else:
+        os.kill(0, 9)

Added: llvm/trunk/utils/lit/lit/worker.py
URL: http://llvm.org/viewvc/llvm-project/llvm/trunk/utils/lit/lit/worker.py?rev=354187&view=auto
==============================================================================
--- llvm/trunk/utils/lit/lit/worker.py (added)
+++ llvm/trunk/utils/lit/lit/worker.py Fri Feb 15 16:40:40 2019
@@ -0,0 +1,82 @@
+# The functions in this module are meant to run on a separate worker process.
+# Exception: in single process mode _execute_test is called directly.
+import time
+import traceback
+
+import lit.Test
+import lit.util
+
+_lit_config = None
+_parallelism_semaphores = None
+
+def initializer(lit_config, parallelism_semaphores):
+    """Copy expensive repeated data into worker processes"""
+    global _lit_config
+    global _parallelism_semaphores
+    _lit_config = lit_config
+    _parallelism_semaphores = parallelism_semaphores
+
+def run_one_test(test_index, test):
+    """Run one test in a multiprocessing.Pool
+
+    Side effects in this function and functions it calls are not visible in the
+    main lit process.
+
+    Arguments and results of this function are pickled, so they should be cheap
+    to copy. For efficiency, we copy all data needed to execute all tests into
+    each worker and store it in the worker_* global variables. This reduces the
+    cost of each task.
+
+    Returns an index and a Result, which the parent process uses to update
+    the display.
+    """
+    try:
+        _execute_test_in_parallelism_group(test, _lit_config,
+                                           _parallelism_semaphores)
+        return (test_index, test)
+    except KeyboardInterrupt:
+        # If a worker process gets an interrupt, abort it immediately.
+        lit.util.abort_now()
+    except:
+        traceback.print_exc()
+
+def _execute_test_in_parallelism_group(test, lit_config, parallelism_semaphores):
+    """Execute one test inside the appropriate parallelism group"""
+    pg = test.config.parallelism_group
+    if callable(pg):
+        pg = pg(test)
+
+    if pg:
+        semaphore = parallelism_semaphores[pg]
+        try:
+            semaphore.acquire()
+            _execute_test(test, lit_config)
+        finally:
+            semaphore.release()
+    else:
+        _execute_test(test, lit_config)
+
+def _execute_test(test, lit_config):
+    """Execute one test"""
+    try:
+        start_time = time.time()
+        result = test.config.test_format.execute(test, lit_config)
+        # Support deprecated result from execute() which returned the result
+        # code and additional output as a tuple.
+        if isinstance(result, tuple):
+            code, output = result
+            result = lit.Test.Result(code, output)
+        elif not isinstance(result, lit.Test.Result):
+            raise ValueError("unexpected result from test execution")
+        result.elapsed = time.time() - start_time
+    except KeyboardInterrupt:
+        raise
+    except:
+        if lit_config.debug:
+            raise
+        output = 'Exception during script execution:\n'
+        output += traceback.format_exc()
+        output += '\n'
+        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+
+    test.setResult(result)




More information about the llvm-commits mailing list