[Lldb-commits] [lldb] r210931 - Added gdb-remote stop packet expedited register tests.
Todd Fiala
todd.fiala at gmail.com
Fri Jun 13 12:11:33 PDT 2014
Author: tfiala
Date: Fri Jun 13 14:11:33 2014
New Revision: 210931
URL: http://llvm.org/viewvc/llvm-project?rev=210931&view=rev
Log:
Added gdb-remote stop packet expedited register tests.
The expedited registers currently checked for are pc, fp, and sp.
Also broke out the gdb-remote base test case logic into
class gdbremote_testcase.GdbRemoteTestCaseBase in the new
gdbremote_testcase.py file.
TestGdbRemoteExpeditedRegisters.py covers the first gdb-remote
functional area to be contained in its own test case class file.
The monolithic TestLldbGdbServer.py has been modified to derive
from gdbremote_testcase.GdbRemoteTestCaseBase. Soon I will
pull out all the gdb-remote functional area tests from that class
into separate classes.
I intend to include GdbRemote at the start of the name of every
gdb-remote test case so that it is easy to run them all with a -p
pattern match in the test run infrastructure.
Also scanned for and removed all whitespace-only lines in
the files I touched.
Added:
lldb/trunk/test/tools/lldb-gdbserver/TestGdbRemoteExpeditedRegisters.py
lldb/trunk/test/tools/lldb-gdbserver/gdbremote_testcase.py
Modified:
lldb/trunk/test/tools/lldb-gdbserver/TestLldbGdbServer.py
Added: lldb/trunk/test/tools/lldb-gdbserver/TestGdbRemoteExpeditedRegisters.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/tools/lldb-gdbserver/TestGdbRemoteExpeditedRegisters.py?rev=210931&view=auto
==============================================================================
--- lldb/trunk/test/tools/lldb-gdbserver/TestGdbRemoteExpeditedRegisters.py (added)
+++ lldb/trunk/test/tools/lldb-gdbserver/TestGdbRemoteExpeditedRegisters.py Fri Jun 13 14:11:33 2014
@@ -0,0 +1,132 @@
+import unittest2
+
+import gdbremote_testcase
+from lldbtest import *
+
+class TestGdbRemoteExpeditedRegisters(gdbremote_testcase.GdbRemoteTestCaseBase):
+
+ def gather_expedited_registers(self):
+ # Setup the stub and set the gdb remote command stream.
+ procs = self.prep_debug_monitor_and_inferior(inferior_args=["sleep:2"])
+ self.test_sequence.add_log_lines([
+ # Start up the inferior.
+ "read packet: $c#00",
+ # Immediately tell it to stop. We want to see what it reports.
+ "read packet: {}".format(chr(03)),
+ {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result", 2:"key_vals_text"} },
+ ], True)
+
+ # Run the gdb remote command stream.
+ context = self.expect_gdbremote_sequence()
+ self.assertIsNotNone(context)
+
+ # Pull out expedited registers.
+ key_vals_text = context.get("key_vals_text")
+ self.assertIsNotNone(key_vals_text)
+
+ expedited_registers = self.extract_registers_from_stop_notification(key_vals_text)
+ self.assertIsNotNone(expedited_registers)
+
+ return expedited_registers
+
+ def stop_notification_contains_generic_register(self, generic_register_name):
+ # Generate a stop reply, parse out expedited registers from stop notification.
+ expedited_registers = self.gather_expedited_registers()
+
+ # Gather target register infos.
+ reg_infos = self.gather_register_infos()
+
+ # Find the generic register.
+ reg_info = self.find_generic_register_with_name(reg_infos, generic_register_name)
+ self.assertIsNotNone(reg_info)
+
+ # Ensure the expedited registers contained it.
+ self.assertTrue(reg_info["lldb_register_index"] in expedited_registers)
+ # print "{} reg_info:{}".format(generic_register_name, reg_info)
+
+ def stop_notification_contains_any_registers(self):
+ # Generate a stop reply, parse out expedited registers from stop notification.
+ expedited_registers = self.gather_expedited_registers()
+ # Verify we have at least one expedited register.
+ self.assertTrue(len(expedited_registers) > 0)
+
+ @debugserver_test
+ @dsym_test
+ def test_stop_notification_contains_any_registers_debugserver_dsym(self):
+ self.init_debugserver_test()
+ self.buildDsym()
+ self.set_inferior_startup_launch()
+ self.stop_notification_contains_any_registers()
+
+ @llgs_test
+ @dwarf_test
+ @unittest2.expectedFailure()
+ def test_stop_notification_contains_any_registers_llgs_dwarf(self):
+ self.init_llgs_test()
+ self.buildDwarf()
+ self.set_inferior_startup_launch()
+ self.stop_notification_contains_any_registers()
+
+ def stop_notification_contains_pc_register(self):
+ self.stop_notification_contains_generic_register("pc")
+
+ @debugserver_test
+ @dsym_test
+ def test_stop_notification_contains_pc_register_debugserver_dsym(self):
+ self.init_debugserver_test()
+ self.buildDsym()
+ self.set_inferior_startup_launch()
+ self.stop_notification_contains_pc_register()
+
+ @llgs_test
+ @dwarf_test
+ @unittest2.expectedFailure()
+ def test_stop_notification_contains_pc_register_llgs_dwarf(self):
+ self.init_llgs_test()
+ self.buildDwarf()
+ self.set_inferior_startup_launch()
+ self.stop_notification_contains_pc_register()
+
+ def stop_notification_contains_fp_register(self):
+ self.stop_notification_contains_generic_register("fp")
+
+ @debugserver_test
+ @dsym_test
+ def test_stop_notification_contains_fp_register_debugserver_dsym(self):
+ self.init_debugserver_test()
+ self.buildDsym()
+ self.set_inferior_startup_launch()
+ self.stop_notification_contains_fp_register()
+
+ @llgs_test
+ @dwarf_test
+ @unittest2.expectedFailure()
+ def test_stop_notification_contains_fp_register_llgs_dwarf(self):
+ self.init_llgs_test()
+ self.buildDwarf()
+ self.set_inferior_startup_launch()
+ self.stop_notification_contains_fp_register()
+
+ def stop_notification_contains_sp_register(self):
+ self.stop_notification_contains_generic_register("sp")
+
+ @debugserver_test
+ @dsym_test
+ def test_stop_notification_contains_sp_register_debugserver_dsym(self):
+ self.init_debugserver_test()
+ self.buildDsym()
+ self.set_inferior_startup_launch()
+ self.stop_notification_contains_sp_register()
+
+ @llgs_test
+ @dwarf_test
+ @unittest2.expectedFailure()
+ def test_stop_notification_contains_sp_register_llgs_dwarf(self):
+ self.init_llgs_test()
+ self.buildDwarf()
+ self.set_inferior_startup_launch()
+ self.stop_notification_contains_sp_register()
+
+
+if __name__ == '__main__':
+ unittest2.main()
Modified: lldb/trunk/test/tools/lldb-gdbserver/TestLldbGdbServer.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/tools/lldb-gdbserver/TestLldbGdbServer.py?rev=210931&r1=210930&r2=210931&view=diff
==============================================================================
--- lldb/trunk/test/tools/lldb-gdbserver/TestLldbGdbServer.py (original)
+++ lldb/trunk/test/tools/lldb-gdbserver/TestLldbGdbServer.py Fri Jun 13 14:11:33 2014
@@ -1,507 +1,22 @@
"""
-Test lldb-gdbserver operation
+Test case for testing the gdbremote protocol.
+
+Tests run against debugserver and lldb-gdbserver (llgs).
+lldb-gdbserver tests run where the lldb-gdbserver exe is
+available.
+
+This class will be broken into smaller test case classes by
+gdb remote packet functional areas. For now it contains
+the initial set of tests implemented.
"""
+import lldbgdbserverutils
import unittest2
-import pexpect
-import platform
-import sets
-import signal
-import socket
-import subprocess
-import sys
-import time
from lldbtest import *
-from lldbgdbserverutils import *
-import logging
-import os.path
-
-class LldbGdbServerTestCase(TestBase):
-
- mydir = TestBase.compute_mydir(__file__)
-
- port = 12345
-
- _TIMEOUT_SECONDS = 5
-
- _GDBREMOTE_KILL_PACKET = "$k#6b"
-
- _LOGGING_LEVEL = logging.WARNING
- # _LOGGING_LEVEL = logging.DEBUG
-
- _STARTUP_ATTACH = "attach"
- _STARTUP_LAUNCH = "launch"
-
- # GDB Signal numbers that are not target-specific used for common exceptions
- TARGET_EXC_BAD_ACCESS = 0x91
- TARGET_EXC_BAD_INSTRUCTION = 0x92
- TARGET_EXC_ARITHMETIC = 0x93
- TARGET_EXC_EMULATION = 0x94
- TARGET_EXC_SOFTWARE = 0x95
- TARGET_EXC_BREAKPOINT = 0x96
-
- def setUp(self):
- TestBase.setUp(self)
- FORMAT = '%(asctime)-15s %(levelname)-8s %(message)s'
- logging.basicConfig(format=FORMAT)
- self.logger = logging.getLogger(__name__)
- self.logger.setLevel(self._LOGGING_LEVEL)
- self.test_sequence = GdbRemoteTestSequence(self.logger)
- self.set_inferior_startup_launch()
-
- # Uncomment this code to force only a single test to run (by name).
- #if not re.search(r"P_", self._testMethodName):
- # self.skipTest("focusing on one test")
-
- def reset_test_sequence(self):
- self.test_sequence = GdbRemoteTestSequence(self.logger)
-
- def init_llgs_test(self):
- self.debug_monitor_exe = get_lldb_gdbserver_exe()
- if not self.debug_monitor_exe:
- self.skipTest("lldb_gdbserver exe not found")
- self.debug_monitor_extra_args = ""
-
- def init_debugserver_test(self):
- self.debug_monitor_exe = get_debugserver_exe()
- if not self.debug_monitor_exe:
- self.skipTest("debugserver exe not found")
- self.debug_monitor_extra_args = " --log-file=/tmp/packets-{}.log --log-flags=0x800000".format(self._testMethodName)
-
- def create_socket(self):
- sock = socket.socket()
- logger = self.logger
-
- def shutdown_socket():
- if sock:
- try:
- # send the kill packet so lldb-gdbserver shuts down gracefully
- sock.sendall(LldbGdbServerTestCase._GDBREMOTE_KILL_PACKET)
- except:
- logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
-
- try:
- sock.close()
- except:
- logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
-
- self.addTearDownHook(shutdown_socket)
-
- sock.connect(('localhost', self.port))
- return sock
-
- def set_inferior_startup_launch(self):
- self._inferior_startup = self._STARTUP_LAUNCH
-
- def set_inferior_startup_attach(self):
- self._inferior_startup = self._STARTUP_ATTACH
-
- def start_server(self, attach_pid=None):
- # Create the command line
- commandline = "{}{} localhost:{}".format(self.debug_monitor_exe, self.debug_monitor_extra_args, self.port)
- if attach_pid:
- commandline += " --attach=%d" % attach_pid
-
- # start the server
- server = pexpect.spawn(commandline)
-
- # Turn on logging for what the child sends back.
- if self.TraceOn():
- server.logfile_read = sys.stdout
-
- # Schedule debug monitor to be shut down during teardown.
- logger = self.logger
- def shutdown_debug_monitor():
- try:
- server.close()
- except:
- logger.warning("failed to close pexpect server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
-
- self.addTearDownHook(shutdown_debug_monitor)
-
- # Wait until we receive the server ready message before continuing.
- server.expect_exact('Listening to port {} for a connection from localhost'.format(self.port))
-
- # Create a socket to talk to the server
- self.sock = self.create_socket()
-
- return server
-
- def launch_process_for_attach(self,inferior_args=None, sleep_seconds=3):
- # We're going to start a child process that the debug monitor stub can later attach to.
- # This process needs to be started so that it just hangs around for a while. We'll
- # have it sleep.
- exe_path = os.path.abspath("a.out")
-
- args = [exe_path]
- if inferior_args:
- args.extend(inferior_args)
- if sleep_seconds:
- args.append("sleep:%d" % sleep_seconds)
-
- return subprocess.Popen(args)
-
- def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3):
- """Prep the debug monitor, the inferior, and the expected packet stream.
-
- Handle the separate cases of using the debug monitor in attach-to-inferior mode
- and in launch-inferior mode.
-
- For attach-to-inferior mode, the inferior process is first started, then
- the debug monitor is started in attach to pid mode (using --attach on the
- stub command line), and the no-ack-mode setup is appended to the packet
- stream. The packet stream is not yet executed, ready to have more expected
- packet entries added to it.
-
- For launch-inferior mode, the stub is first started, then no ack mode is
- setup on the expected packet stream, then the verified launch packets are added
- to the expected socket stream. The packet stream is not yet executed, ready
- to have more expected packet entries added to it.
-
- The return value is:
- {inferior:<inferior>, server:<server>}
- """
- inferior = None
- attach_pid = None
-
- if self._inferior_startup == self._STARTUP_ATTACH:
- # Launch the process that we'll use as the inferior.
- inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds)
- self.assertIsNotNone(inferior)
- self.assertTrue(inferior.pid > 0)
- attach_pid = inferior.pid
-
- # Launch the debug monitor stub, attaching to the inferior.
- server = self.start_server(attach_pid=attach_pid)
- self.assertIsNotNone(server)
-
- if self._inferior_startup == self._STARTUP_LAUNCH:
- # Build launch args
- launch_args = [os.path.abspath('a.out')]
- if inferior_args:
- launch_args.extend(inferior_args)
-
- # Build the expected protocol stream
- self.add_no_ack_remote_stream()
- if self._inferior_startup == self._STARTUP_LAUNCH:
- self.add_verified_launch_packets(launch_args)
-
- return {"inferior":inferior, "server":server}
-
- def add_no_ack_remote_stream(self):
- self.test_sequence.add_log_lines(
- ["read packet: +",
- "read packet: $QStartNoAckMode#b0",
- "send packet: +",
- "send packet: $OK#9a",
- "read packet: +"],
- True)
-
- def add_verified_launch_packets(self, launch_args):
- self.test_sequence.add_log_lines(
- ["read packet: %s" % build_gdbremote_A_packet(launch_args),
- "send packet: $OK#00",
- "read packet: $qLaunchSuccess#a5",
- "send packet: $OK#00"],
- True)
-
- def add_thread_suffix_request_packets(self):
- self.test_sequence.add_log_lines(
- ["read packet: $QThreadSuffixSupported#00",
- "send packet: $OK#00",
- ], True)
-
- def add_process_info_collection_packets(self):
- self.test_sequence.add_log_lines(
- ["read packet: $qProcessInfo#00",
- { "direction":"send", "regex":r"^\$(.+)#00", "capture":{1:"process_info_raw"} }],
- True)
-
- _KNOWN_PROCESS_INFO_KEYS = [
- "pid",
- "parent-pid",
- "real-uid",
- "real-gid",
- "effective-uid",
- "effective-gid",
- "cputype",
- "cpusubtype",
- "ostype",
- "vendor",
- "endian",
- "ptrsize"
- ]
-
- def parse_process_info_response(self, context):
- # Ensure we have a process info response.
- self.assertIsNotNone(context)
- process_info_raw = context.get("process_info_raw")
- self.assertIsNotNone(process_info_raw)
-
- # Pull out key:value; pairs.
- process_info_dict = { match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", process_info_raw) }
-
- # Validate keys are known.
- for (key, val) in process_info_dict.items():
- self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
- self.assertIsNotNone(val)
-
- return process_info_dict
-
- def add_register_info_collection_packets(self):
- self.test_sequence.add_log_lines(
- [ { "type":"multi_response", "query":"qRegisterInfo", "append_iteration_suffix":True,
- "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
- "save_key":"reg_info_responses" } ],
- True)
-
- def parse_register_info_packets(self, context):
- """Return an array of register info dictionaries, one per register info."""
- reg_info_responses = context.get("reg_info_responses")
- self.assertIsNotNone(reg_info_responses)
-
- # Parse register infos.
- return [parse_reg_info_response(reg_info_response) for reg_info_response in reg_info_responses]
-
- def expect_gdbremote_sequence(self):
- return expect_lldb_gdbserver_replay(self, self.sock, self.test_sequence, self._TIMEOUT_SECONDS, self.logger)
-
- _KNOWN_REGINFO_KEYS = [
- "name",
- "alt-name",
- "bitsize",
- "offset",
- "encoding",
- "format",
- "set",
- "gcc",
- "dwarf",
- "generic",
- "container-regs",
- "invalidate-regs"
- ]
-
- def assert_valid_reg_info(self, reg_info):
- # Assert we know about all the reginfo keys parsed.
- for key in reg_info:
- self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
-
- # Check the bare-minimum expected set of register info keys.
- self.assertTrue("name" in reg_info)
- self.assertTrue("bitsize" in reg_info)
- self.assertTrue("offset" in reg_info)
- self.assertTrue("encoding" in reg_info)
- self.assertTrue("format" in reg_info)
-
- def find_pc_reg_info(self, reg_infos):
- lldb_reg_index = 0
- for reg_info in reg_infos:
- if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
- return (lldb_reg_index, reg_info)
- lldb_reg_index += 1
-
- return (None, None)
-
- def add_lldb_register_index(self, reg_infos):
- """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
-
- We'll use this when we want to call packets like P/p with a register index but do so
- on only a subset of the full register info set.
- """
- self.assertIsNotNone(reg_infos)
-
- reg_index = 0
- for reg_info in reg_infos:
- reg_info["lldb_register_index"] = reg_index
- reg_index += 1
- def add_query_memory_region_packets(self, address):
- self.test_sequence.add_log_lines(
- ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
- {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }],
- True)
-
- def parse_memory_region_packet(self, context):
- # Ensure we have a context.
- self.assertIsNotNone(context.get("memory_region_response"))
-
- # Pull out key:value; pairs.
- mem_region_dict = {match.group(1):match.group(2) for match in re.finditer(r"([^:]+):([^;]+);", context.get("memory_region_response"))}
-
- # Validate keys are known.
- for (key, val) in mem_region_dict.items():
- self.assertTrue(key in ["start", "size", "permissions", "error"])
- self.assertIsNotNone(val)
-
- # Return the dictionary of key-value pairs for the memory region.
- return mem_region_dict
-
- def assert_address_within_memory_region(self, test_address, mem_region_dict):
- self.assertIsNotNone(mem_region_dict)
- self.assertTrue("start" in mem_region_dict)
- self.assertTrue("size" in mem_region_dict)
-
- range_start = int(mem_region_dict["start"], 16)
- range_size = int(mem_region_dict["size"], 16)
- range_end = range_start + range_size
-
- if test_address < range_start:
- self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
- elif test_address >= range_end:
- self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(test_address, range_start, range_end, range_size))
-
- def add_threadinfo_collection_packets(self):
- self.test_sequence.add_log_lines(
- [ { "type":"multi_response", "first_query":"qfThreadInfo", "next_query":"qsThreadInfo",
- "append_iteration_suffix":False, "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
- "save_key":"threadinfo_responses" } ],
- True)
+import gdbremote_testcase
- def parse_threadinfo_packets(self, context):
- """Return an array of thread ids (decimal ints), one per thread."""
- threadinfo_responses = context.get("threadinfo_responses")
- self.assertIsNotNone(threadinfo_responses)
-
- thread_ids = []
- for threadinfo_response in threadinfo_responses:
- new_thread_infos = parse_threadinfo_response(threadinfo_response)
- thread_ids.extend(new_thread_infos)
- return thread_ids
-
- def wait_for_thread_count(self, thread_count, timeout_seconds=3):
- start_time = time.time()
- timeout_time = start_time + timeout_seconds
-
- actual_thread_count = 0
- while actual_thread_count < thread_count:
- self.reset_test_sequence()
- self.add_threadinfo_collection_packets()
-
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
-
- threads = self.parse_threadinfo_packets(context)
- self.assertIsNotNone(threads)
-
- actual_thread_count = len(threads)
-
- if time.time() > timeout_time:
- raise Exception(
- 'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
- timeout_seconds, thread_count, actual_thread_count))
-
- return threads
-
- def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1):
- self.test_sequence.add_log_lines(
- [# Set the breakpoint.
- "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind),
- # Verify the stub could set it.
- "send packet: $OK#00",
- ], True)
-
- if (do_continue):
- self.test_sequence.add_log_lines(
- [# Continue the inferior.
- "read packet: $c#00",
- # Expect a breakpoint stop report.
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} },
- ], True)
-
- def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
- self.test_sequence.add_log_lines(
- [# Remove the breakpoint.
- "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind),
- # Verify the stub could unset it.
- "send packet: $OK#00",
- ], True)
-
- def add_qSupported_packets(self):
- self.test_sequence.add_log_lines(
- ["read packet: $qSupported#00",
- {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}},
- ], True)
-
- _KNOWN_QSUPPORTED_STUB_FEATURES = [
- "augmented-libraries-svr4-read",
- "PacketSize",
- "QStartNoAckMode",
- "qXfer:auxv:read",
- "qXfer:libraries:read",
- "qXfer:libraries-svr4:read",
- ]
-
- def parse_qSupported_response(self, context):
- self.assertIsNotNone(context)
-
- raw_response = context.get("qSupported_response")
- self.assertIsNotNone(raw_response)
-
- # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the
- # +,-,? is stripped from the key and set as the value.
- supported_dict = {}
- for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
- key = match.group(1)
- val = match.group(3)
-
- # key=val: store as is
- if val and len(val) > 0:
- supported_dict[key] = val
- else:
- if len(key) < 2:
- raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}")
- supported_type = key[-1]
- key = key[:-1]
- if not supported_type in ["+", "-", "?"]:
- raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
- supported_dict[key] = supported_type
- # Ensure we know the supported element
- if not key in self._KNOWN_QSUPPORTED_STUB_FEATURES:
- raise Exception("unknown qSupported stub feature reported: %s" % key)
-
- return supported_dict
-
- def run_process_then_stop(self, run_seconds=1):
- # Tell the stub to continue.
- self.test_sequence.add_log_lines(
- ["read packet: $vCont;c#00"],
- True)
- context = self.expect_gdbremote_sequence()
-
- # Wait for run_seconds.
- time.sleep(run_seconds)
-
- # Send an interrupt, capture a T response.
- self.reset_test_sequence()
- self.test_sequence.add_log_lines(
- ["read packet: {}".format(chr(03)),
- {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }],
- True)
- context = self.expect_gdbremote_sequence()
- self.assertIsNotNone(context)
- self.assertIsNotNone(context.get("stop_result"))
-
- return context
-
- def select_modifiable_register(self, reg_infos):
- """Find a register that can be read/written freely."""
- PREFERRED_REGISTER_NAMES = sets.Set(["rax",])
-
- # First check for the first register from the preferred register name set.
- alternative_register_index = None
-
- self.assertIsNotNone(reg_infos)
- for reg_info in reg_infos:
- if ("name" in reg_info) and (reg_info["name"] in PREFERRED_REGISTER_NAMES):
- # We found a preferred register. Use it.
- return reg_info["lldb_register_index"]
- if ("generic" in reg_info) and (reg_info["generic"] == "fp"):
- # A frame pointer register will do as a register to modify temporarily.
- alternative_register_index = reg_info["lldb_register_index"]
-
- # We didn't find a preferred register. Return whatever alternative register
- # we found, if any.
- return alternative_register_index
+class LldbGdbServerTestCase(gdbremote_testcase.GdbRemoteTestCaseBase):
@debugserver_test
def test_exe_starts_debugserver(self):
@@ -584,7 +99,7 @@ class LldbGdbServerTestCase(TestBase):
self.add_no_ack_remote_stream()
self.test_sequence.add_log_lines(
- ["read packet: %s" % build_gdbremote_A_packet(launch_args),
+ ["read packet: %s" % lldbgdbserverutils.build_gdbremote_A_packet(launch_args),
"send packet: $OK#9a"],
True)
self.expect_gdbremote_sequence()
@@ -681,13 +196,9 @@ class LldbGdbServerTestCase(TestBase):
{"type":"output_match", "regex":r"^hello, world\r\n$" },
"send packet: $W00#00"],
True)
-
+
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
-
- # O_content = context.get("O_content")
- # self.assertIsNotNone(O_content)
- # self.assertEquals(O_content, "hello, world\r\n")
@debugserver_test
@dsym_test
@@ -755,7 +266,7 @@ class LldbGdbServerTestCase(TestBase):
self.assertNotEqual(0, pid)
# If possible, verify that the process is running.
- self.assertTrue(process_is_running(pid, True))
+ self.assertTrue(lldbgdbserverutils.process_is_running(pid, True))
@debugserver_test
@dsym_test
@@ -853,7 +364,7 @@ class LldbGdbServerTestCase(TestBase):
self.assertIsNotNone(poll_result)
# Where possible, verify at the system level that the process is not running.
- self.assertFalse(process_is_running(procs["inferior"].pid, False))
+ self.assertFalse(lldbgdbserverutils.process_is_running(procs["inferior"].pid, False))
@debugserver_test
@dsym_test
@@ -885,7 +396,7 @@ class LldbGdbServerTestCase(TestBase):
self.assertIsNotNone(poll_result)
# Where possible, verify at the system level that the process is not running.
- self.assertFalse(process_is_running(procs["inferior"].pid, False))
+ self.assertFalse(lldbgdbserverutils.process_is_running(procs["inferior"].pid, False))
@debugserver_test
@dsym_test
@@ -925,7 +436,7 @@ class LldbGdbServerTestCase(TestBase):
reg_info_packet = context.get("reginfo_0")
self.assertIsNotNone(reg_info_packet)
- self.assert_valid_reg_info(parse_reg_info_response(reg_info_packet))
+ self.assert_valid_reg_info(lldbgdbserverutils.parse_reg_info_response(reg_info_packet))
@debugserver_test
@dsym_test
@@ -1248,7 +759,7 @@ class LldbGdbServerTestCase(TestBase):
def Hg_switches_to_3_threads(self):
# Startup the inferior with three threads (main + 2 new ones).
procs = self.prep_debug_monitor_and_inferior(inferior_args=["thread:new", "thread:new"])
-
+
# Let the inferior process have a few moments to start up the thread when launched. (The launch scenario has no time to run, so threads won't be there yet.)
self.run_process_then_stop(run_seconds=1)
@@ -1266,10 +777,10 @@ class LldbGdbServerTestCase(TestBase):
"read packet: $qC#00",
{ "direction":"send", "regex":r"^\$QC([0-9a-fA-F]+)#", "capture":{1:"thread_id"} }],
True)
-
+
context = self.expect_gdbremote_sequence()
self.assertIsNotNone(context)
-
+
# Verify the thread id.
self.assertIsNotNone(context.get("thread_id"))
self.assertEquals(int(context.get("thread_id"), 16), thread)
@@ -1313,7 +824,7 @@ class LldbGdbServerTestCase(TestBase):
# and the test requires getting stdout from the exe.
NUM_THREADS = 3
-
+
# Startup the inferior with three threads (main + NUM_THREADS-1 worker threads).
# inferior_args=["thread:print-ids"]
inferior_args=["thread:segfault"]
@@ -1347,16 +858,16 @@ class LldbGdbServerTestCase(TestBase):
{"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"signo", 2:"thread_id"} }
], True)
context = self.expect_gdbremote_sequence()
-
+
self.assertIsNotNone(context)
signo = context.get("signo")
self.assertEqual(int(signo, 16), self.TARGET_EXC_BAD_ACCESS)
-
+
# Ensure we haven't seen this tid yet.
thread_id = int(context.get("thread_id"), 16)
self.assertFalse(thread_id in signaled_tids)
signaled_tids[thread_id] = 1
-
+
# Send SIGUSR1 to the thread that signaled the SIGSEGV.
self.reset_test_sequence()
self.test_sequence.add_log_lines(
@@ -1381,7 +892,7 @@ class LldbGdbServerTestCase(TestBase):
stop_signo = context.get("stop_signo")
self.assertIsNotNone(stop_signo)
self.assertEquals(int(stop_signo,16), signal.SIGUSR1)
-
+
# Ensure the stop thread is the thread to which we delivered the signal.
stop_thread_id = context.get("stop_thread_id")
self.assertIsNotNone(stop_thread_id)
@@ -1391,7 +902,7 @@ class LldbGdbServerTestCase(TestBase):
# print_thread_id = context.get("print_thread_id")
# self.assertIsNotNone(print_thread_id)
# self.assertFalse(print_thread_id in print_thread_ids)
-
+
# Now remember this print (i.e. inferior-reflected) thread id and ensure we don't hit it again.
# print_thread_ids[print_thread_id] = 1
@@ -1458,7 +969,7 @@ class LldbGdbServerTestCase(TestBase):
self.assertIsNotNone(context.get("read_contents"))
read_contents = context.get("read_contents").decode("hex")
self.assertEquals(read_contents, MEMORY_CONTENTS)
-
+
@debugserver_test
@dsym_test
def test_m_packet_reads_memory_debugserver_dsym(self):
@@ -1544,10 +1055,9 @@ class LldbGdbServerTestCase(TestBase):
self.assertTrue("permissions" in mem_region_dict)
self.assertTrue("r" in mem_region_dict["permissions"])
self.assertTrue("x" in mem_region_dict["permissions"])
-
+
# Ensure the start address and size encompass the address we queried.
self.assert_address_within_memory_region(code_address, mem_region_dict)
-
@debugserver_test
@dsym_test
@@ -1610,7 +1120,6 @@ class LldbGdbServerTestCase(TestBase):
# Ensure the start address and size encompass the address we queried.
self.assert_address_within_memory_region(stack_address, mem_region_dict)
-
@debugserver_test
@dsym_test
def test_qMemoryRegionInfo_reports_stack_address_as_readable_writeable_debugserver_dsym(self):
@@ -1768,7 +1277,7 @@ class LldbGdbServerTestCase(TestBase):
self.assertIsNotNone(p_response)
# Convert from target endian to int.
- returned_pc = unpack_register_hex_unsigned(endian, p_response)
+ returned_pc = lldbgdbserverutils.unpack_register_hex_unsigned(endian, p_response)
self.assertEquals(returned_pc, function_address)
# Verify that a breakpoint remove and continue gets us the expected output.
@@ -1812,7 +1321,7 @@ class LldbGdbServerTestCase(TestBase):
g_c2_address = args["g_c2_address"]
expected_g_c1 = args["expected_g_c1"]
expected_g_c2 = args["expected_g_c2"]
-
+
# Read g_c1 and g_c2 contents.
self.reset_test_sequence()
self.test_sequence.add_log_lines(
@@ -1852,13 +1361,13 @@ class LldbGdbServerTestCase(TestBase):
self.assertIsNotNone(context)
self.assertIsNotNone(context.get("stop_signo"))
self.assertEquals(int(context.get("stop_signo"), 16), signal.SIGTRAP)
-
+
single_step_count += 1
-
+
# See if the predicate is true. If so, we're done.
if predicate(args):
return (True, single_step_count)
-
+
# The predicate didn't return true within the runaway step count.
return (False, single_step_count)
@@ -1922,7 +1431,7 @@ class LldbGdbServerTestCase(TestBase):
args["expected_g_c2"] = "1"
self.assertTrue(self.g_c1_c2_contents_are(args))
-
+
# Verify we take only a small number of steps to hit the first state. Might need to work through function entry prologue code.
args["expected_g_c1"] = "1"
args["expected_g_c2"] = "1"
@@ -2086,7 +1595,7 @@ class LldbGdbServerTestCase(TestBase):
# Verify the response length.
p_response = context.get("p_response")
self.assertIsNotNone(p_response)
- initial_reg_value = unpack_register_hex_unsigned(endian, p_response)
+ initial_reg_value = lldbgdbserverutils.unpack_register_hex_unsigned(endian, p_response)
# Flip the value by xoring with all 1s
all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) / 8)
@@ -2096,7 +1605,7 @@ class LldbGdbServerTestCase(TestBase):
# Write the flipped value to the register.
self.reset_test_sequence()
self.test_sequence.add_log_lines(
- ["read packet: $P{0:x}={1}#00".format(reg_index, pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size)),
+ ["read packet: $P{0:x}={1}#00".format(reg_index, lldbgdbserverutils.pack_register_hex(endian, flipped_bits_int, byte_size=reg_byte_size)),
{ "direction":"send", "regex":r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", "capture":{1:"P_response"} },
], True)
context = self.expect_gdbremote_sequence()
@@ -2125,7 +1634,7 @@ class LldbGdbServerTestCase(TestBase):
verify_p_response_raw = context.get("p_response")
self.assertIsNotNone(verify_p_response_raw)
- verify_bits = unpack_register_hex_unsigned(endian, verify_p_response_raw)
+ verify_bits = lldbgdbserverutils.unpack_register_hex_unsigned(endian, verify_p_response_raw)
if verify_bits != flipped_bits_int:
# Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts.
@@ -2235,12 +1744,12 @@ class LldbGdbServerTestCase(TestBase):
# Set the next value to use for writing as the increment plus current value.
p_response = context.get("p_response")
self.assertIsNotNone(p_response)
- next_value = unpack_register_hex_unsigned(endian, p_response)
+ next_value = lldbgdbserverutils.unpack_register_hex_unsigned(endian, p_response)
# Set new value using P and thread suffix.
self.reset_test_sequence()
self.test_sequence.add_log_lines(
- ["read packet: $P{0:x}={1};thread:{2:x}#00".format(reg_index, pack_register_hex(endian, next_value, byte_size=reg_byte_size), thread),
+ ["read packet: $P{0:x}={1};thread:{2:x}#00".format(reg_index, lldbgdbserverutils.pack_register_hex(endian, next_value, byte_size=reg_byte_size), thread),
"send packet: $OK#00",
], True)
context = self.expect_gdbremote_sequence()
@@ -2267,7 +1776,7 @@ class LldbGdbServerTestCase(TestBase):
# Get the register value.
p_response = context.get("p_response")
self.assertIsNotNone(p_response)
- read_value = unpack_register_hex_unsigned(endian, p_response)
+ read_value = lldbgdbserverutils.unpack_register_hex_unsigned(endian, p_response)
# Make sure we read back what we wrote.
self.assertEquals(read_value, expected_reg_values[thread_index])
Added: lldb/trunk/test/tools/lldb-gdbserver/gdbremote_testcase.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/tools/lldb-gdbserver/gdbremote_testcase.py?rev=210931&view=auto
==============================================================================
--- lldb/trunk/test/tools/lldb-gdbserver/gdbremote_testcase.py (added)
+++ lldb/trunk/test/tools/lldb-gdbserver/gdbremote_testcase.py Fri Jun 13 14:11:33 2014
@@ -0,0 +1,543 @@
+"""
+Base class for gdb-remote test cases.
+"""
+
+import unittest2
+import pexpect
+import platform
+import sets
+import signal
+import socket
+import subprocess
+import sys
+import time
+from lldbtest import *
+from lldbgdbserverutils import *
+import logging
+import os.path
+
+class GdbRemoteTestCaseBase(TestBase):
+
+ # Computed once, at class creation, by TestBase.compute_mydir from this file's path.
+ mydir = TestBase.compute_mydir(__file__)
+
+ # Port on localhost that start_server asks the debug monitor to listen on and that create_socket connects to.
+ port = 12345
+
+ # Timeout value handed to expect_lldb_gdbserver_replay by expect_gdbremote_sequence.
+ _TIMEOUT_SECONDS = 5
+
+ # Packet that create_socket's teardown hook sends before closing the socket.
+ _GDBREMOTE_KILL_PACKET = "$k#6b"
+
+ # Level applied to the test logger in setUp; swap in the DEBUG line below for more output.
+ _LOGGING_LEVEL = logging.WARNING
+ # _LOGGING_LEVEL = logging.DEBUG
+
+ # Values stored in self._inferior_startup by set_inferior_startup_attach / set_inferior_startup_launch.
+ _STARTUP_ATTACH = "attach"
+ _STARTUP_LAUNCH = "launch"
+
+ # GDB Signal numbers that are not target-specific used for common exceptions
+ TARGET_EXC_BAD_ACCESS = 0x91
+ TARGET_EXC_BAD_INSTRUCTION = 0x92
+ TARGET_EXC_ARITHMETIC = 0x93
+ TARGET_EXC_EMULATION = 0x94
+ TARGET_EXC_SOFTWARE = 0x95
+ TARGET_EXC_BREAKPOINT = 0x96
+
+    def setUp(self):
+        """Run the base-class setup, then prepare logging, a new packet sequence and launch-mode startup."""
+        TestBase.setUp(self)
+
+        # Configure the logging format, then create this module's logger at the class-selected level.
+        logging.basicConfig(format='%(asctime)-15s %(levelname)-8s %(message)s')
+        test_logger = logging.getLogger(__name__)
+        test_logger.setLevel(self._LOGGING_LEVEL)
+        self.logger = test_logger
+
+        # Every test starts with its own sequence object and defaults to launching the inferior.
+        self.test_sequence = GdbRemoteTestSequence(test_logger)
+        self.set_inferior_startup_launch()
+
+        # Uncomment this code to force only a single test to run (by name).
+        #if not re.search(r"P_", self._testMethodName):
+        #    self.skipTest("focusing on one test")
+
+    def reset_test_sequence(self):
+        """Replace self.test_sequence with a newly constructed GdbRemoteTestSequence bound to the test logger."""
+        fresh_sequence = GdbRemoteTestSequence(self.logger)
+        self.test_sequence = fresh_sequence
+
+    def init_llgs_test(self):
+        """Use lldb-gdbserver as the debug monitor, skipping the test if no executable is found.
+
+        No extra command-line arguments are configured for this monitor.
+        """
+        monitor_exe = get_lldb_gdbserver_exe()
+        self.debug_monitor_exe = monitor_exe
+        if not monitor_exe:
+            self.skipTest("lldb_gdbserver exe not found")
+
+        self.debug_monitor_extra_args = ""
+
+    def init_debugserver_test(self):
+        """Use debugserver as the debug monitor, skipping the test if no executable is found.
+
+        Also sets the extra command-line arguments, which name a per-test log file
+        under /tmp and a log-flags value.
+        """
+        monitor_exe = get_debugserver_exe()
+        self.debug_monitor_exe = monitor_exe
+        if not monitor_exe:
+            self.skipTest("debugserver exe not found")
+
+        log_args_template = " --log-file=/tmp/packets-{}.log --log-flags=0x800000"
+        self.debug_monitor_extra_args = log_args_template.format(self._testMethodName)
+
+    def create_socket(self):
+        """Connect a new socket to localhost:self.port and return it.
+
+        A teardown hook is registered before connecting. At teardown it makes a
+        best-effort attempt to send the kill packet and then to close the socket;
+        failures of either step are logged as warnings and otherwise ignored.
+
+        Raises whatever socket.connect raises when the connection cannot be made.
+        """
+        sock = socket.socket()
+        logger = self.logger
+
+        def shutdown_socket():
+            # Catch Exception rather than using a bare except so that KeyboardInterrupt
+            # and SystemExit raised during teardown are not silently swallowed.
+            try:
+                # send the kill packet so lldb-gdbserver shuts down gracefully
+                sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
+            except Exception:
+                logger.warning("failed to send kill packet to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+
+            try:
+                sock.close()
+            except Exception:
+                logger.warning("failed to close socket to debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+
+        self.addTearDownHook(shutdown_socket)
+
+        sock.connect(('localhost', self.port))
+        return sock
+
+    def set_inferior_startup_launch(self):
+        """Record that the inferior is to be started in launch mode."""
+        startup_mode = self._STARTUP_LAUNCH
+        self._inferior_startup = startup_mode
+
+    def set_inferior_startup_attach(self):
+        """Record that the inferior is to be started in attach mode."""
+        startup_mode = self._STARTUP_ATTACH
+        self._inferior_startup = startup_mode
+
+    def start_server(self, attach_pid=None):
+        """Spawn the debug monitor, wait for it to listen, and connect self.sock to it.
+
+        The command line is the monitor executable, its extra arguments and
+        "localhost:<port>"; when attach_pid is truthy, " --attach=<pid>" is appended.
+        A teardown hook that closes the spawned process is registered.
+
+        Returns the pexpect child object for the spawned monitor.
+        """
+        # Create the command line
+        commandline = "{}{} localhost:{}".format(self.debug_monitor_exe, self.debug_monitor_extra_args, self.port)
+        if attach_pid:
+            commandline += " --attach=%d" % attach_pid
+
+        # start the server
+        server = pexpect.spawn(commandline)
+
+        # Turn on logging for what the child sends back.
+        if self.TraceOn():
+            server.logfile_read = sys.stdout
+
+        # Schedule debug monitor to be shut down during teardown.
+        logger = self.logger
+
+        def shutdown_debug_monitor():
+            # Catch Exception rather than using a bare except so that KeyboardInterrupt
+            # and SystemExit raised during teardown are not silently swallowed.
+            try:
+                server.close()
+            except Exception:
+                logger.warning("failed to close pexpect server for debug monitor: {}; ignoring".format(sys.exc_info()[0]))
+
+        self.addTearDownHook(shutdown_debug_monitor)
+
+        # Wait until we receive the server ready message before continuing.
+        server.expect_exact('Listening to port {} for a connection from localhost'.format(self.port))
+
+        # Create a socket to talk to the server
+        self.sock = self.create_socket()
+
+        return server
+
+    def launch_process_for_attach(self,inferior_args=None, sleep_seconds=3):
+        """Start a.out as a child process for the debug monitor to attach to later.
+
+        The command is the absolute path of "a.out", followed by inferior_args
+        (when given) and, when sleep_seconds is truthy, a "sleep:<seconds>"
+        argument so the process hangs around for a while.
+
+        Returns the subprocess.Popen object for the started process.
+        """
+        command = [os.path.abspath("a.out")]
+        command.extend(inferior_args or [])
+        if sleep_seconds:
+            sleep_arg = "sleep:%d" % sleep_seconds
+            command.append(sleep_arg)
+
+        return subprocess.Popen(command)
+
+    def prep_debug_monitor_and_inferior(self, inferior_args=None, inferior_sleep_seconds=3):
+        """Start the debug monitor (and, in attach mode, the inferior) and queue the initial packets.
+
+        Attach mode: the inferior is started first, then the stub is started with
+        that pid (using --attach on the stub command line); only the no-ack-mode
+        exchange is queued on the expected packet stream.
+
+        Launch mode: the stub is started without a pid; the no-ack-mode exchange is
+        queued, followed by the verified launch packets for a.out plus inferior_args.
+
+        In both modes the queued stream is not executed here, so callers can append
+        more expected packet entries before running it.
+
+        The return value is:
+            {inferior:<inferior>, server:<server>}
+        where inferior stays None unless attach mode started one.
+        """
+        inferior = None
+        attach_pid = None
+
+        if self._inferior_startup == self._STARTUP_ATTACH:
+            # The process the stub will attach to has to exist before the stub starts.
+            inferior = self.launch_process_for_attach(inferior_args=inferior_args, sleep_seconds=inferior_sleep_seconds)
+            self.assertIsNotNone(inferior)
+            self.assertTrue(inferior.pid > 0)
+            attach_pid = inferior.pid
+
+        # Start the stub; attach_pid is still None when launching.
+        server = self.start_server(attach_pid=attach_pid)
+        self.assertIsNotNone(server)
+
+        # Queue the expected protocol stream: no-ack mode first, launch packets second.
+        self.add_no_ack_remote_stream()
+        if self._inferior_startup == self._STARTUP_LAUNCH:
+            launch_args = [os.path.abspath('a.out')]
+            if inferior_args:
+                launch_args.extend(inferior_args)
+            self.add_verified_launch_packets(launch_args)
+
+        return {"inferior":inferior, "server":server}
+
+    def add_no_ack_remote_stream(self):
+        """Queue the expected QStartNoAckMode exchange, including the surrounding acks."""
+        no_ack_exchange = [
+            "read packet: +",
+            "read packet: $QStartNoAckMode#b0",
+            "send packet: +",
+            "send packet: $OK#9a",
+            "read packet: +",
+        ]
+        self.test_sequence.add_log_lines(no_ack_exchange, True)
+
+    def add_verified_launch_packets(self, launch_args):
+        """Queue an A packet built from launch_args plus a qLaunchSuccess query, each expecting OK."""
+        a_packet = build_gdbremote_A_packet(launch_args)
+        launch_exchange = [
+            "read packet: %s" % a_packet,
+            "send packet: $OK#00",
+            "read packet: $qLaunchSuccess#a5",
+            "send packet: $OK#00",
+        ]
+        self.test_sequence.add_log_lines(launch_exchange, True)
+
+    def add_thread_suffix_request_packets(self):
+        """Queue a QThreadSuffixSupported request that expects an OK reply."""
+        thread_suffix_exchange = [
+            "read packet: $QThreadSuffixSupported#00",
+            "send packet: $OK#00",
+        ]
+        self.test_sequence.add_log_lines(thread_suffix_exchange, True)
+
+    def add_process_info_collection_packets(self):
+        """Queue a qProcessInfo query whose reply payload is captured as "process_info_raw"."""
+        reply_matcher = { "direction":"send", "regex":r"^\$(.+)#00", "capture":{1:"process_info_raw"} }
+        process_info_exchange = ["read packet: $qProcessInfo#00", reply_matcher]
+        self.test_sequence.add_log_lines(process_info_exchange, True)
+
+ # Keys that parse_process_info_response accepts in a process info reply; any other key fails its assertion.
+ _KNOWN_PROCESS_INFO_KEYS = [
+ "pid",
+ "parent-pid",
+ "real-uid",
+ "real-gid",
+ "effective-uid",
+ "effective-gid",
+ "cputype",
+ "cpusubtype",
+ "ostype",
+ "vendor",
+ "endian",
+ "ptrsize"
+ ]
+
+    def parse_process_info_response(self, context):
+        """Turn the captured process info reply into a dictionary of key/value strings.
+
+        Reads context["process_info_raw"], splits it into "key:value;" pairs, and
+        asserts that every key appears in _KNOWN_PROCESS_INFO_KEYS.
+
+        Returns the parsed dictionary.
+        """
+        # A context and a captured reply are both required.
+        self.assertIsNotNone(context)
+        raw_info = context.get("process_info_raw")
+        self.assertIsNotNone(raw_info)
+
+        # Collect each key:value; pair; a repeated key keeps its last value.
+        parsed_info = {}
+        for pair in re.finditer(r"([^:]+):([^;]+);", raw_info):
+            parsed_info[pair.group(1)] = pair.group(2)
+
+        # Reject keys this test suite does not know about.
+        for (info_key, info_val) in parsed_info.items():
+            self.assertTrue(info_key in self._KNOWN_PROCESS_INFO_KEYS)
+            self.assertIsNotNone(info_val)
+
+        return parsed_info
+
+    def add_register_info_collection_packets(self):
+        """Queue a multi-response qRegisterInfo query; replies are saved under "reg_info_responses"."""
+        register_info_query = {
+            "type":"multi_response",
+            "query":"qRegisterInfo",
+            "append_iteration_suffix":True,
+            "end_regex":re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
+            "save_key":"reg_info_responses",
+        }
+        self.test_sequence.add_log_lines([register_info_query], True)
+
+    def parse_register_info_packets(self, context):
+        """Return a list with one parsed register info per saved "reg_info_responses" entry.
+
+        Each entry is whatever parse_reg_info_response returns for that response.
+        """
+        raw_responses = context.get("reg_info_responses")
+        self.assertIsNotNone(raw_responses)
+
+        parsed_infos = []
+        for raw_response in raw_responses:
+            parsed_infos.append(parse_reg_info_response(raw_response))
+        return parsed_infos
+
+    def expect_gdbremote_sequence(self):
+        """Hand the queued test sequence to expect_lldb_gdbserver_replay and return its result.
+
+        The helper receives this test case, its socket, the queued sequence,
+        the class timeout and the test logger, in that order.
+        """
+        replay_args = (self, self.sock, self.test_sequence, self._TIMEOUT_SECONDS, self.logger)
+        return expect_lldb_gdbserver_replay(*replay_args)
+
+ # Keys that assert_valid_reg_info accepts in a parsed register info; any other key fails its assertion.
+ _KNOWN_REGINFO_KEYS = [
+ "name",
+ "alt-name",
+ "bitsize",
+ "offset",
+ "encoding",
+ "format",
+ "set",
+ "gcc",
+ "dwarf",
+ "generic",
+ "container-regs",
+ "invalidate-regs"
+ ]
+
+    def assert_valid_reg_info(self, reg_info):
+        """Assert that reg_info uses only known keys and carries the minimum required ones."""
+        # Every key present must be one we know about.
+        for present_key in reg_info:
+            self.assertTrue(present_key in self._KNOWN_REGINFO_KEYS)
+
+        # The bare-minimum keys every register info is expected to carry.
+        for required_key in ("name", "bitsize", "offset", "encoding", "format"):
+            self.assertTrue(required_key in reg_info)
+
+    def find_pc_reg_info(self, reg_infos):
+        """Return (index, reg_info) for the first entry whose "generic" value is "pc".
+
+        The index is the entry's 0-based position in reg_infos. Returns
+        (None, None) when no entry qualifies.
+        """
+        for (position, candidate) in enumerate(reg_infos):
+            if candidate.get("generic") == "pc":
+                return (position, candidate)
+
+        return (None, None)
+
+    def add_lldb_register_index(self, reg_infos):
+        """Store each entry's 0-based position in reg_infos under its "lldb_register_index" key.
+
+        The stored index lets callers issue packets like P/p by register index
+        while working with only a subset of the full register info set.
+        """
+        self.assertIsNotNone(reg_infos)
+
+        for (position, entry) in enumerate(reg_infos):
+            entry["lldb_register_index"] = position
+
+    def add_query_memory_region_packets(self, address):
+        """Queue a qMemoryRegionInfo query for address; the reply payload is captured as "memory_region_response"."""
+        query_packet = "read packet: $qMemoryRegionInfo:{0:x}#00".format(address)
+        reply_matcher = {"direction":"send", "regex":r"^\$(.+)#[0-9a-fA-F]{2}$", "capture":{1:"memory_region_response"} }
+        self.test_sequence.add_log_lines([query_packet, reply_matcher], True)
+
+    def parse_key_val_dict(self, key_val_text):
+        """Parse semicolon-separated "key:value" text into a dictionary of strings.
+
+        A repeated key keeps its last value.
+        """
+        self.assertIsNotNone(key_val_text)
+        pairs = re.finditer(r";?([^:]+):([^;]+)", key_val_text)
+        return dict((pair.group(1), pair.group(2)) for pair in pairs)
+
+    def parse_memory_region_packet(self, context):
+        """Parse the captured memory region reply into a dictionary of key/value strings.
+
+        Reads context["memory_region_response"] and asserts that every key is one
+        of start, size, permissions or error.
+
+        Returns the parsed dictionary.
+        """
+        # The reply must have been captured.
+        raw_region = context.get("memory_region_response")
+        self.assertIsNotNone(raw_region)
+
+        region_info = self.parse_key_val_dict(raw_region)
+
+        # Only these keys are expected in a memory region reply.
+        expected_keys = ["start", "size", "permissions", "error"]
+        for (region_key, region_val) in region_info.items():
+            self.assertTrue(region_key in expected_keys)
+            self.assertIsNotNone(region_val)
+
+        return region_info
+
+    def assert_address_within_memory_region(self, test_address, mem_region_dict):
+        """Fail the test unless test_address lies in [start, start + size) of mem_region_dict.
+
+        The "start" and "size" values are parsed as hexadecimal strings.
+        """
+        self.assertIsNotNone(mem_region_dict)
+        self.assertTrue("start" in mem_region_dict)
+        self.assertTrue("size" in mem_region_dict)
+
+        range_start = int(mem_region_dict["start"], 16)
+        range_size = int(mem_region_dict["size"], 16)
+        range_end = range_start + range_size
+        details = (test_address, range_start, range_end, range_size)
+
+        # self.fail raises, so at most one of these messages is reported.
+        if test_address < range_start:
+            self.fail("address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(*details))
+        if test_address >= range_end:
+            self.fail("address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(*details))
+
+    def add_threadinfo_collection_packets(self):
+        """Queue a multi-response qfThreadInfo/qsThreadInfo query; replies are saved under "threadinfo_responses"."""
+        threadinfo_query = {
+            "type":"multi_response",
+            "first_query":"qfThreadInfo",
+            "next_query":"qsThreadInfo",
+            "append_iteration_suffix":False,
+            "end_regex":re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
+            "save_key":"threadinfo_responses",
+        }
+        self.test_sequence.add_log_lines([threadinfo_query], True)
+
+    def parse_threadinfo_packets(self, context):
+        """Return one flat list of the thread ids found in all saved "threadinfo_responses".
+
+        Each response is parsed by parse_threadinfo_response and the results are
+        concatenated in response order.
+        """
+        raw_responses = context.get("threadinfo_responses")
+        self.assertIsNotNone(raw_responses)
+
+        collected_ids = []
+        for raw_response in raw_responses:
+            collected_ids.extend(parse_threadinfo_response(raw_response))
+        return collected_ids
+
+    def wait_for_thread_count(self, thread_count, timeout_seconds=3):
+        """Query the stub's thread list repeatedly until it reports at least thread_count threads.
+
+        Returns the thread list from the last query, or an empty list when
+        thread_count is zero or negative (no query is made in that case).
+
+        Raises Exception when a query still reports too few threads after
+        timeout_seconds have elapsed since the call started.
+        """
+        timeout_time = time.time() + timeout_seconds
+
+        # Bound before the loop so that a thread_count <= 0, which skips the loop
+        # entirely, returns an empty list instead of hitting an unbound local.
+        threads = []
+        actual_thread_count = 0
+        while actual_thread_count < thread_count:
+            self.reset_test_sequence()
+            self.add_threadinfo_collection_packets()
+
+            context = self.expect_gdbremote_sequence()
+            self.assertIsNotNone(context)
+
+            threads = self.parse_threadinfo_packets(context)
+            self.assertIsNotNone(threads)
+
+            actual_thread_count = len(threads)
+
+            # The deadline is checked after every query, including a successful one.
+            if time.time() > timeout_time:
+                raise Exception(
+                    'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
+                        timeout_seconds, thread_count, actual_thread_count))
+
+        return threads
+
+    def add_set_breakpoint_packets(self, address, do_continue=True, breakpoint_kind=1):
+        """Queue a Z0 breakpoint-set request and, when do_continue is true, a continue plus stop-reply capture.
+
+        The stop reply's signal number and thread id are captured as
+        "stop_signo" and "stop_thread_id".
+        """
+        set_breakpoint_exchange = [
+            # Set the breakpoint.
+            "read packet: $Z0,{0:x},{1}#00".format(address, breakpoint_kind),
+            # Verify the stub could set it.
+            "send packet: $OK#00",
+        ]
+        self.test_sequence.add_log_lines(set_breakpoint_exchange, True)
+
+        if not do_continue:
+            return
+
+        stop_reply_matcher = {"direction":"send", "regex":r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture":{1:"stop_signo", 2:"stop_thread_id"} }
+        continue_exchange = [
+            # Continue the inferior.
+            "read packet: $c#00",
+            # Expect a breakpoint stop report.
+            stop_reply_matcher,
+        ]
+        self.test_sequence.add_log_lines(continue_exchange, True)
+
+    def add_remove_breakpoint_packets(self, address, breakpoint_kind=1):
+        """Queue a z0 breakpoint-remove request for address that expects an OK reply."""
+        remove_breakpoint_exchange = [
+            # Remove the breakpoint.
+            "read packet: $z0,{0:x},{1}#00".format(address, breakpoint_kind),
+            # Verify the stub could unset it.
+            "send packet: $OK#00",
+        ]
+        self.test_sequence.add_log_lines(remove_breakpoint_exchange, True)
+
+    def add_qSupported_packets(self):
+        """Queue a qSupported query whose reply payload is captured as "qSupported_response"."""
+        reply_matcher = {"direction":"send", "regex":r"^\$(.*)#[0-9a-fA-F]{2}", "capture":{1: "qSupported_response"}}
+        qsupported_exchange = ["read packet: $qSupported#00", reply_matcher]
+        self.test_sequence.add_log_lines(qsupported_exchange, True)
+
+ # Feature names that parse_qSupported_response accepts; any other reported feature makes it raise.
+ _KNOWN_QSUPPORTED_STUB_FEATURES = [
+ "augmented-libraries-svr4-read",
+ "PacketSize",
+ "QStartNoAckMode",
+ "qXfer:auxv:read",
+ "qXfer:libraries:read",
+ "qXfer:libraries-svr4:read",
+ ]
+
+    def parse_qSupported_response(self, context):
+        """Parse the captured qSupported reply into a dictionary keyed by feature name.
+
+        Entries of the form key=val map key to val. Entries of the form feature+,
+        feature- or feature? map the feature name (suffix stripped) to that suffix.
+
+        Raises Exception for a suffix-style entry shorter than two characters,
+        for a suffix outside (+,-,?), and for any feature name that is not
+        listed in _KNOWN_QSUPPORTED_STUB_FEATURES.
+        """
+        self.assertIsNotNone(context)
+
+        raw_response = context.get("qSupported_response")
+        self.assertIsNotNone(raw_response)
+
+        features = {}
+        for entry in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
+            feature_name = entry.group(1)
+            feature_value = entry.group(3)
+
+            if not feature_value:
+                # No =val part: the final character is the support marker.
+                if len(feature_name) < 2:
+                    raise Exception("singular stub feature is too short: must be stub_feature{+,-,?}")
+                feature_value = feature_name[-1]
+                feature_name = feature_name[:-1]
+                if feature_value not in ["+", "-", "?"]:
+                    raise Exception("malformed stub feature: final character {} not in expected set (+,-,?)".format(feature_value))
+
+            features[feature_name] = feature_value
+
+            # Ensure we know the supported element
+            if feature_name not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
+                raise Exception("unknown qSupported stub feature reported: %s" % feature_name)
+
+        return features
+
+    def run_process_then_stop(self, run_seconds=1):
+        """Continue the inferior, sleep for run_seconds, then interrupt it and capture the stop reply.
+
+        The continue packet is appended to whatever is already queued in
+        self.test_sequence, so packets a caller queued earlier are replayed first.
+
+        Returns the context from the interrupt exchange; the captured stop
+        reply is stored under "stop_result".
+        """
+        # Tell the stub to continue.
+        self.test_sequence.add_log_lines(
+            ["read packet: $vCont;c#00"],
+            True)
+        self.expect_gdbremote_sequence()
+
+        # Wait for run_seconds.
+        time.sleep(run_seconds)
+
+        # Send an interrupt, capture a T response. The interrupt is the single byte 0x03,
+        # written as a plain decimal literal so the line parses on Python 2 and Python 3.
+        self.reset_test_sequence()
+        self.test_sequence.add_log_lines(
+            ["read packet: {}".format(chr(3)),
+             {"direction":"send", "regex":r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture":{1:"stop_result"} }],
+            True)
+        context = self.expect_gdbremote_sequence()
+        self.assertIsNotNone(context)
+        self.assertIsNotNone(context.get("stop_result"))
+
+        return context
+
+    def select_modifiable_register(self, reg_infos):
+        """Find a register that can be read/written freely.
+
+        Returns the "lldb_register_index" of the first entry whose name is in the
+        preferred set. When there is none, returns the index of the last entry
+        seen whose "generic" value is "fp", or None when no such entry exists.
+        """
+        # Builtin frozenset replaces the deprecated sets.Set.
+        preferred_register_names = frozenset(["rax"])
+
+        alternative_register_index = None
+
+        self.assertIsNotNone(reg_infos)
+        for reg_info in reg_infos:
+            if reg_info.get("name") in preferred_register_names:
+                # We found a preferred register. Use it.
+                return reg_info["lldb_register_index"]
+            if reg_info.get("generic") == "fp":
+                # A frame pointer register will do as a register to modify temporarily.
+                alternative_register_index = reg_info["lldb_register_index"]
+
+        # We didn't find a preferred register. Return whatever alternative register
+        # we found, if any.
+        return alternative_register_index
+
+    def extract_registers_from_stop_notification(self, stop_key_vals_text):
+        """Return {register_number: value_text} for the register entries of a stop notification.
+
+        stop_key_vals_text is parsed as "key:value" pairs. Only keys made up
+        entirely of hex digits are treated as register numbers; all other keys
+        are ignored. The values are returned as the raw text from the packet.
+        """
+        self.assertIsNotNone(stop_key_vals_text)
+        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
+
+        registers = {}
+        for (key, val) in kv_dict.items():
+            # The pattern is anchored at both ends: a key that merely starts with a hex
+            # digit (for example "core") is not a register number and would make
+            # int(key, 16) raise ValueError.
+            if re.match(r"^[0-9a-fA-F]+$", key):
+                registers[int(key, 16)] = val
+        return registers
+
+    def gather_register_infos(self):
+        """Query the stub for its register infos and return them with "lldb_register_index" added.
+
+        Any previously queued packets are discarded: the test sequence is reset
+        before the register info query is queued and replayed.
+        """
+        self.reset_test_sequence()
+        self.add_register_info_collection_packets()
+
+        replay_context = self.expect_gdbremote_sequence()
+        self.assertIsNotNone(replay_context)
+
+        register_infos = self.parse_register_info_packets(replay_context)
+        self.assertIsNotNone(register_infos)
+
+        # Tag every entry with its position so a later subset can still address registers by index.
+        self.add_lldb_register_index(register_infos)
+        return register_infos
+
+    def find_generic_register_with_name(self, reg_infos, generic_name):
+        """Return the first entry of reg_infos whose "generic" value equals generic_name, or None."""
+        self.assertIsNotNone(reg_infos)
+        for candidate in reg_infos:
+            if "generic" not in candidate:
+                continue
+            if candidate["generic"] == generic_name:
+                return candidate
+        return None
+
+
\ No newline at end of file
More information about the lldb-commits
mailing list