[Lldb-commits] [lldb] r248036 - Adds parallel work queue index to test events, stdout/stderr results support.

Todd Fiala via lldb-commits lldb-commits at lists.llvm.org
Fri Sep 18 14:01:13 PDT 2015


Author: tfiala
Date: Fri Sep 18 16:01:13 2015
New Revision: 248036

URL: http://llvm.org/viewvc/llvm-project?rev=248036&view=rev
Log:
Adds parallel work queue index to test events, stdout/stderr results support.

See http://reviews.llvm.org/D12983 for details.

Modified:
    lldb/trunk/test/dosep.py
    lldb/trunk/test/dotest.py
    lldb/trunk/test/dotest_args.py
    lldb/trunk/test/test_results.py

Modified: lldb/trunk/test/dosep.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/dosep.py?rev=248036&r1=248035&r2=248036&view=diff
==============================================================================
--- lldb/trunk/test/dosep.py (original)
+++ lldb/trunk/test/dosep.py Fri Sep 18 16:01:13 2015
@@ -80,7 +80,12 @@ RESULTS_FORMATTER = None
 RUNNER_PROCESS_ASYNC_MAP = None
 RESULTS_LISTENER_CHANNEL = None
 
-def setup_global_variables(lock, counter, total, name_len, options):
+"""Contains an optional function pointer that can return the worker index
+   for the given thread/process calling it.  Returns a 0-based index."""
+GET_WORKER_INDEX = None
+
+def setup_global_variables(
+        lock, counter, total, name_len, options, worker_index_map):
     global output_lock, test_counter, total_tests, test_name_len
     global dotest_options
     output_lock = lock
