[Lldb-commits] [lldb] r210931 - Added gdb-remote stop packet expedited register tests.

Todd Fiala todd.fiala at gmail.com
Fri Jun 13 12:11:33 PDT 2014


Author: tfiala
Date: Fri Jun 13 14:11:33 2014
New Revision: 210931

URL: http://llvm.org/viewvc/llvm-project?rev=210931&view=rev
Log:
Added gdb-remote stop packet expedited register tests.

Expedited registers currently checked for are pc, fp and sp.

Also broke out the gdb-remote base test case logic into
class gdbremote_testcase.GdbRemoteTestCaseBase in the new
gdbremote_testcase.py file.

TestGdbRemoteExpeditedRegisters.py is the first gdb-remote area
to be contained in its own test case class file.

The monolithic TestLldbGdbServer.py has been modified to derive
from gdbremote_testcase.GdbRemoteTestCaseBase.  Soon I will
pull out all the gdb-remote functional area tests from that class
into separate classes.

I'm intending to start all GdbRemote test cases with GdbRemote
so it is easy to run them all with a -p pattern match on the
test run infrastructure.

Also scanned and removed all cases of whitespace-only lines in
the files I touched.

Added:
    lldb/trunk/test/tools/lldb-gdbserver/TestGdbRemoteExpeditedRegisters.py
    lldb/trunk/test/tools/lldb-gdbserver/gdbremote_testcase.py
Modified:
    lldb/trunk/test/tools/lldb-gdbserver/TestLldbGdbServer.py

