[Lldb-commits] [lldb] r122380 - /lldb/trunk/test/python_api/event/TestEvents.py
Johnny Chen
johnny.chen at apple.com
Tue Dec 21 16:32:54 PST 2010
Author: johnny
Date: Tue Dec 21 18:32:54 2010
New Revision: 122380
URL: http://llvm.org/viewvc/llvm-project?rev=122380&view=rev
Log:
Add test cases for registering a listener object with the broadcaster of a process
and waiting for two expected state changed events to arrive: "running" followed by
"stopped".
Modified:
lldb/trunk/test/python_api/event/TestEvents.py
Modified: lldb/trunk/test/python_api/event/TestEvents.py
URL: http://llvm.org/viewvc/llvm-project/lldb/trunk/test/python_api/event/TestEvents.py?rev=122380&r1=122379&r2=122380&view=diff
==============================================================================
--- lldb/trunk/test/python_api/event/TestEvents.py (original)
+++ lldb/trunk/test/python_api/event/TestEvents.py Tue Dec 21 18:32:54 2010
@@ -15,16 +15,29 @@
@unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
@python_api_test
def test_wait_for_event_with_dsym(self):
- """Exercise SBListener.WaitForEvent() APIs."""
+ """Exercise SBListener.WaitForEvent() API."""
self.buildDsym()
self.do_wait_for_event()
@python_api_test
def test_wait_for_event_with_dwarf(self):
- """Exercise SBListener.WaitForEvent() APIs."""
+ """Exercise SBListener.WaitForEvent() API."""
self.buildDwarf()
self.do_wait_for_event()
+ @unittest2.skipUnless(sys.platform.startswith("darwin"), "requires Darwin")
+ @python_api_test
+ def test_broadcast_event_with_dsym(self):
+ """Exercise SBBroadcaster.BroadcastEvent() API."""
+ self.buildDsym()
+ self.do_broadcast_event()
+
+ @python_api_test
+ def test_broadcast_event_with_dwarf(self):
+ """Exercise SBBroadcaster.BroadcastEvent() API."""
+ self.buildDwarf()
+ self.do_broadcast_event()
+
def setUp(self):
# Call super's setUp().
TestBase.setUp(self)
@@ -67,39 +80,136 @@
import threading
class MyListeningThread(threading.Thread):
def run(self):
- print "Running MyListeningThread:", self
+ #print "Running MyListeningThread:", self
count = 0
# Let's only try at most 3 times to retrieve any kind of event.
while not count > 3:
if listener.WaitForEvent(5, event):
- print "Got a valid event:", event
- print "Event type:", event.GetType()
- print "Event broadcaster:", event.GetBroadcaster().GetName()
+ #print "Got a valid event:", event
+ #print "Event type:", event.GetType()
+ #print "Event broadcaster:", event.GetBroadcaster().GetName()
return
count = count + 1
print "Timeout: listener.WaitForEvent"
return
- # Let's start the listening thread before we launch the inferior process.
- my_thread = MyListeningThread()
- my_thread.start()
-
- # Set the debugger to be in asynchronous mode since our listening thread
- # is waiting for events to come.
- self.dbg.SetAsync(True)
-
# Use Python API to kill the process. The listening thread should be
# able to receive a state changed event.
self.process.Kill()
+ # Let's start the listening thread to retrieve the event.
+ my_thread = MyListeningThread()
+ my_thread.start()
+
# Wait until the 'MyListeningThread' terminates.
my_thread.join()
- # Restore the original synchronous mode.
- self.dbg.SetAsync(False)
+ self.assertTrue(event.IsValid(),
+ "My listening thread successfully received an event")
+
+ def do_broadcast_event(self):
+ """Get the broadcaster associated with the process and exercise BroadcastEvent API."""
+ exe = os.path.join(os.getcwd(), "a.out")
+
+ # Create a target by the debugger.
+ target = self.dbg.CreateTarget(exe)
+ self.assertTrue(target.IsValid(), VALID_TARGET)
+
+ # Now create a breakpoint on main.c by name 'c'.
+ breakpoint = target.BreakpointCreateByName('c', 'a.out')
+ #print "breakpoint:", breakpoint
+ self.assertTrue(breakpoint.IsValid() and
+ breakpoint.GetNumLocations() == 1,
+ VALID_BREAKPOINT)
+
+ # Now launch the process, and do not stop at the entry point.
+ self.process = target.LaunchProcess([''], [''], os.ctermid(), 0, False)
+
+ self.process = target.GetProcess()
+ self.assertTrue(self.process.GetState() == lldb.eStateStopped,
+ PROCESS_STOPPED)
+
+ # Get a handle on the process's broadcaster.
+ broadcaster = self.process.GetBroadcaster()
+ self.assertTrue(broadcaster.IsValid(), "Process with valid broadcaster")
+
+ # Create an empty event object.
+ event = lldb.SBEvent()
+ self.assertFalse(event.IsValid(), "Event should not be valid initially")
+
+ # Create a listener object and register with the broadcaster.
+ listener = lldb.SBListener("TestEvents.listener")
+ rc = broadcaster.AddListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
+ self.assertTrue(rc, "AddListener successfully retruns")
+
+ # The finite state machine for our custom listening thread, with an
+ # initail state of 0, which means a "running" event is expected.
+ # It changes to 1 after "running" is received.
+ # It cahnges to 2 after "stopped" is received.
+ # 2 will be our final state and the test is complete.
+ self.state = 0
+
+ # Create MyListeningThread to wait for state changed events.
+ # By design, a "running" event is expected following by a "stopped" event.
+ import threading
+ class MyListeningThread(threading.Thread):
+ def run(self):
+ #print "Running MyListeningThread:", self
+
+ # Regular expression pattern for the event description.
+ pattern = re.compile("data = {.*, state = (.*)}$")
+
+ # Let's only try at most 6 times to retrieve our events.
+ count = 0
+ while True:
+ if listener.WaitForEvent(5, event):
+ stream = lldb.SBStream()
+ event.GetDescription(stream)
+ description = stream.GetData()
+ #print "Event data flavor:", event.GetDataFlavor()
+ #print "Event description:", description
+ match = pattern.search(description)
+ if not match:
+ break;
+ if self.context.state == 0 and match.group(1) == 'running':
+ self.context.state = 1
+ continue
+ elif self.context.state == 1 and match.group(1) == 'stopped':
+ # Whoopee, both events have been received!
+ self.context.state = 2
+ break
+ else:
+ break
+ print "Timeout: listener.WaitForEvent"
+ count = count + 1
+ if count > 6:
+ break
+
+ return
+
+ # Use Python API to continue the process. The listening thread should be
+ # able to receive the state changed events.
+ self.process.Continue()
+
+ # Start the listening thread to receive the "running" followed by the
+ # "stopped" events.
+ my_thread = MyListeningThread()
+ # Supply the enclosing context so that our listening thread can access
+ # the 'state' variable.
+ my_thread.context = self
+ my_thread.start()
+
+ # Wait until the 'MyListeningThread' terminates.
+ my_thread.join()
- self.assertTrue(event.IsValid())
+ # We are no longer interested in receiving state changed events.
+ # Remove our custom listener before the inferior is killed.
+ broadcaster.RemoveListener(listener, lldb.SBProcess.eBroadcastBitStateChanged)
+
+ # The final judgement. :-)
+ self.assertTrue(self.state == 2,
+ "Both expected state changed events received")
if __name__ == '__main__':
More information about the lldb-commits
mailing list