@@ -89,6 +94,22 @@ def setup_global_variables(lock, counter
     test_name_len = name_len
     dotest_options = options
 
+    if worker_index_map is not None:
+        # We'll use the output lock for this to avoid sharing another lock.
+        # This won't be used much.
+        index_lock = lock
+
+        def get_worker_index_use_pid():
+            """Returns a 0-based, process-unique index for the worker."""
+            pid = os.getpid()
+            with index_lock:
+                if pid not in worker_index_map:
+                    worker_index_map[pid] = len(worker_index_map)
+                return worker_index_map[pid]
+
+        global GET_WORKER_INDEX
+        GET_WORKER_INDEX = get_worker_index_use_pid
+
 
 def report_test_failure(name, command, output):
     global output_lock
@@ -150,31 +171,6 @@ def parse_test_results(output):
     return passes, failures, unexpected_successes
 
 
-def inferior_session_interceptor(forwarding_func, event):
-    """Intercepts session begin/end events, passing through everyting else.
-
-    @param forwarding_func a callable object to pass along the event if it
-    is not one that gets intercepted.
-
-    @param event the test result event received.
-    """
-
-    if event is not None and isinstance(event, dict):
-        if "event" in event:
-            if event["event"] == "session_begin":
-                # Swallow it.  Could report on inferior here if we
-                # cared.
-                return
-            elif event["event"] == "session_end":
-                # Swallow it.  Could report on inferior here if we
-                # cared.  More usefully, we can verify that the
-                # inferior went down hard if we don't receive this.
-                return
-
-    # Pass it along.
-    forwarding_func(event)
-
-
 def call_with_timeout(command, timeout, name, inferior_pid_events):
     """Run command with a timeout if possible.
     -s QUIT will create a coredump if they are enabled on your system
@@ -183,6 +179,10 @@ def call_with_timeout(command, timeout,
     if timeout_command and timeout != "0":
         command = [timeout_command, '-s', 'QUIT', timeout] + command
 
+    if GET_WORKER_INDEX is not None:
+        worker_index = GET_WORKER_INDEX()
+        command.extend([
+            "--event-add-entries", "worker_index={}".format(worker_index)])
     # Specifying a value for close_fds is unsupported on Windows when using
     # subprocess.PIPE
     if os.name != "nt":
@@ -263,7 +263,8 @@ out_q = None
 
 def process_dir_worker_multiprocessing(
         a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
-        a_dotest_options, job_queue, result_queue, inferior_pid_events):
+        a_dotest_options, job_queue, result_queue, inferior_pid_events,
+        worker_index_map):
     """Worker thread main loop when in multiprocessing mode.
     Takes one directory specification at a time and works on it."""
 
@@ -273,7 +274,7 @@ def process_dir_worker_multiprocessing(
     # Setup the global state for the worker process.
     setup_global_variables(
         a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
-        a_dotest_options)
+        a_dotest_options, worker_index_map)
 
     # Keep grabbing entries from the queue until done.
     while not job_queue.empty():
@@ -441,14 +442,36 @@ def initialize_global_vars_multiprocessi
     # rest of the flat module.
     global output_lock
     output_lock = multiprocessing.RLock()
+
     initialize_global_vars_common(num_threads, test_work_items)
 
 
 def initialize_global_vars_threading(num_threads, test_work_items):
+    """Initializes global variables used in threading mode.
+    @param num_threads specifies the number of workers used.
+    @param test_work_items specifies all the work items
+    that will be processed.
+    """
     # Initialize the global state we'll use to communicate with the
     # rest of the flat module.
     global output_lock
     output_lock = threading.RLock()
+
+    index_lock = threading.RLock()
+    index_map = {}
+
+    def get_worker_index_threading():
+        """Returns a 0-based, thread-unique index for the worker thread."""
+        thread_id = threading.current_thread().ident
+        with index_lock:
+            if thread_id not in index_map:
+                index_map[thread_id] = len(index_map)
+            return index_map[thread_id]
+
+
+    global GET_WORKER_INDEX
+    GET_WORKER_INDEX = get_worker_index_threading
+
     initialize_global_vars_common(num_threads, test_work_items)
 
 
@@ -630,6 +653,10 @@ def multiprocessing_test_runner(num_thre
     # hold 2 * (num inferior dotest.py processes started) entries.
     inferior_pid_events = multiprocessing.Queue(4096)
 
+    # Worker dictionary allows each worker to figure out its worker index.
+    manager = multiprocessing.Manager()
+    worker_index_map = manager.dict()
+
     # Create workers.  We don't use multiprocessing.Pool due to
     # challenges with handling ^C keyboard interrupts.
     workers = []
@@ -643,7 +670,8 @@ def multiprocessing_test_runner(num_thre
                   dotest_options,
                   job_queue,
                   result_queue,
-                  inferior_pid_events))
+                  inferior_pid_events,
+                  worker_index_map))
         worker.start()
         workers.append(worker)
 
@@ -717,11 +745,14 @@ def multiprocessing_test_runner_pool(num
     # Initialize our global state.
     initialize_global_vars_multiprocessing(num_threads, test_work_items)
 
+    manager = multiprocessing.Manager()
+    worker_index_map = manager.dict()
+
     pool = multiprocessing.Pool(
         num_threads,
         initializer=setup_global_variables,
         initargs=(output_lock, test_counter, total_tests, test_name_len,
-                  dotest_options))
+                  dotest_options, worker_index_map))
 
     # Start the map operation (async mode).
     map_future = pool.map_async(
@@ -819,6 +850,10 @@ def inprocess_exec_test_runner(test_work
     # Initialize our global state.
     initialize_global_vars_multiprocessing(1, test_work_items)
 
+    # We're always worker index 0
+    global GET_WORKER_INDEX
+    GET_WORKER_INDEX = lambda: 0
+
     # Run the listener and related channel maps in a separate thread.
     # global RUNNER_PROCESS_ASYNC_MAP
     global RESULTS_LISTENER_CHANNEL
@@ -861,8 +896,7 @@ def walk_and_invoke(test_directory, test
     # listener channel and tell the inferior to send results to the
     # port on which we'll be listening.
     if RESULTS_FORMATTER is not None:
-        forwarding_func = lambda event: inferior_session_interceptor(
-            RESULTS_FORMATTER.process_event, event)
+        forwarding_func = RESULTS_FORMATTER.process_event
         RESULTS_LISTENER_CHANNEL = (
             dotest_channels.UnpicklingForwardingListenerChannel(
                 RUNNER_PROCESS_ASYNC_MAP, "localhost", 0, forwarding_func))
@@ -1088,6 +1122,11 @@ def adjust_inferior_options(dotest_argv)
     if dotest_options.results_formatter_options is not None:
         _remove_option(dotest_argv, "--results-formatter-options", 2)
 
+    # Remove test runner name if present.
+    if dotest_options.test_runner_name is not None:
+        _remove_option(dotest_argv, "--test-runner-name", 2)
+
+
 def main(print_details_on_success, num_threads, test_subdir,
          test_runner_name, results_formatter):
     """Run dotest.py in inferior mode in parallel.

Modified: lldb/trunk/test/dotest.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/dotest.py?rev=248036&r1=248035&r2=248036&view=diff
==============================================================================
--- lldb/trunk/test/dotest.py (original)
+++ lldb/trunk/test/dotest.py Fri Sep 18 16:01:13 2015
@@ -823,6 +823,19 @@ def parseOptionsAndInitTestdirs():
         lldb_platform_url = args.lldb_platform_url
     if args.lldb_platform_working_dir:
         lldb_platform_working_dir = args.lldb_platform_working_dir
+
+    if args.event_add_entries and len(args.event_add_entries) > 0:
+        entries = {}
+        # Parse out key=val pairs, separated by comma
+        for keyval in args.event_add_entries.split(","):
+            key_val_entry = keyval.split("=")
+            if len(key_val_entry) == 2:
+                entries[key_val_entry[0]] = key_val_entry[1]
+        # Tell the event builder to create all events with these
+        # key/val pairs in them.
+        if len(entries) > 0:
+            test_results.EventBuilder.add_entries_to_all_events(entries)
+
     # Gather all the dirs passed on the command line.
     if len(args.args) > 0:
         testdirs = map(os.path.abspath, args.args)
@@ -947,8 +960,15 @@ def setupTestResults():
 
     if results_filename:
         # Open the results file for writing.
-        results_file_object = open(results_filename, "w")
-        cleanup_func = results_file_object.close
+        if results_filename == 'stdout':
+            results_file_object = sys.stdout
+            cleanup_func = None
+        elif results_filename == 'stderr':
+            results_file_object = sys.stderr
+            cleanup_func = None
+        else:
+            results_file_object = open(results_filename, "w")
+            cleanup_func = results_file_object.close
         default_formatter_name = "test_results.XunitFormatter"
     elif results_port:
         # Connect to the specified localhost port.
@@ -995,7 +1015,8 @@ def setupTestResults():
             results_formatter_object.end_session()
 
             # And now close out the output file-like object.
-            cleanup_func()
+            if cleanup_func is not None:
+                cleanup_func()
 
         atexit.register(shutdown_formatter)
 

Modified: lldb/trunk/test/dotest_args.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/dotest_args.py?rev=248036&r1=248035&r2=248036&view=diff
==============================================================================
--- lldb/trunk/test/dotest_args.py (original)
+++ lldb/trunk/test/dotest_args.py Fri Sep 18 16:01:13 2015
@@ -165,6 +165,11 @@ def create_parser():
         help=('Specify comma-separated options to pass to the formatter. '
               'Use --results-formatter-options="--option1[,--option2[,...]]" '
               'syntax.  Note the "=" is critical, and don\'t use whitespace.'))