Added: lldb/trunk/test/tools/lldb-gdbserver/TestGdbRemoteExpeditedRegisters.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/tools/lldb-gdbserver/TestGdbRemoteExpeditedRegisters.py?rev=210931&view=auto
==============================================================================
--- lldb/trunk/test/tools/lldb-gdbserver/TestGdbRemoteExpeditedRegisters.py (added)
+++ lldb/trunk/test/tools/lldb-gdbserver/TestGdbRemoteExpeditedRegisters.py Fri Jun 13 14:11:33 2014
@@ -0,0 +1,132 @@
+import unittest2
+
+import gdbremote_testcase
+from lldbtest import *
+
+class TestGdbRemoteExpeditedRegisters(gdbremote_testcase.GdbRemoteTestCaseBase):
+
+    def gather_expedited_registers(self):
+        # Setup the stub and set the gdb remote command stream.
+        procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
+        self.test_sequence.add_log_lines([
+            # Start up the inferior.
+            "read packet: $c#00",
+            # Immediately tell it to stop.  We want to see what it reports.
+            "read packet: {}".format(chr(03)),
+            {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result", 2:"key_vals_text"} },
+            ], True)
+
+        # Run the gdb remote command stream.
+        context = self.expect_gdbremote_sequence()
+        self.assertIsNotNone(context)
+
+        # Pull out expedited registers.
+        key_vals_text = context.get("key_vals_text")
+        self.assertIsNotNone(key_vals_text)
+
+        expedited_registers = self.extract_registers_from_stop_notification(key_vals_text)
+        self.assertIsNotNone(expedited_registers)
+
+        return expedited_registers
+
+    def stop_notification_contains_generic_register(self, generic_register_name):
+        # Generate a stop reply, parse out expedited registers from stop notification.
+        expedited_registers = self.gather_expedited_registers()
+
+        # Gather target register infos.
+        reg_infos = self.gather_register_infos()
+
+        # Find the generic register.
+        reg_info = self.find_generic_register_with_name(reg_infos, generic_register_name)
+        self.assertIsNotNone(reg_info)
+
+        # Ensure the expedited registers contained it.
+        self.assertTrue(reg_info["lldb_register_index"] in expedited_registers)
+        # print "{} reg_info:{}".format(generic_register_name, reg_info)
+
+    def stop_notification_contains_any_registers(self):
+        # Generate a stop reply, parse out expedited registers from stop notification.
+        expedited_registers = self.gather_expedited_registers()
+        # Verify we have at least one expedited register.
+        self.assertTrue(len(expedited_registers) > 0)
+
+    @debugserver_test
+    @dsym_test
+    def test_stop_notification_contains_any_registers_debugserver_dsym(self):
+        self.init_debugserver_test()
+        self.buildDsym()
+        self.set_inferior_startup_launch()
+        self.stop_notification_contains_any_registers()
+
+    @llgs_test
+    @dwarf_test
+    @unittest2.expectedFailure()
+    def test_stop_notification_contains_any_registers_llgs_dwarf(self):
+        self.init_llgs_test()
+        self.buildDwarf()
+        self.set_inferior_startup_launch()
+        self.stop_notification_contains_any_registers()
+
+    def stop_notification_contains_pc_register(self):
+        self.stop_notification_contains_generic_register("pc")
+
+    @debugserver_test
+    @dsym_test
+    def test_stop_notification_contains_pc_register_debugserver_dsym(self):
+        self.init_debugserver_test()
+        self.buildDsym()
+        self.set_inferior_startup_launch()
+        self.stop_notification_contains_pc_register()
+
+    @llgs_test
+    @dwarf_test
+    @unittest2.expectedFailure()
+    def test_stop_notification_contains_pc_register_llgs_dwarf(self):
+        self.init_llgs_test()
+        self.buildDwarf()
+        self.set_inferior_startup_launch()
+        self.stop_notification_contains_pc_register()
+
+    def stop_notification_contains_fp_register(self):
+        self.stop_notification_contains_generic_register("fp")
+
+    @debugserver_test
+    @dsym_test
+    def test_stop_notification_contains_fp_register_debugserver_dsym(self):
+        self.init_debugserver_test()
+        self.buildDsym()
+        self.set_inferior_startup_launch()
+        self.stop_notification_contains_fp_register()
+
+    @llgs_test
+    @dwarf_test
+    @unittest2.expectedFailure()
+    def test_stop_notification_contains_fp_register_llgs_dwarf(self):
+        self.init_llgs_test()
+        self.buildDwarf()
+        self.set_inferior_startup_launch()
+        self.stop_notification_contains_fp_register()
+
+    def stop_notification_contains_sp_register(self):
+        self.stop_notification_contains_generic_register("sp")
+
+    @debugserver_test
+    @dsym_test
+    def test_stop_notification_contains_sp_register_debugserver_dsym(self):
+        self.init_debugserver_test()
+        self.buildDsym()
+        self.set_inferior_startup_launch()
+        self.stop_notification_contains_sp_register()
+
+    @llgs_test
+    @dwarf_test
+    @unittest2.expectedFailure()
+    def test_stop_notification_contains_sp_register_llgs_dwarf(self):
+        self.init_llgs_test()
+        self.buildDwarf()
+        self.set_inferior_startup_launch()
+        self.stop_notification_contains_sp_register()
+
+
+if __name__ == '__main__':
+    unittest2.main()

Modified: lldb/trunk/test/tools/lldb-gdbserver/TestLldbGdbServer.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/tools/lldb-gdbserver/TestLldbGdbServer.py?rev=210931&r1=210930&r2=210931&view=diff
==============================================================================
--- lldb/trunk/test/tools/lldb-gdbserver/TestLldbGdbServer.py (original)
+++ lldb/trunk/test/tools/lldb-gdbserver/TestLldbGdbServer.py Fri Jun 13 14:11:33 2014
@@ -1,507 +1,22 @@
 """
-Test lldb-gdbserver operation
+Test case for testing the gdbremote protocol.
+
+Tests run against debugserver and lldb-gdbserver (llgs).
+lldb-gdbserver tests run where the lldb-gdbserver exe is
+available.
+
+This class will be broken into smaller test case classes by
+gdb remote packet functional areas.  For now it contains
+the initial set of tests implemented.
 """
 
+import lldbgdbserverutils
 import unittest2
-import pexpect
-import platform
-import sets
-import signal
-import socket
-import subprocess
-import sys
-import time
 from lldbtest import *
-from lldbgdbserverutils import *
-import logging
-import os.path
-
-class LldbGdbServerTestCase(TestBase):
-
-    mydir = TestBase.compute_mydir(__file__)
-
-    port = 12345
-
-    _TIMEOUT_SECONDS = 5
-
-    _GDBREMOTE_KILL_PACKET = "$k#6b"
-
-    _LOGGING_LEVEL = logging.WARNING
-    # _LOGGING_LEVEL = logging.DEBUG
-
-    _STARTUP_ATTACH = "attach"
-    _STARTUP_LAUNCH = "launch"
-
-    # GDB Signal numbers that are not target-specific used for common exceptions
-    TARGET_EXC_BAD_ACCESS      = 0x91
-    TARGET_EXC_BAD_INSTRUCTION = 0x92
-    TARGET_EXC_ARITHMETIC      = 0x93
-    TARGET_EXC_EMULATION       = 0x94
-    TARGET_EXC_SOFTWARE        = 0x95
-    TARGET_EXC_BREAKPOINT      = 0x96
-
-    def setUp(self):
-        TestBase.setUp(self)
-        FORMAT = '%(asctime)-15s %(levelname)-8s %(message)s'
-        logging.basicConfig(format=FORMAT)
-        self.logger = logging.getLogger(__name__)
-        self.logger.setLevel(self._LOGGING_LEVEL)
-        self.test_sequence = GdbRemoteTestSequence(self.logger)
-        self.set_inferior_startup_launch()
-
-        # Uncomment this code to force only a single test to run (by name).
-        #if not re.search(r"P_", self._testMethodName):
-        #    self.skipTest("focusing on one test")
-
-    def reset_test_sequence(self):
-        self.test_sequence = GdbRemoteTestSequence(self.logger)
-
-    def init_llgs_test(self):
-        self.debug_monitor_exe = get_lldb_gdbserver_exe()
-        if not self.debug_monitor_exe:
-            self.skipTest("lldb_gdbserver exe not found")
-        self.debug_monitor_extra_args = ""
-
-    def init_debugserver_test(self):
-        self.debug_monitor_exe = get_debugserver_exe()
-        if not self.debug_monitor_exe:
-            self.skipTest("debugserver exe not found")
-        self.debug_monitor_extra_args = " --log-file=/tmp/packets-{}.log --log-flags=0x800000".format(self._testMethodName)
-
-    def create_socket(self):
-        sock = socket.socket()
-        logger = self.logger
-
-        def shutdown_socket():
-            if sock:
-                try:
-                    # send the kill packet so lldb-gdbserver shuts down gracefully
-                    sock.sendall(LldbGdbServerTestCase._GDBREMOTE_KILL_PACKET)
-                except:
-                    logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
-
-                try:
-                    sock.close()
-                except:
-                    logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
-
-        self.addTearDownHook(shutdown_socket)
-
-        sock.connect(('localhost', self.port))
-        return sock
-
-    def set_inferior_startup_launch(self):
-        self._inferior_startup = self._STARTUP_LAUNCH
-
-    def set_inferior_startup_attach(self):
-        self._inferior_startup = self._STARTUP_ATTACH
-
-    def start_server(self, attach_pid=None):
-        # Create the command line
-        commandline = "{}{} localhost:{}".format(self.debug_monitor_exe, self.debug_monitor_extra_args, self.port)
-        if attach_pid:
-            commandline += " --attach=%d" % attach_pid
-            
-        # start the server
-        server = pexpect.spawn(commandline)
-
-        # Turn on logging for what the child sends back.
-        if self.TraceOn():
-            server.logfile_read = sys.stdout
-
-        # Schedule debug monitor to be shut down during teardown.
-        logger = self.logger
-        def shutdown_debug_monitor():
-            try:
-                server.close()
-            except:
-                logger.warning("failed to close pexpect server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
-
-        self.addTearDownHook(shutdown_debug_monitor)
-
-        # Wait until we receive the server ready message before continuing.
-        server.expect_exact('Listening to port {} for a connection from localhost'.format(self.port))
-
-        # Create a socket to talk to the server
-        self.sock = self.create_socket()
-
-        return server
-
-    def launch_process_for_attach(self,inferior_args=None, sleep_seconds=3):
-        # We're going to start a child process that the debug monitor stub can later attach to.
-        # This process needs to be started so that it just hangs around for a while.  We'll
-        # have it sleep.
-        exe_path = os.path.abspath("a.out")
-
-        args = [exe_path]
-        if inferior_args:
-            args.extend(inferior_args)
-        if sleep_seconds:
-            args.append("sleep:%d" % sleep_seconds)
-
-        return subprocess.Popen(args)
-
-    def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3):
-        """Prep the debug monitor, the inferior, and the expected packet stream.
-
-        Handle the separate cases of using the debug monitor in attach-to-inferior mode
-        and in launch-inferior mode.
-
-        For attach-to-inferior mode, the inferior process is first started, then
-        the debug monitor is started in attach to pid mode (using --attach on the
-        stub command line), and the no-ack-mode setup is appended to the packet
-        stream.  The packet stream is not yet executed, ready to have more expected
-        packet entries added to it.
-
-        For launch-inferior mode, the stub is first started, then no ack mode is
-        setup on the expected packet stream, then the verified launch packets are added
-        to the expected socket stream.  The packet stream is not yet executed, ready
-        to have more expected packet entries added to it.
-
-        The return value is:
-        {inferior:<inferior>, server:<server>}
-        """
-        inferior = None
-        attach_pid = None
-
-        if self._inferior_startup == self._STARTUP_ATTACH:
-            # Launch the process that we'll use as the inferior.
-            inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds)
-            self.assertIsNotNone(inferior)
-            self.assertTrue(inferior.pid > 0)
-            attach_pid = inferior.pid
-
-        # Launch the debug monitor stub, attaching to the inferior.
-        server = self.start_server(attach_pid=attach_pid)
-        self.assertIsNotNone(server)
-
-        if self._inferior_startup == self._STARTUP_LAUNCH:
-            # Build launch args
-            launch_args = [os.path.abspath('a.out')]
-            if inferior_args:
-                launch_args.extend(inferior_args)
-
-        # Build the expected protocol stream
-        self.add_no_ack_remote_stream()
-        if self._inferior_startup == self._STARTUP_LAUNCH:
-            self.add_verified_launch_packets(launch_args)
-
-        return {"inferior":inferior, "server":server}
-
-    def add_no_ack_remote_stream(self):
-        self.test_sequence.add_log_lines(
-            ["read packet: +",
-             "read packet: $QStartNoAckMode#b0",
-             "send packet: +",
-             "send packet: $OK#9a",
-             "read packet: +"],
-            True)
-
-    def add_verified_launch_packets(self, launch_args):
-        self.test_sequence.add_log_lines(
-            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
-             "send packet: $OK#00",
-             "read packet: $qLaunchSuccess#a5",
-             "send packet: $OK#00"],
-            True)
-
-    def add_thread_suffix_request_packets(self):
-        self.test_sequence.add_log_lines(
-            ["read packet: $QThreadSuffixSupported#00",
-             "send packet: $OK#00",
-            ], True)
-
-    def add_process_info_collection_packets(self):
-        self.test_sequence.add_log_lines(
-            ["read packet: $qProcessInfo#00",
-              { "direction":"send", "regex":r"^\$(.+)#00", "capture":{1:"process_info_raw"} }],
-            True)
-
-    _KNOWN_PROCESS_INFO_KEYS = [
-        "pid",
-        "parent-pid",
-        "real-uid",
-        "real-gid",
-        "effective-uid",
-        "effective-gid",
-        "cputype",
-        "cpusubtype",
-        "ostype",
-        "vendor",
-        "endian",
-        "ptrsize"
-        ]
-
-    def parse_process_info_response(self, context):
-        # Ensure we have a process info response.
-        self.assertIsNotNone(context)
-        process_info_raw = context.get("process_info_raw")
-        self.assertIsNotNone(process_info_raw)
-
-        # Pull out key:value; pairs.
-        process_info_dict = { match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw) }
-
-        # Validate keys are known.
-        for (key, val) in process_info_dict.items():
-            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
-            self.assertIsNotNone(val)
-
-        return process_info_dict
-
-    def add_register_info_collection_packets(self):
-        self.test_sequence.add_log_lines(
-            [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True,
-              "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
-              "save_key":"reg_info_responses" } ],
-            True)
-
-    def parse_register_info_packets(self, context):
-        """Return an array of register info dictionaries, one per register info."""
-        reg_info_responses = context.get("reg_info_responses")
-        self.assertIsNotNone(reg_info_responses)
-
-        # Parse register infos.
-        return [parse_reg_info_response(reg_info_response) for reg_info_response in reg_info_responses]
-
-    def expect_gdbremote_sequence(self):
-        return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence, self._TIMEOUT_SECONDS, self.logger)
-
-    _KNOWN_REGINFO_KEYS = [
-        "name",
-        "alt-name",
-        "bitsize",
-        "offset",
-        "encoding",
-        "format",
-        "set",
-        "gcc",
-        "dwarf",
-        "generic",
-        "container-regs",
-        "invalidate-regs"
-    ]
-
-    def assert_valid_reg_info(self, reg_info):
-        # Assert we know about all the reginfo keys parsed.
-        for key in reg_info:
-            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
-        
-        # Check the bare-minimum expected set of register info keys.
-        self.assertTrue("name" in reg_info)
-        self.assertTrue("bitsize" in reg_info)
-        self.assertTrue("offset" in reg_info)
-        self.assertTrue("encoding" in reg_info)
-        self.assertTrue("format" in reg_info)
-
-    def find_pc_reg_info(self, reg_infos):
-        lldb_reg_index = 0
-        for reg_info in reg_infos:
-            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
-                return (lldb_reg_index, reg_info)
-            lldb_reg_index += 1
-
-        return (None, None)
-
-    def add_lldb_register_index(self, reg_infos):
-        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
-
-        We'll use this when we want to call packets like P/p with a register index but do so
-        on only a subset of the full register info set.
-        """
-        self.assertIsNotNone(reg_infos)
-
-        reg_index = 0
-        for reg_info in reg_infos:
-            reg_info["lldb_register_index"] = reg_index
-            reg_index += 1
 
-    def add_query_memory_region_packets(self, address):
-        self.test_sequence.add_log_lines(
-            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
-             {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }],
-            True)
-        
-    def parse_memory_region_packet(self, context):
-        # Ensure we have a context.
-        self.assertIsNotNone(context.get("memory_region_response"))
-        
-        # Pull out key:value; pairs.
-        mem_region_dict = {match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", context.get("memory_region_response"))}
-
-        # Validate keys are known.
-        for (key, val) in mem_region_dict.items():
-            self.assertTrue(key in ["start", "size", "permissions", "error"])
-            self.assertIsNotNone(val)
-
-        # Return the dictionary of key-value pairs for the memory region.
-        return mem_region_dict
-
-    def assert_address_within_memory_region(self, test_address, mem_region_dict):
-        self.assertIsNotNone(mem_region_dict)
-        self.assertTrue("start" in mem_region_dict)
-        self.assertTrue("size" in mem_region_dict)
-        
-        range_start = int(mem_region_dict["start"], 16)
-        range_size = int(mem_region_dict["size"], 16)
-        range_end = range_start + range_size
-
-        if test_address < range_start:
-            self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
-        elif test_address >= range_end:
-            self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
-
-    def add_threadinfo_collection_packets(self):
-        self.test_sequence.add_log_lines(
-            [ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo",
-                "append_iteration_suffix":False, "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
-              "save_key":"threadinfo_responses" } ],
-            True)
+import gdbremote_testcase
 
-    def parse_threadinfo_packets(self, context):
-        """Return an array of thread ids (decimal ints), one per thread."""
-        threadinfo_responses = context.get("threadinfo_responses")
-        self.assertIsNotNone(threadinfo_responses)
-
-        thread_ids = []
-        for threadinfo_response in threadinfo_responses:
-            new_thread_infos = parse_threadinfo_response(threadinfo_response)
-            thread_ids.extend(new_thread_infos)
-        return thread_ids
-
-    def wait_for_thread_count(self, thread_count, timeout_seconds=3):
-        start_time = time.time()
-        timeout_time = start_time + timeout_seconds
-
-        actual_thread_count = 0
-        while actual_thread_count < thread_count:
-            self.reset_test_sequence()
-            self.add_threadinfo_collection_packets()
-
-            context = self.expect_gdbremote_sequence()
-            self.assertIsNotNone(context)
-
-            threads = self.parse_threadinfo_packets(context)
-            self.assertIsNotNone(threads)
-
-            actual_thread_count = len(threads)
-
-            if time.time() > timeout_time:
-                raise Exception(
-                    'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
-                        timeout_seconds, thread_count, actual_thread_count))
-
-        return threads
-
-    def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1):
-        self.test_sequence.add_log_lines(
-            [# Set the breakpoint.
-             "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind),
-             # Verify the stub could set it.
-             "send packet: $OK#00",
-             ], True)
-        
-        if (do_continue):
-            self.test_sequence.add_log_lines(
-                [# Continue the inferior.
-                 "read packet: $c#00",
-                 # Expect a breakpoint stop report.
-                 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
-                 ], True)        
-
-    def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
-        self.test_sequence.add_log_lines(
-            [# Remove the breakpoint.
-             "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind),
-             # Verify the stub could unset it.
-             "send packet: $OK#00",
-            ], True)
-
-    def add_qSupported_packets(self):
-        self.test_sequence.add_log_lines(
-            ["read packet: $qSupported#00",
-             {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}},
-            ], True)
-
-    _KNOWN_QSUPPORTED_STUB_FEATURES = [
-        "augmented-libraries-svr4-read",
-        "PacketSize",
-        "QStartNoAckMode",
-        "qXfer:auxv:read",
-        "qXfer:libraries:read",
-        "qXfer:libraries-svr4:read",
-    ]
-
-    def parse_qSupported_response(self, context):
-        self.assertIsNotNone(context)
-
-        raw_response = context.get("qSupported_response")
-        self.assertIsNotNone(raw_response)
-
-        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
-        # +,-,? is stripped from the key and set as the value.
-        supported_dict = {}
-        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
-            key = match.group(1)
-            val = match.group(3)
-            
-            # key=val: store as is
-            if val and len(val) > 0:
-                supported_dict[key] = val
-            else:
-                if len(key) < 2:
-                    raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}")
-                supported_type = key[-1]
-                key = key[:-1]
-                if not supported_type in ["+", "-", "?"]:
-                    raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
-                supported_dict[key] = supported_type 
-            # Ensure we know the supported element
-            if not key in self._KNOWN_QSUPPORTED_STUB_FEATURES:
-                raise Exception("unknown qSupported stub feature reported: %s" % key)
-
-        return supported_dict
-
-    def run_process_then_stop(self, run_seconds=1):
-        # Tell the stub to continue.
-        self.test_sequence.add_log_lines(
-             ["read packet: $vCont;c#00"],
-             True)
-        context = self.expect_gdbremote_sequence()
-
-        # Wait for run_seconds.
-        time.sleep(run_seconds)
-
-        # Send an interrupt, capture a T response.
-        self.reset_test_sequence()
-        self.test_sequence.add_log_lines(
-            ["read packet: {}".format(chr(03)),
-             {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }],
-            True)
-        context = self.expect_gdbremote_sequence()
-        self.assertIsNotNone(context)
-        self.assertIsNotNone(context.get("stop_result"))
-        
-        return context
-
-    def select_modifiable_register(self, reg_infos):
-        """Find a register that can be read/written freely."""
-        PREFERRED_REGISTER_NAMES = sets.Set(["rax",])
-
-        # First check for the first register from the preferred register name set.
-        alternative_register_index = None
-
-        self.assertIsNotNone(reg_infos)
-        for reg_info in reg_infos:
-            if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES):
-                # We found a preferred register.  Use it.
-                return reg_info["lldb_register_index"]
-            if ("generic" in reg_info) and (reg_info["generic"] == "fp"):
-                # A frame pointer register will do as a register to modify temporarily.
-                alternative_register_index = reg_info["lldb_register_index"]
-
-        # We didn't find a preferred register.  Return whatever alternative register
-        # we found, if any.
-        return alternative_register_index
+class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase):
 
     @debugserver_test
     def test_exe_starts_debugserver(self):
@@ -584,7 +99,7 @@ class LldbGdbServerTestCase(TestBase):
 
         self.add_no_ack_remote_stream()
         self.test_sequence.add_log_lines(
-            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
+            ["read packet: %s" % lldbgdbserverutils.build_gdbremote_A_packet(launch_args),
              "send packet: $OK#9a"],
             True)
         self.expect_gdbremote_sequence()
@@ -681,13 +196,9 @@ class LldbGdbServerTestCase(TestBase):
              {"type":"output_match", "regex":r"^hello, world\r\n$" },
              "send packet: $W00#00"],
             True)
-            
+
         context = self.expect_gdbremote_sequence()
         self.assertIsNotNone(context)
-        
-        # O_content = context.get("O_content")
-        # self.assertIsNotNone(O_content)
-        # self.assertEquals(O_content, "hello, world\r\n")
 
     @debugserver_test
     @dsym_test
@@ -755,7 +266,7 @@ class LldbGdbServerTestCase(TestBase):
         self.assertNotEqual(0, pid)
 
         # If possible, verify that the process is running.
-        self.assertTrue(process_is_running(pid, True))
+        self.assertTrue(lldbgdbserverutils.process_is_running(pid, True))
 
     @debugserver_test
     @dsym_test
@@ -853,7 +364,7 @@ class LldbGdbServerTestCase(TestBase):
         self.assertIsNotNone(poll_result)
 
         # Where possible, verify at the system level that the process is not running.
-        self.assertFalse(process_is_running(procs["inferior"].pid, False))
+        self.assertFalse(lldbgdbserverutils.process_is_running(procs["inferior"].pid, False))
 
     @debugserver_test
     @dsym_test
@@ -885,7 +396,7 @@ class LldbGdbServerTestCase(TestBase):
         self.assertIsNotNone(poll_result)
 
         # Where possible, verify at the system level that the process is not running.
-        self.assertFalse(process_is_running(procs["inferior"].pid, False))
+        self.assertFalse(lldbgdbserverutils.process_is_running(procs["inferior"].pid, False))
 
     @debugserver_test
     @dsym_test
@@ -925,7 +436,7 @@ class LldbGdbServerTestCase(TestBase):
 
         reg_info_packet = context.get("reginfo_0")
         self.assertIsNotNone(reg_info_packet)
-        self.assert_valid_reg_info(parse_reg_info_response(reg_info_packet))
+        self.assert_valid_reg_info(lldbgdbserverutils.parse_reg_info_response(reg_info_packet))
 
     @debugserver_test
     @dsym_test
@@ -1248,7 +759,7 @@ class LldbGdbServerTestCase(TestBase):
     def Hg_switches_to_3_threads(self):
         # Startup the inferior with three threads (main + 2 new ones).
         procs = self.prep_debug_monitor_and_inferior(inferior_args=["thread:new", "thread:new"])
-        
+
         # Let the inferior process have a few moments to start up the thread when launched.  (The launch scenario has no time to run, so threads won't be there yet.)
         self.run_process_then_stop(run_seconds=1)
 
@@ -1266,10 +777,10 @@ class LldbGdbServerTestCase(TestBase):
                  "read packet: $qC#00",
                  { "direction":"send", "regex":r"^\$QC([0-9a-fA-F]+)#", "capture":{1:"thread_id"} }],
                 True)
-            
+
             context = self.expect_gdbremote_sequence()
             self.assertIsNotNone(context)
-            
+
             # Verify the thread id.
             self.assertIsNotNone(context.get("thread_id"))
             self.assertEquals(int(context.get("thread_id"), 16), thread)
@@ -1313,7 +824,7 @@ class LldbGdbServerTestCase(TestBase):
         # and the test requires getting stdout from the exe.
 
         NUM_THREADS = 3
-        
+
         # Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
         # inferior_args=["thread:print-ids"]
         inferior_args=["thread:segfault"]
@@ -1347,16 +858,16 @@ class LldbGdbServerTestCase(TestBase):
                  {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"signo", 2:"thread_id"} }
                  ], True)
             context = self.expect_gdbremote_sequence()
-            
+
             self.assertIsNotNone(context)
             signo = context.get("signo")
             self.assertEqual(int(signo, 16), self.TARGET_EXC_BAD_ACCESS)
-            
+
             # Ensure we haven't seen this tid yet.
             thread_id = int(context.get("thread_id"), 16)
             self.assertFalse(thread_id in signaled_tids)
             signaled_tids[thread_id] = 1
-            
+
             # Send SIGUSR1 to the thread that signaled the SIGSEGV.
             self.reset_test_sequence()
             self.test_sequence.add_log_lines(
@@ -1381,7 +892,7 @@ class LldbGdbServerTestCase(TestBase):
             stop_signo = context.get("stop_signo")
             self.assertIsNotNone(stop_signo)
             self.assertEquals(int(stop_signo,16), signal.SIGUSR1)
-            
+
             # Ensure the stop thread is the thread to which we delivered the signal.
             stop_thread_id = context.get("stop_thread_id")
             self.assertIsNotNone(stop_thread_id)
@@ -1391,7 +902,7 @@ class LldbGdbServerTestCase(TestBase):
             # print_thread_id = context.get("print_thread_id")
             # self.assertIsNotNone(print_thread_id)
             # self.assertFalse(print_thread_id in print_thread_ids)
-            
+
             # Now remember this print (i.e. inferior-reflected) thread id and ensure we don't hit it again.
             # print_thread_ids[print_thread_id] = 1
 
@@ -1458,7 +969,7 @@ class LldbGdbServerTestCase(TestBase):
         self.assertIsNotNone(context.get("read_contents"))
         read_contents = context.get("read_contents").decode("hex")
         self.assertEquals(read_contents, MEMORY_CONTENTS)
-        
+
     @debugserver_test
     @dsym_test
     def test_m_packet_reads_memory_debugserver_dsym(self):
@@ -1544,10 +1055,9 @@ class LldbGdbServerTestCase(TestBase):
         self.assertTrue("permissions" in mem_region_dict)
         self.assertTrue("r" in mem_region_dict["permissions"])
         self.assertTrue("x" in mem_region_dict["permissions"])
-        
+
         # Ensure the start address and size encompass the address we queried.
         self.assert_address_within_memory_region(code_address, mem_region_dict)
-        
 
     @debugserver_test
     @dsym_test
@@ -1610,7 +1120,6 @@ class LldbGdbServerTestCase(TestBase):
         # Ensure the start address and size encompass the address we queried.
         self.assert_address_within_memory_region(stack_address, mem_region_dict)
 
-
     @debugserver_test
     @dsym_test
     def test_qMemoryRegionInfo_reports_stack_address_as_readable_writeable_debugserver_dsym(self):
@@ -1768,7 +1277,7 @@ class LldbGdbServerTestCase(TestBase):
         self.assertIsNotNone(p_response)
 
         # Convert from target endian to int.
-        returned_pc = unpack_register_hex_unsigned(endian, p_response)
+        returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(endian, p_response)
         self.assertEquals(returned_pc, function_address)
 
         # Verify that a breakpoint remove and continue gets us the expected output.
@@ -1812,7 +1321,7 @@ class LldbGdbServerTestCase(TestBase):
         g_c2_address = args["g_c2_address"]
         expected_g_c1 = args["expected_g_c1"]
         expected_g_c2 = args["expected_g_c2"]
-        
+
         # Read g_c1 and g_c2 contents.
         self.reset_test_sequence()
         self.test_sequence.add_log_lines(
@@ -1852,13 +1361,13 @@ class LldbGdbServerTestCase(TestBase):
             self.assertIsNotNone(context)
             self.assertIsNotNone(context.get("stop_signo"))
             self.assertEquals(int(context.get("stop_signo"), 16), signal.SIGTRAP)
-           
+
             single_step_count += 1
-            
+
             # See if the predicate is true.  If so, we're done.
             if predicate(args):
                 return (True, single_step_count)
-        
+
         # The predicate didn't return true within the runaway step count.
         return (False, single_step_count)
 
@@ -1922,7 +1431,7 @@ class LldbGdbServerTestCase(TestBase):
         args["expected_g_c2"] = "1"
 
         self.assertTrue(self.g_c1_c2_contents_are(args))
-        
+
         # Verify we take only a small number of steps to hit the first state.  Might need to work through function entry prologue code.
         args["expected_g_c1"] = "1"
         args["expected_g_c2"] = "1"
@@ -2086,7 +1595,7 @@ class LldbGdbServerTestCase(TestBase):
             # Verify the response length.
             p_response = context.get("p_response")
             self.assertIsNotNone(p_response)
-            initial_reg_value = unpack_register_hex_unsigned(endian, p_response)
+            initial_reg_value = lldbgdbserverutils.unpack_register_hex_unsigned(endian, p_response)
 
             # Flip the value by xoring with all 1s
             all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8)
@@ -2096,7 +1605,7 @@ class LldbGdbServerTestCase(TestBase):
             # Write the flipped value to the register.
             self.reset_test_sequence()
             self.test_sequence.add_log_lines(
-                ["read packet: $P{0:x}={1}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size)),
+                ["read packet: $P{0:x}={1}#00".format(reg_index, lldbgdbserverutils.pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size)),
                 { "direction":"send", "regex":r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", "capture":{1:"P_response"} },
                 ], True)
             context = self.expect_gdbremote_sequence()
@@ -2125,7 +1634,7 @@ class LldbGdbServerTestCase(TestBase):
 
                 verify_p_response_raw = context.get("p_response")
                 self.assertIsNotNone(verify_p_response_raw)
-                verify_bits = unpack_register_hex_unsigned(endian, verify_p_response_raw)
+                verify_bits = lldbgdbserverutils.unpack_register_hex_unsigned(endian, verify_p_response_raw)
 
                 if verify_bits != flipped_bits_int:
                     # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
@@ -2235,12 +1744,12 @@ class LldbGdbServerTestCase(TestBase):
                 # Set the next value to use for writing as the increment plus current value.
                 p_response = context.get("p_response")
                 self.assertIsNotNone(p_response)
-                next_value = unpack_register_hex_unsigned(endian, p_response)
+                next_value = lldbgdbserverutils.unpack_register_hex_unsigned(endian, p_response)
 
             # Set new value using P and thread suffix.
             self.reset_test_sequence()
             self.test_sequence.add_log_lines(
-                ["read packet: $P{0:x}={1};thread:{2:x}#00".format(reg_index, pack_register_hex(endian, next_value, byte_size=reg_byte_size), thread),
+                ["read packet: $P{0:x}={1};thread:{2:x}#00".format(reg_index, lldbgdbserverutils.pack_register_hex(endian, next_value, byte_size=reg_byte_size), thread),
                  "send packet: $OK#00",
                 ], True)
             context = self.expect_gdbremote_sequence()
@@ -2267,7 +1776,7 @@ class LldbGdbServerTestCase(TestBase):
             # Get the register value.
             p_response = context.get("p_response")
             self.assertIsNotNone(p_response)
-            read_value = unpack_register_hex_unsigned(endian, p_response)
+            read_value = lldbgdbserverutils.unpack_register_hex_unsigned(endian, p_response)
 
             # Make sure we read back what we wrote.
             self.assertEquals(read_value, expected_reg_values[thread_index])

Added: lldb/trunk/test/tools/lldb-gdbserver/gdbremote_testcase.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/tools/lldb-gdbserver/gdbremote_testcase.py?rev=210931&view=auto
==============================================================================
--- lldb/trunk/test/tools/lldb-gdbserver/gdbremote_testcase.py (added)
+++ lldb/trunk/test/tools/lldb-gdbserver/gdbremote_testcase.py Fri Jun 13 14:11:33 2014
@@ -0,0 +1,543 @@
+"""
+Base class for gdb-remote test cases.
+"""
+
+import unittest2
+import pexpect
+import platform
+import sets
+import signal
+import socket
+import subprocess
+import sys
+import time
+from lldbtest import *
+from lldbgdbserverutils import *
+import logging
+import os.path
+
+class GdbRemoteTestCaseBase(TestBase):
+
+    mydir = TestBase.compute_mydir(__file__)
+
+    port = 12345
+
+    _TIMEOUT_SECONDS = 5
+
+    _GDBREMOTE_KILL_PACKET = "$k#6b"
+
+    _LOGGING_LEVEL = logging.WARNING
+    # _LOGGING_LEVEL = logging.DEBUG
+
+    _STARTUP_ATTACH = "attach"
+    _STARTUP_LAUNCH = "launch"
+
+    # GDB signal numbers that are not target-specific, used for common exceptions
+    TARGET_EXC_BAD_ACCESS      = 0x91
+    TARGET_EXC_BAD_INSTRUCTION = 0x92
+    TARGET_EXC_ARITHMETIC      = 0x93
+    TARGET_EXC_EMULATION       = 0x94
+    TARGET_EXC_SOFTWARE        = 0x95
+    TARGET_EXC_BREAKPOINT      = 0x96
+
+    def setUp(self):
+        TestBase.setUp(self)
+        FORMAT = '%(asctime)-15s %(levelname)-8s %(message)s'
+        logging.basicConfig(format=FORMAT)
+        self.logger = logging.getLogger(__name__)
+        self.logger.setLevel(self._LOGGING_LEVEL)
+        self.test_sequence = GdbRemoteTestSequence(self.logger)
+        self.set_inferior_startup_launch()
+
+        # Uncomment this code to force only a single test to run (by name).
+        #if not re.search(r"P_", self._testMethodName):
+        #    self.skipTest("focusing on one test")
+
+    def reset_test_sequence(self):
+        self.test_sequence = GdbRemoteTestSequence(self.logger)
+
+    def init_llgs_test(self):
+        self.debug_monitor_exe = get_lldb_gdbserver_exe()
+        if not self.debug_monitor_exe:
+            self.skipTest("lldb_gdbserver exe not found")
+        self.debug_monitor_extra_args = ""
+
+    def init_debugserver_test(self):
+        self.debug_monitor_exe = get_debugserver_exe()
+        if not self.debug_monitor_exe:
+            self.skipTest("debugserver exe not found")
+        self.debug_monitor_extra_args = " --log-file=/tmp/packets-{}.log --log-flags=0x800000".format(self._testMethodName)
+
+    def create_socket(self):
+        sock = socket.socket()
+        logger = self.logger
+
+        def shutdown_socket():
+            if sock:
+                try:
+                    # send the kill packet so lldb-gdbserver shuts down gracefully
+                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
+                except:
+                    logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+
+                try:
+                    sock.close()
+                except:
+                    logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+
+        self.addTearDownHook(shutdown_socket)
+
+        sock.connect(('localhost', self.port))
+        return sock
+
+    def set_inferior_startup_launch(self):
+        self._inferior_startup = self._STARTUP_LAUNCH
+
+    def set_inferior_startup_attach(self):
+        self._inferior_startup = self._STARTUP_ATTACH
+
+    def start_server(self, attach_pid=None):
+        # Create the command line
+        commandline = "{}{} localhost:{}".format(self.debug_monitor_exe, self.debug_monitor_extra_args, self.port)
+        if attach_pid:
+            commandline += " --attach=%d" % attach_pid
+
+        # start the server
+        server = pexpect.spawn(commandline)
+
+        # Turn on logging for what the child sends back.
+        if self.TraceOn():
+            server.logfile_read = sys.stdout
+
+        # Schedule debug monitor to be shut down during teardown.
+        logger = self.logger
+        def shutdown_debug_monitor():
+            try:
+                server.close()
+            except:
+                logger.warning("failed to close pexpect server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+
+        self.addTearDownHook(shutdown_debug_monitor)
+
+        # Wait until we receive the server ready message before continuing.
+        server.expect_exact('Listening to port {} for a connection from localhost'.format(self.port))
+
+        # Create a socket to talk to the server
+        self.sock = self.create_socket()
+
+        return server
+
+    def launch_process_for_attach(self,inferior_args=None, sleep_seconds=3):
+        # We're going to start a child process that the debug monitor stub can later attach to.
+        # This process needs to be started so that it just hangs around for a while.  We'll
+        # have it sleep.
+        exe_path = os.path.abspath("a.out")
+
+        args = [exe_path]
+        if inferior_args:
+            args.extend(inferior_args)
+        if sleep_seconds:
+            args.append("sleep:%d" % sleep_seconds)
+
+        return subprocess.Popen(args)
+
+    def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3):
+        """Prep the debug monitor, the inferior, and the expected packet stream.
+
+        Handle the separate cases of using the debug monitor in attach-to-inferior mode
+        and in launch-inferior mode.
+
+        For attach-to-inferior mode, the inferior process is first started, then
+        the debug monitor is started in attach to pid mode (using --attach on the
+        stub command line), and the no-ack-mode setup is appended to the packet
+        stream.  The packet stream is not yet executed, ready to have more expected
+        packet entries added to it.
+
+        For launch-inferior mode, the stub is first started, then no ack mode is
+        set up on the expected packet stream, then the verified launch packets are added
+        to the expected socket stream.  The packet stream is not yet executed, ready
+        to have more expected packet entries added to it.
+
+        The return value is:
+        {inferior:<inferior>, server:<server>}
+        """
+        inferior = None
+        attach_pid = None
+
+        if self._inferior_startup == self._STARTUP_ATTACH:
+            # Launch the process that we'll use as the inferior.
+            inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds)
+            self.assertIsNotNone(inferior)
+            self.assertTrue(inferior.pid > 0)
+            attach_pid = inferior.pid
+
+        # Launch the debug monitor stub, attaching to the inferior.
+        server = self.start_server(attach_pid=attach_pid)
+        self.assertIsNotNone(server)
+
+        if self._inferior_startup == self._STARTUP_LAUNCH:
+            # Build launch args
+            launch_args = [os.path.abspath('a.out')]
+            if inferior_args:
+                launch_args.extend(inferior_args)
+
+        # Build the expected protocol stream
+        self.add_no_ack_remote_stream()
+        if self._inferior_startup == self._STARTUP_LAUNCH:
+            self.add_verified_launch_packets(launch_args)
+
+        return {"inferior":inferior, "server":server}
+
+    def add_no_ack_remote_stream(self):
+        self.test_sequence.add_log_lines(
+            ["read packet: +",
+             "read packet: $QStartNoAckMode#b0",
+             "send packet: +",
+             "send packet: $OK#9a",
+             "read packet: +"],
+            True)
+
+    def add_verified_launch_packets(self, launch_args):
+        self.test_sequence.add_log_lines(
+            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
+             "send packet: $OK#00",
+             "read packet: $qLaunchSuccess#a5",
+             "send packet: $OK#00"],
+            True)
+
+    def add_thread_suffix_request_packets(self):
+        self.test_sequence.add_log_lines(
+            ["read packet: $QThreadSuffixSupported#00",
+             "send packet: $OK#00",
+            ], True)
+
+    def add_process_info_collection_packets(self):
+        self.test_sequence.add_log_lines(
+            ["read packet: $qProcessInfo#00",
+              { "direction":"send", "regex":r"^\$(.+)#00", "capture":{1:"process_info_raw"} }],
+            True)
+
+    _KNOWN_PROCESS_INFO_KEYS = [
+        "pid",
+        "parent-pid",
+        "real-uid",
+        "real-gid",
+        "effective-uid",
+        "effective-gid",
+        "cputype",
+        "cpusubtype",
+        "ostype",
+        "vendor",
+        "endian",
+        "ptrsize"
+        ]
+
+    def parse_process_info_response(self, context):
+        # Ensure we have a process info response.
+        self.assertIsNotNone(context)
+        process_info_raw = context.get("process_info_raw")
+        self.assertIsNotNone(process_info_raw)
+
+        # Pull out key:value; pairs.
+        process_info_dict = { match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw) }
+
+        # Validate keys are known.
+        for (key, val) in process_info_dict.items():
+            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
+            self.assertIsNotNone(val)
+
+        return process_info_dict
+
+    def add_register_info_collection_packets(self):
+        self.test_sequence.add_log_lines(
+            [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True,
+              "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
+              "save_key":"reg_info_responses" } ],
+            True)
+
+    def parse_register_info_packets(self, context):
+        """Return an array of register info dictionaries, one per register info."""
+        reg_info_responses = context.get("reg_info_responses")
+        self.assertIsNotNone(reg_info_responses)
+
+        # Parse register infos.
+        return [parse_reg_info_response(reg_info_response) for reg_info_response in reg_info_responses]
+
+    def expect_gdbremote_sequence(self):
+        return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence, self._TIMEOUT_SECONDS, self.logger)
+
+    _KNOWN_REGINFO_KEYS = [
+        "name",
+        "alt-name",
+        "bitsize",
+        "offset",
+        "encoding",
+        "format",
+        "set",
+        "gcc",
+        "dwarf",
+        "generic",
+        "container-regs",
+        "invalidate-regs"
+    ]
+
+    def assert_valid_reg_info(self, reg_info):
+        # Assert we know about all the reginfo keys parsed.
+        for key in reg_info:
+            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
+
+        # Check the bare-minimum expected set of register info keys.
+        self.assertTrue("name" in reg_info)
+        self.assertTrue("bitsize" in reg_info)
+        self.assertTrue("offset" in reg_info)
+        self.assertTrue("encoding" in reg_info)
+        self.assertTrue("format" in reg_info)
+
+    def find_pc_reg_info(self, reg_infos):
+        lldb_reg_index = 0
+        for reg_info in reg_infos:
+            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
+                return (lldb_reg_index, reg_info)
+            lldb_reg_index += 1
+
+        return (None, None)
+
+    def add_lldb_register_index(self, reg_infos):
+        """Add a "lldb_register_index" key containing the 0-based index of each reg_infos entry.
+
+        We'll use this when we want to call packets like P/p with a register index but do so
+        on only a subset of the full register info set.
+        """
+        self.assertIsNotNone(reg_infos)
+
+        reg_index = 0
+        for reg_info in reg_infos:
+            reg_info["lldb_register_index"] = reg_index
+            reg_index += 1
+
+    def add_query_memory_region_packets(self, address):
+        self.test_sequence.add_log_lines(
+            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
+             {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }],
+            True)
+
+    def parse_key_val_dict(self, key_val_text):
+        self.assertIsNotNone(key_val_text)
+        kv_dict = {}
+        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
+            kv_dict[match.group(1)] = match.group(2)
+        return kv_dict
+
+    def parse_memory_region_packet(self, context):
+        # Ensure we have a context.
+        self.assertIsNotNone(context.get("memory_region_response"))
+
+        # Pull out key:value; pairs.
+        mem_region_dict = self.parse_key_val_dict(context.get("memory_region_response"))
+
+        # Validate keys are known.
+        for (key, val) in mem_region_dict.items():
+            self.assertTrue(key in ["start", "size", "permissions", "error"])
+            self.assertIsNotNone(val)
+
+        # Return the dictionary of key-value pairs for the memory region.
+        return mem_region_dict
+
+    def assert_address_within_memory_region(self, test_address, mem_region_dict):
+        self.assertIsNotNone(mem_region_dict)
+        self.assertTrue("start" in mem_region_dict)
+        self.assertTrue("size" in mem_region_dict)
+
+        range_start = int(mem_region_dict["start"], 16)
+        range_size = int(mem_region_dict["size"], 16)
+        range_end = range_start + range_size
+
+        if test_address < range_start:
+            self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
+        elif test_address >= range_end:
+            self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
+
+    def add_threadinfo_collection_packets(self):
+        self.test_sequence.add_log_lines(
+            [ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo",
+                "append_iteration_suffix":False, "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
+              "save_key":"threadinfo_responses" } ],
+            True)
+
+    def parse_threadinfo_packets(self, context):
+        """Return an array of thread ids (decimal ints), one per thread."""
+        threadinfo_responses = context.get("threadinfo_responses")
+        self.assertIsNotNone(threadinfo_responses)
+
+        thread_ids = []
+        for threadinfo_response in threadinfo_responses:
+            new_thread_infos = parse_threadinfo_response(threadinfo_response)
+            thread_ids.extend(new_thread_infos)
+        return thread_ids
+
+    def wait_for_thread_count(self, thread_count, timeout_seconds=3):
+        start_time = time.time()
+        timeout_time = start_time + timeout_seconds
+
+        actual_thread_count = 0
+        while actual_thread_count < thread_count:
+            self.reset_test_sequence()
+            self.add_threadinfo_collection_packets()
+
+            context = self.expect_gdbremote_sequence()
+            self.assertIsNotNone(context)
+
+            threads = self.parse_threadinfo_packets(context)
+            self.assertIsNotNone(threads)
+
+            actual_thread_count = len(threads)
+
+            if time.time() > timeout_time:
+                raise Exception(
+                    'timed out after {} seconds while waiting for threads: waiting for at least {} threads, found {}'.format(
+                        timeout_seconds, thread_count, actual_thread_count))
+
+        return threads
+
+    def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1):
+        self.test_sequence.add_log_lines(
+            [# Set the breakpoint.
+             "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind),
+             # Verify the stub could set it.
+             "send packet: $OK#00",
+             ], True)
+
+        if (do_continue):
+            self.test_sequence.add_log_lines(
+                [# Continue the inferior.
+                 "read packet: $c#00",
+                 # Expect a breakpoint stop report.
+                 {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
+                 ], True)        
+
+    def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
+        self.test_sequence.add_log_lines(
+            [# Remove the breakpoint.
+             "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind),
+             # Verify the stub could unset it.
+             "send packet: $OK#00",
+            ], True)
+
+    def add_qSupported_packets(self):
+        self.test_sequence.add_log_lines(
+            ["read packet: $qSupported#00",
+             {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}},
+            ], True)
+
+    _KNOWN_QSUPPORTED_STUB_FEATURES = [
+        "augmented-libraries-svr4-read",
+        "PacketSize",
+        "QStartNoAckMode",
+        "qXfer:auxv:read",
+        "qXfer:libraries:read",
+        "qXfer:libraries-svr4:read",
+    ]
+
+    def parse_qSupported_response(self, context):
+        self.assertIsNotNone(context)
+
+        raw_response = context.get("qSupported_response")
+        self.assertIsNotNone(raw_response)
+
+        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
+        # +,-,? is stripped from the key and set as the value.
+        supported_dict = {}
+        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
+            key = match.group(1)
+            val = match.group(3)
+
+            # key=val: store as is
+            if val and len(val) > 0:
+                supported_dict[key] = val
+            else:
+                if len(key) < 2:
+                    raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}")
+                supported_type = key[-1]
+                key = key[:-1]
+                if not supported_type in ["+", "-", "?"]:
+                    raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
+                supported_dict[key] = supported_type 
+            # Ensure we know the supported element
+            if not key in self._KNOWN_QSUPPORTED_STUB_FEATURES:
+                raise Exception("unknown qSupported stub feature reported: %s" % key)
+
+        return supported_dict
+
+    def run_process_then_stop(self, run_seconds=1):
+        # Tell the stub to continue.
+        self.test_sequence.add_log_lines(
+             ["read packet: $vCont;c#00"],
+             True)
+        context = self.expect_gdbremote_sequence()
+
+        # Wait for run_seconds.
+        time.sleep(run_seconds)
+
+        # Send an interrupt, capture a T response.
+        self.reset_test_sequence()
+        self.test_sequence.add_log_lines(
+            ["read packet: {}".format(chr(03)),
+             {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }],
+            True)
+        context = self.expect_gdbremote_sequence()
+        self.assertIsNotNone(context)
+        self.assertIsNotNone(context.get("stop_result"))
+
+        return context
+
+    def select_modifiable_register(self, reg_infos):
+        """Find a register that can be read/written freely."""
+        PREFERRED_REGISTER_NAMES = sets.Set(["rax",])
+
+        # First check for the first register from the preferred register name set.
+        alternative_register_index = None
+
+        self.assertIsNotNone(reg_infos)
+        for reg_info in reg_infos:
+            if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES):
+                # We found a preferred register.  Use it.
+                return reg_info["lldb_register_index"]
+            if ("generic" in reg_info) and (reg_info["generic"] == "fp"):
+                # A frame pointer register will do as a register to modify temporarily.
+                alternative_register_index = reg_info["lldb_register_index"]
+
+        # We didn't find a preferred register.  Return whatever alternative register
+        # we found, if any.
+        return alternative_register_index
+
+    def extract_registers_from_stop_notification(self, stop_key_vals_text):
+        self.assertIsNotNone(stop_key_vals_text)
+        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
+
+        registers = {}
+        for (key, val) in kv_dict.items():
+            if re.match(r"^[0-9a-fA-F]+", key):
+                registers[int(key, 16)] = val
+        return registers
+
+    def gather_register_infos(self):
+        self.reset_test_sequence()
+        self.add_register_info_collection_packets()
+
+        context = self.expect_gdbremote_sequence()
+        self.assertIsNotNone(context)
+
+        reg_infos = self.parse_register_info_packets(context)
+        self.assertIsNotNone(reg_infos)
+        self.add_lldb_register_index(reg_infos)
+
+        return reg_infos
+
+    def find_generic_register_with_name(self, reg_infos, generic_name):
+        self.assertIsNotNone(reg_infos)
+        for reg_info in reg_infos:
+            if ("generic" in reg_info) and (reg_info["generic"] == generic_name):
+                return reg_info
+        return None
+
+        
\ No newline at end of file





More information about the lldb-commits mailing list