+    group.add_argument(
+        '--event-add-entries',
+        action='store',
+        help=('Specify comma-separated KEY=VAL entries to add key and value '
+              'pairs to all test events generated by this test run.'))
     # Remove the reference to our helper function
     del X
 

Modified: lldb/trunk/test/test_results.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/test_results.py?rev=248036&r1=248035&r2=248036&view=diff
==============================================================================
--- lldb/trunk/test/test_results.py (original)
+++ lldb/trunk/test/test_results.py Fri Sep 18 16:01:13 2015
@@ -12,6 +12,7 @@ import argparse
 import cPickle
 import inspect
 import os
+import pprint
 import re
 import sys
 import threading
@@ -22,6 +23,9 @@ import xml.sax.saxutils
 
 class EventBuilder(object):
     """Helper class to build test result event dictionaries."""
+
+    BASE_DICTIONARY = None
+
     @staticmethod
     def _get_test_name_info(test):
         """Returns (test-class-name, test-method-name) from a test case instance.
@@ -46,12 +50,19 @@ class EventBuilder(object):
         @return event dictionary with common event fields set.
         """
         test_class_name, test_name = EventBuilder._get_test_name_info(test)
-        return {
+
+        if EventBuilder.BASE_DICTIONARY is not None:
+            # Start with a copy of the "always include" entries.
+            result = dict(EventBuilder.BASE_DICTIONARY)
+        else:
+            result = {}
+        result.update({
             "event": event_type,
             "test_class": test_class_name,
             "test_name": test_name,
             "event_time": time.time()
-        }
+        })
+        return result
 
     @staticmethod
     def _error_tuple_class(error_tuple):
@@ -122,9 +133,9 @@ class EventBuilder(object):
         event = EventBuilder._event_dictionary_test_result(test, status)
         event["issue_class"] = EventBuilder._error_tuple_class(error_tuple)
         event["issue_message"] = EventBuilder._error_tuple_message(error_tuple)
-        tb = EventBuilder._error_tuple_traceback(error_tuple)
-        if tb is not None:
-            event["issue_backtrace"] = traceback.format_tb(tb)
+        backtrace = EventBuilder._error_tuple_traceback(error_tuple)
+        if backtrace is not None:
+            event["issue_backtrace"] = traceback.format_tb(backtrace)
         return event
 
     @staticmethod
@@ -246,8 +257,38 @@ class EventBuilder(object):
         event["issue_phase"] = "cleanup"
         return event
 
+    @staticmethod
+    def add_entries_to_all_events(entries_dict):
+        """Specifies a dictionary of entries to add to all test events.
+
+        This provides a mechanism for, say, a parallel test runner to
+        indicate to each inferior dotest.py that it should add a
+        worker index to each.
+
+        Calling this method replaces all previous entries added
+        by a prior call to this.
+
+        Event build methods will overwrite any entries that collide.
+        Thus, the passed in dictionary is the base, which gets merged
+        over by event building when keys collide.
+
+        @param entries_dict a dictionary containing key and value
+        pairs that should be merged into all events created by the
+        event generator.  Must not be None: dict(None) raises TypeError.
+        """
+        EventBuilder.BASE_DICTIONARY = dict(entries_dict)
+
+    @staticmethod
+    def base_event():
+        """@return the base event dictionary that all events should contain."""
+        if EventBuilder.BASE_DICTIONARY is not None:
+            return dict(EventBuilder.BASE_DICTIONARY)
+        else:
+            return None
+
 
 class ResultsFormatter(object):
+
     """Provides interface to formatting test results out to a file-like object.
 
     This class allows the LLDB test framework's raw test-realted
@@ -420,6 +461,9 @@ class XunitFormatter(ResultsFormatter):
 
     @staticmethod
     def _build_illegal_xml_regex():
+        """Constructs a regex to match all illegal xml characters.
+
+        Expects to be used against a unicode string."""
         # Construct the range pairs of invalid unicode chareacters.
         illegal_chars_u = [
             (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84),
@@ -453,6 +497,15 @@ class XunitFormatter(ResultsFormatter):
         return xml.sax.saxutils.quoteattr(text)
 
     def _replace_invalid_xml(self, str_or_unicode):
+        """Replaces invalid XML characters with a '?'.
+
+        @param str_or_unicode a string to replace invalid XML
+        characters within.  Can be unicode or not.  If not unicode,
+        assumes it is a byte string in utf-8 encoding.
+
+        @returns a utf-8-encoded byte string with invalid
+        XML replaced with '?'.
+        """
         # Get the content into unicode
         if isinstance(str_or_unicode, str):
             unicode_content = str_or_unicode.decode('utf-8')
@@ -472,6 +525,11 @@ class XunitFormatter(ResultsFormatter):
             XunitFormatter.RM_FAILURE,
             XunitFormatter.RM_PASSTHRU]
         parser.add_argument(
+            "--assert-on-unknown-events",
+            action="store_true",
+            help=('cause unknown test events to generate '
+                  'a python assert.  Default is to ignore.'))
+        parser.add_argument(
             "--xpass", action="store", choices=results_mapping_choices,
             default=XunitFormatter.RM_FAILURE,
             help=('specify mapping from unexpected success to jUnit/xUnit '
@@ -533,8 +591,10 @@ class XunitFormatter(ResultsFormatter):
         elif event_type == "test_result":
             self._process_test_result(test_event)
         else:
-            sys.stderr.write("unknown event type {} from {}\n".format(
-                event_type, test_event))
+            # This is an unknown event.
+            if self.options.assert_on_unknown_events:
+                raise Exception("unknown event type {} from {}\n".format(
+                    event_type, test_event))
 
     def _handle_success(self, test_event):
         """Handles a test success.
@@ -817,27 +877,40 @@ class RawPickledFormatter(ResultsFormatt
 
     def begin_session(self):
         super(RawPickledFormatter, self).begin_session()
-        self.process_event({
+        event = EventBuilder.base_event()
+        if event is None:
+            event = {}
+        event.update({
             "event": "session_begin",
             "event_time": time.time(),
             "pid": self.pid
         })
+        self.process_event(event)
 
     def process_event(self, test_event):
         super(RawPickledFormatter, self).process_event(test_event)
 
-        # Add pid to the event for tracking.
-        # test_event["pid"] = self.pid
-
         # Send it as {serialized_length_of_serialized_bytes}#{serialized_bytes}
         pickled_message = cPickle.dumps(test_event)
         self.out_file.send(
             "{}#{}".format(len(pickled_message), pickled_message))
 
     def end_session(self):
-        self.process_event({
+        event = EventBuilder.base_event()
+        if event is None:
+            event = {}
+        event.update({
             "event": "session_end",
             "event_time": time.time(),
             "pid": self.pid
         })
+        self.process_event(event)
         super(RawPickledFormatter, self).end_session()
+
+
+class DumpFormatter(ResultsFormatter):
+    """Formats events to the file as their raw python dictionary format."""
+
+    def process_event(self, test_event):
+        super(DumpFormatter, self).process_event(test_event)
+        self.out_file.write("\n" + pprint.pformat(test_event) + "\n")




More information about the lldb-commits mailing list