[Lldb-commits] [lldb] [lldb-mcp] Backporting lldb-mcp using python. (PR #160619)
John Harrison via lldb-commits
lldb-commits at lists.llvm.org
Thu Sep 25 08:25:05 PDT 2025
https://github.com/ashgti updated https://github.com/llvm/llvm-project/pull/160619
>From c5311e20b9c22194e02330494f73df724859571f Mon Sep 17 00:00:00 2001
From: John Harrison <harjohn at google.com>
Date: Wed, 24 Sep 2025 17:06:29 -0700
Subject: [PATCH 1/3] [lldb-mcp] Backporting lldb-mcp using python.
This adds a backport of lldb-mcp using some helper python scripts.
This registeres a new command for running an lldb-mcp server and includes a helper binary for launching and connecting to lldb.
This can be used for older releases of lldb, for example I was able to use this with lldb included in Xcode 16.3, Xcode 16.4 and Xcode 26.0 (RC).
Using this helper, I am able to utilize gemini-cli for running lldb commands and this should work with other MCP clients.
---
lldb/examples/mcp/README.md | 26 +++
lldb/examples/mcp/lldb-mcp | 12 ++
lldb/examples/mcp/lldb-mcp.py | 162 ++++++++++++++++
lldb/examples/mcp/protocol.py | 218 +++++++++++++++++++++
lldb/examples/mcp/server.py | 278 +++++++++++++++++++++++++++
lldb/examples/mcp/transport.py | 337 +++++++++++++++++++++++++++++++++
6 files changed, 1033 insertions(+)
create mode 100644 lldb/examples/mcp/README.md
create mode 100755 lldb/examples/mcp/lldb-mcp
create mode 100644 lldb/examples/mcp/lldb-mcp.py
create mode 100644 lldb/examples/mcp/protocol.py
create mode 100644 lldb/examples/mcp/server.py
create mode 100644 lldb/examples/mcp/transport.py
diff --git a/lldb/examples/mcp/README.md b/lldb/examples/mcp/README.md
new file mode 100644
index 0000000000000..7296fc75db3b4
--- /dev/null
+++ b/lldb/examples/mcp/README.md
@@ -0,0 +1,26 @@
+# lldb-mcp backport
+
+A backport of the lldb-mcp protocol for older releases of lldb.
+
+To load the backport use:
+
+```
+(lldb) command script import --allow-reload server.py
+(lldb) start_mcp
+```
+
+Then you can use the `lldb-mcp` script in this directory to launch a client for
+the running server.
+
+For example,
+
+```json
+{
+ "mcpServers": {
+ "lldb": {
+ "command": "<path>/lldb-mcp",
+ "args": ["--log-file=/tmp/lldb-mcp.log", "--timeout=30.0"]
+ }
+ }
+}
+```
diff --git a/lldb/examples/mcp/lldb-mcp b/lldb/examples/mcp/lldb-mcp
new file mode 100755
index 0000000000000..d91b48be6092a
--- /dev/null
+++ b/lldb/examples/mcp/lldb-mcp
@@ -0,0 +1,12 @@
+#!/bin/sh
+
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )"
+
+PYTHONPATH="$(lldb -P)"
+export PYTHONPATH
+
+if [ "$(uname)" == "Darwin" ]; then
+ exec xcrun python3 $SCRIPT_DIR/lldb-mcp.py $@
+else
+ exec python3 $SCRIPT_DIR/lldb-mcp.py $@
+fi
diff --git a/lldb/examples/mcp/lldb-mcp.py b/lldb/examples/mcp/lldb-mcp.py
new file mode 100644
index 0000000000000..808155b179858
--- /dev/null
+++ b/lldb/examples/mcp/lldb-mcp.py
@@ -0,0 +1,162 @@
+import atexit
+import logging
+import argparse
+import pathlib
+import asyncio
+import os
+import sys
+import signal
+import transport
+import protocol
+from typing import Optional
+
+logger = logging.getLogger("lldb-mcp")
+
+
+class MCPClient(transport.MessageHandler):
+ initialize = protocol.initialize.invoker
+ initialized = protocol.initialized.invoker
+ toolsList = protocol.toolsList.invoker
+ toolsCall = protocol.toolsCall.invoker
+
+
+def parse(uri: str) -> tuple[str, int]:
+ assert uri.startswith("connection://")
+ uri = uri.removeprefix("connection://")
+ host, port = uri.rsplit(":", maxsplit=1)
+ if host != "[::1]":
+ host = host.removeprefix("[").removesuffix("]")
+ return (host, int(port))
+
+
+async def test_client(uri: str):
+ host, port = parse(uri)
+ print("connecting to", host, port)
+ reader, writer = await asyncio.open_connection(host, int(port))
+ with transport.Transport(reader, writer) as conn:
+ async with MCPClient(conn) as client:
+ _ = await client.initialize()
+ client.initialized()
+
+ tools_list_result = await client.toolsList()
+ for tool in tools_list_result["tools"]:
+ print("tool", tool)
+
+ await client.toolsCall(
+ name="command",
+ arguments={
+ "command": "bt",
+ "debugger": "lldb://debugger/1",
+ },
+ )
+ await client.toolsCall(
+ name="debugger_list",
+ arguments=None,
+ )
+
+
+async def launchLLDB(log_file: Optional[str] = None):
+ dir = os.path.dirname(os.path.abspath(__file__))
+ server_script = os.path.join(dir, "server.py")
+ args = [
+ "lldb",
+ "-O",
+ f"command script import --allow-reload {server_script}",
+ "-O",
+ "start_mcp" + " --log-file=" + str(log_file) if log_file else "",
+ ]
+ process = await asyncio.subprocess.create_subprocess_exec(
+ *args,
+ stdout=asyncio.subprocess.DEVNULL,
+ stderr=asyncio.subprocess.DEVNULL,
+ )
+
+ def shutdown():
+ try:
+ if process.returncode is None:
+ process.send_signal(signal.SIGHUP)
+ os.waitpid(process.pid, 0)
+ except:
+ pass
+
+ atexit.register(shutdown)
+
+
+async def main() -> None:
+ parser = argparse.ArgumentParser("lldb-mcp")
+ parser.add_argument("-l", "--log-file", type=pathlib.Path)
+ parser.add_argument("-t", "--timeout", type=float, default=30.0)
+ parser.add_argument("--test", action="store_true")
+ opts = parser.parse_args()
+ if opts.log_file or opts.test:
+ logging.basicConfig(
+ filename=opts.log_file,
+ format="%(created)f:%(process)d:%(levelname)s:%(name)s:%(message)s",
+ level=logging.DEBUG,
+ )
+ logger.info("Loading lldb-mcp server configurations...")
+ loop = asyncio.get_running_loop()
+
+ launched = False
+ deadline: float = loop.time() + opts.timeout
+ servers: list[protocol.ServerInfo] = []
+ while not servers and loop.time() < deadline:
+ logger.info("loading host server details")
+ servers = protocol.load()
+
+ if not servers and not launched:
+ launched = True
+ logger.info("Starting lldb with server loaded...")
+ await launchLLDB(log_file=opts.log_file)
+ continue
+
+ if not servers:
+ logger.info("Waiting for server to start...")
+ await asyncio.sleep(1.0)
+ continue
+
+ if len(servers) != 1:
+ logger.error("to many lldb-mcp servers detected, exiting...")
+ sys.exit(
+ "Multiple servers detected, selecting a single server is not yet supported."
+ )
+
+ break
+
+ assert servers
+
+ if opts.test:
+ for server in servers:
+ await test_client(server["connection_uri"])
+ return
+
+ logger.info("Forwarding stdio to first server %r", servers[0])
+ try:
+ server_info = servers[0]
+ host, port = parse(server_info["connection_uri"])
+ cr, cw = await asyncio.open_connection(host, port)
+ loop = asyncio.get_event_loop()
+
+ def forward():
+ buf = sys.stdin.buffer.read(4096)
+ if not buf: # eof detected
+ cr.feed_eof()
+ loop.remove_reader(sys.stdin)
+ return
+ logger.info("--> %s", buf.decode().strip())
+ cw.write(buf)
+
+ os.set_blocking(sys.stdin.fileno(), False)
+ loop.add_reader(sys.stdin, forward)
+ async for f in cr:
+ logger.info("<-- %s", f.decode().strip())
+ sys.stdout.buffer.write(f)
+ sys.stdout.buffer.flush()
+ except:
+ logger.exception("forwarding client failed")
+ finally:
+ logger.info("lldb-mcp client shut down")
+
+
+if __name__ == "__main__":
+ asyncio.run(main())
diff --git a/lldb/examples/mcp/protocol.py b/lldb/examples/mcp/protocol.py
new file mode 100644
index 0000000000000..067b03af7221b
--- /dev/null
+++ b/lldb/examples/mcp/protocol.py
@@ -0,0 +1,218 @@
+import os
+import io
+import sys
+import json
+import logging
+import ctypes
+import ctypes.util
+from typing import TypedDict, Any, Literal, Optional
+from transport import RequestDescriptor, EventDescriptor
+
+logger = logging.getLogger(__name__)
+
+PROC_PIDPATHINFO_MAXSIZE = 4 * 1024
+
+
+def _is_valid_lldb_process(pid: int) -> bool:
+ logger.info("checking if process %d is alive and is an lldb process", pid)
+ try:
+ # raises ProcessLookupError if pid does not exist.
+ os.kill(pid, 0)
+ if sys.platform == "darwin":
+ libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
+ assert libc
+ proc_pidpath = libc.proc_pidpath
+ proc_pidpath.restype = ctypes.c_int
+ proc_pidpath.argtypes = [ctypes.c_int, ctypes.c_void_p, ctypes.c_uint32]
+ buf = ctypes.create_string_buffer(PROC_PIDPATHINFO_MAXSIZE)
+ if proc_pidpath(pid, buf, PROC_PIDPATHINFO_MAXSIZE) <= 0:
+ raise OSError(ctypes.get_errno())
+ path = bytes(buf.value).decode()
+ logger.info("path=%r", path)
+ if "lldb" not in os.path.basename(path):
+ logger.info("pid %d is invalid", pid)
+ return False
+ logger.info("pid %d is valid", pid)
+ return True
+ except ProcessLookupError:
+ logger.info("pid %d is not alive", pid)
+ return False
+ except:
+ logger.exception("failed to validate pid %d", pid)
+ return False
+
+
+class ServerInfo(TypedDict):
+ connection_uri: str
+
+
+def load() -> list[ServerInfo]:
+ dir = os.path.expanduser("~/.lldb")
+ contents = os.listdir(dir)
+ server_infos = []
+ for file in contents:
+ if not file.startswith("lldb-mcp-") or not file.endswith(".json"):
+ continue
+
+ filename = os.path.join(dir, file)
+ pid = int(file.removeprefix("lldb-mcp-").removesuffix(".json"))
+ if not _is_valid_lldb_process(pid):
+ # Process is dead, clean up the stale file.
+ os.remove(filename)
+ continue
+
+ with open(filename) as f:
+ server_infos.append(json.load(f))
+ return server_infos
+
+
+def cleanup():
+ server_info_config = os.path.expanduser(f"~/.lldb/lldb-mcp-{os.getpid()}.json")
+ if os.path.exists(server_info_config):
+ os.remove(server_info_config)
+
+
+def save(uri: str):
+ server_info: ServerInfo = {"connection_uri": uri}
+ with open(os.path.expanduser(f"~/.lldb/lldb-mcp-{os.getpid()}.json"), "w+") as f:
+ json.dump(server_info, f)
+
+
+class URI:
+ scheme: str
+ host: Optional[str]
+ port: Optional[int]
+ path: str
+
+ def __init__(
+ self,
+ *,
+ scheme="",
+ host: Optional[str] = None,
+ port: Optional[int] = None,
+ path="",
+ ):
+ self.scheme = scheme
+ self.host = host
+ self.port = port
+ self.path = path
+
+ @classmethod
+ def parse(cls, input: str) -> "URI":
+ assert ":" in input
+ uri = URI()
+ uri.scheme, rest = input.split(":", maxsplit=1)
+ assert uri.scheme.isascii()
+ if rest.startswith("//"):
+ rest = rest.removeprefix("//")
+ if "/" in rest:
+ uri.host, rest = rest.split("/", maxsplit=1)
+ else:
+ uri.host = rest
+ rest = ""
+ uri.path = rest
+ if uri.host is not None and ":" in uri.host:
+ uri.host, raw_port = uri.host.rsplit(":", maxsplit=1)
+ assert raw_port.isdigit()
+ uri.port = int(raw_port)
+ return uri
+
+ def append(self, path: str) -> "URI":
+ return URI(
+ scheme=self.scheme,
+ host=self.host,
+ port=self.port,
+ path=os.path.join(self.path, path),
+ )
+
+ def __str__(self):
+ os = io.StringIO()
+ os.write(self.scheme)
+ os.write(":")
+ if self.host or self.port:
+ os.write("//")
+ if self.host:
+ os.write(self.host)
+ if self.port:
+ os.write(":")
+ os.write(self.port)
+ if self.path and self.path != "/":
+ os.write(self.path)
+ return os.getvalue()
+
+
+class ImplementationVersion(TypedDict):
+ name: str
+ version: str
+
+
+class Tool(TypedDict):
+ name: str
+ title: str
+ description: str
+ inputSchema: dict
+
+
+class Resource(TypedDict):
+ uri: str
+ name: str
+
+
+class ListToolsResult(TypedDict):
+ tools: list[Tool]
+
+
+class CallToolParams(TypedDict):
+ name: str
+ arguments: Any
+
+
+class TextContent(TypedDict):
+ type: Literal["text"]
+ text: str
+
+
+class CallToolResult(TypedDict):
+ content: list[TextContent]
+ isError: bool
+
+
+class ComponentCapabilities(TypedDict, total=False):
+ listChanged: bool
+ subscribe: bool
+
+
+class ServerCapabilities(TypedDict):
+ tools: ComponentCapabilities
+
+
+class InitializeParams(TypedDict):
+ capabilities: dict
+ clientInfo: ImplementationVersion
+ protocolVersion: str
+
+
+class InitializeResult(TypedDict):
+ capabilities: ServerCapabilities
+ protocolVersion: str
+ serverInfo: ImplementationVersion
+
+
+initialize = RequestDescriptor[InitializeParams, InitializeResult](
+ "initialize",
+ defaults={
+ "protocolVersion": "2024-11-05",
+ "clientInfo": {
+ "name": "lldb-mcp",
+ "version": "0.0.1",
+ },
+ "capabilities": {
+ "roots": {"listChanged": True},
+ "sampling": {},
+ "elicitation": {},
+ },
+ },
+)
+initialized = EventDescriptor[None](name="initialized")
+toolsList = RequestDescriptor[None, ListToolsResult](name="tools/list")
+toolsCall = RequestDescriptor[CallToolParams, CallToolResult](name="tools/call")
diff --git a/lldb/examples/mcp/server.py b/lldb/examples/mcp/server.py
new file mode 100644
index 0000000000000..59d55ac10ec9f
--- /dev/null
+++ b/lldb/examples/mcp/server.py
@@ -0,0 +1,278 @@
+"""
+An implementation of the lldb-mcp server.
+"""
+
+from typing import Any, Optional
+import argparse
+import asyncio
+import lldb
+import logging
+import protocol
+import queue
+import pathlib
+import traceback
+import shlex
+import threading
+import transport
+
+logger = logging.getLogger(__name__)
+
+
+SCHEME = "lldb-mcp"
+DEBUGGER_HOST = "debugger"
+BASE_DEBUGGER_URI = protocol.URI(scheme=SCHEME, host=DEBUGGER_HOST, path="/")
+
+
+class Tool:
+ name: str
+ title: str
+ description: str
+ inputSchema: dict
+
+ def to_protocol(self) -> protocol.Tool:
+ return {
+ "name": self.name,
+ "title": self.title,
+ "description": self.description,
+ "inputSchema": self.inputSchema,
+ }
+
+ async def call(self, **kwargs) -> protocol.CallToolResult:
+ assert False, "Implement in a subclass."
+
+
+class CommandTool(Tool):
+ name = "command"
+ title = "LLDB Command"
+ description = "Evaluates an lldb command."
+ inputSchema = {
+ "type": "object",
+ "properties": {
+ "debugger": {
+ "type": "string",
+ "description": "The debugger ID or URI to a specific debug session. If not specified, the first debugger will be used.",
+ },
+ "command": {
+ "type": "string",
+ "description": "An lldb command to run.",
+ },
+ },
+ }
+
+ async def call(
+ self, *, command: Optional[str] = None, debugger: Optional[str] = None, **kwargs
+ ) -> protocol.CallToolResult:
+ if debugger:
+ if debugger.isdigit():
+ id = int(debugger)
+ else:
+ logger.info("Parsing %s", debugger)
+ uri = protocol.URI.parse(debugger)
+ logger.info("Parsed URI: %s", uri)
+ assert uri.scheme == SCHEME
+ assert uri.host == DEBUGGER_HOST
+ raw_id = uri.path.removeprefix("/")
+ assert raw_id.isdigit()
+ id = int(raw_id)
+ dbg_inst = lldb.SBDebugger.FindDebuggerWithID(id)
+ else:
+ for i in range(100):
+ dbg_inst = lldb.SBDebugger.FindDebuggerWithID(i)
+ if dbg_inst.IsValid():
+ break
+ assert dbg_inst.IsValid()
+ result = lldb.SBCommandReturnObject()
+ dbg_inst.GetCommandInterpreter().HandleCommand(command, result)
+ contents: list[protocol.TextContent] = []
+ if result.GetOutputSize():
+ contents.append({"type": "text", "text": str(result.GetOutput())})
+ if result.GetErrorSize():
+ contents.append({"type": "text", "text": str(result.GetError())})
+ return {
+ "content": contents,
+ "isError": not result.Succeeded(),
+ }
+
+
+class DebuggerList(Tool):
+ name = "debugger_list"
+ title = "List Debuggers"
+ description = "List debuggers associated with this server."
+ inputSchema = {"type": "object"}
+
+ async def call(self, **_kwargs) -> protocol.CallToolResult:
+ out = ""
+
+ for i in range(100):
+ debugger = lldb.SBDebugger.FindDebuggerWithID(i)
+ if debugger.IsValid():
+ uri = BASE_DEBUGGER_URI.append(str(i))
+ out += f"- {uri}\n"
+
+ return {
+ "content": [
+ {"type": "text", "text": out},
+ ],
+ "isError": False,
+ }
+
+
+class MCPServer(transport.MessageHandler):
+ tools: dict[str, Tool]
+
+ def __init__(
+ self, transport: transport.Transport, tools=[CommandTool(), DebuggerList()]
+ ):
+ super().__init__(transport)
+ self.tools = {tool.name: tool for tool in tools}
+
+ def __del__(self):
+ print("deleting MCPServer....")
+
+ @protocol.initialize.handler()
+ async def initialize(
+ self, **params: protocol.InitializeParams
+ ) -> protocol.InitializeResult:
+ return protocol.InitializeResult(
+ capabilities={"tools": {"listChanged": True}},
+ protocolVersion="2024-11-05",
+ serverInfo={"name": "lldb-mcp", "version": "0.0.1"},
+ )
+
+ @protocol.initialized.handler()
+ def initialized(self):
+ print("Client initialized...")
+
+ @protocol.toolsList.handler()
+ async def listTools(self) -> protocol.ListToolsResult:
+ return {"tools": [tool.to_protocol() for tool in self.tools.values()]}
+
+ @protocol.toolsCall.handler()
+ async def callTool(
+ self, name: str, arguments: Optional[Any] = None
+ ) -> protocol.CallToolResult:
+ tool = self.tools[name]
+ if arguments is None:
+ arguments = {}
+ return await tool.call(**arguments)
+
+
+server: Optional[asyncio.AbstractServer] = None
+
+
+def get_parser():
+ parser = argparse.ArgumentParser("lldb-mcp")
+ parser.add_argument("-l", "--log-file", type=pathlib.Path)
+ parser.add_argument("-t", "--timeout", type=float, default=30.0)
+ parser.add_argument("connection", nargs="?", default="listen://[127.0.0.1]:0")
+ return parser
+
+
+async def run(opts: argparse.Namespace, notify: Optional[queue.Queue] = None):
+ global server
+ conn: str = opts.connection
+ assert conn.startswith("listen://"), "Invalid connection specifier"
+ hostname, port = conn.removeprefix("listen://").split(":")
+ hostname = hostname.removeprefix("[").removesuffix("]")
+
+ logging.basicConfig(filename=opts.log_file, level=logging.DEBUG, force=True)
+
+ server = await asyncio.start_server(MCPServer.acceptClient, hostname, int(port))
+ addrs = ", ".join(str(sock.getsockname()) for sock in server.sockets)
+ if notify:
+ notify.put(addrs)
+ else:
+ print(f"Serving on {addrs}")
+
+ sock_name = server.sockets[0].getsockname()
+ (h, p) = sock_name[0], sock_name[1]
+ protocol.save(f"connection://[{h}]:{p}")
+
+ async with server:
+ await server.serve_forever()
+
+
+# A registration count, if this module is loaded for multiple debugger then we
+# should only stop the global server if all registrations have been removed
+registration_count = 0
+
+
+def stop():
+ """Stop the server, if one exists."""
+ global server
+ protocol.cleanup()
+ if not server:
+ return
+ server.close() # Stop accepting new connections
+ server = None
+
+
+class CommandStart:
+ # The CommandStart is being used to track when the interpreter exits. lldb
+ # does not call `Py_Finalize()`, so `atexit` calls are never invoked. In
+ # order to ensure we shutdown the server and clean up the server info
+ # records we use the `__del__` method to trigger the clean up as a best
+ # effort attempt at a clean shutdown.
+ def __init__(self, debugger, internal_dict):
+ global registration_count
+ registration_count += 1
+
+ def __del__(self):
+ global registration_count
+ registration_count -= 1
+ if registration_count == 0:
+ stop()
+
+ def __call__(self, debugger, command, exe_ctx, result):
+ """Start an MCP server in a background thread."""
+ global server
+
+ if server is not None:
+ print("Server already running.", file=result)
+ return
+
+ command_args = shlex.split(command)
+ opts = get_parser().parse_args(command_args)
+
+ print("Starting LLDB MCP Server...", file=result)
+
+ notify = queue.Queue()
+
+ def start_server():
+ asyncio.run(run(opts, notify))
+
+ thr = threading.Thread(target=start_server)
+ thr.start()
+
+ addrs = notify.get()
+ print(f"Serving on {addrs}", file=result)
+ result.SetStatus(lldb.eReturnStatusSuccessFinishNoResult)
+
+
+def lldb_stop(debugger, command, exe_ctx, result, internal_dict):
+ """Stop an MCP server."""
+ global server
+ try:
+ if server is None:
+ print("Server is stopped.", file=result)
+ result.SetStatus(lldb.eReturnStatusSuccessFinishNoResult)
+ return
+
+ print("Server stopping...", file=result)
+ stop()
+ print("Server stopped.", file=result)
+
+ result.SetStatus(lldb.eReturnStatusSuccessFinishNoResult)
+ except:
+ logging.exception("failed to stop MCP server")
+ traceback.print_exc(file=result)
+ result.SetStatus(lldb.eReturnStatusFailed)
+
+
+def __lldb_init_module(
+ debugger: lldb.SBDebugger,
+ internal_dict: dict[Any, Any],
+) -> None:
+ debugger.HandleCommand("command script add -o -c server.CommandStart start_mcp")
+ debugger.HandleCommand("command script add -o -f server.lldb_stop stop_mcp")
+ print("Registered command 'start_mcp' and 'stop_mcp'.")
diff --git a/lldb/examples/mcp/transport.py b/lldb/examples/mcp/transport.py
new file mode 100644
index 0000000000000..1fdb3f4d494b2
--- /dev/null
+++ b/lldb/examples/mcp/transport.py
@@ -0,0 +1,337 @@
+import asyncio
+import dataclasses
+import enum
+import functools
+import json
+import logging
+import pprint
+import sys
+import traceback
+from typing import (
+ Any,
+ Awaitable,
+ Generic,
+ TypeVar,
+ Callable,
+ Union,
+ Optional,
+)
+
+logger = logging.getLogger(__name__)
+
+
+ at enum.unique
+class MessageType(enum.Enum):
+ REQ = enum.auto()
+ RESP = enum.auto()
+ NOTE = enum.auto()
+
+
+ at dataclasses.dataclass(frozen=True, repr=False)
+class Message:
+ """Wrapper around the JSON payload of a MCP message."""
+
+ payload: dict[str, Any]
+
+ @classmethod
+ def from_dict(cls, payload: dict[str, Any]) -> "Message":
+ # Ensure the jsonrpc field is always set.
+ payload["jsonrpc"] = "2.0"
+ return cls(payload=payload)
+
+ @classmethod
+ def decode(cls, bytes: Union[str, bytes, bytearray]) -> "Message":
+ # Ensure the jsonrpc field is always set.
+ return cls(payload=json.loads(bytes))
+
+ def __str__(self):
+ return json.dumps(self.payload, indent=None, separators=(",", ":"))
+
+ def __repr__(self):
+ return "{}: {}".format(
+ self.message_type.name.title(),
+ pprint.pformat(self.payload, sort_dicts=False),
+ )
+
+ @functools.cached_property
+ def message_type(self) -> MessageType:
+ if "id" in self.payload and "method" in self.payload:
+ return MessageType.REQ
+ elif "id" in self.payload:
+ return MessageType.RESP
+ elif "method" in self.payload:
+ return MessageType.NOTE
+ assert False, f"Unknown message type: {self.payload}"
+
+ def encode(self) -> bytes:
+ msg = json.dumps(self.payload, indent=None, separators=(",", ":"))
+ return f"{msg}\n".encode()
+
+ def matches(self, other: Optional["Message"]) -> bool:
+ """Returns true iff other is a subset of this message."""
+ if not other:
+ return True
+
+ # The other payload must be a subset of this payload, meaning if we were to
+ # add its payload to ours, the payload is the same.
+ return self.payload | other.payload == self.payload
+
+ # Various typed wrappers around self.payload['field']
+
+ @property
+ def method(self) -> str:
+ return self.payload["method"]
+
+ @property
+ def id(self) -> int:
+ return self.payload["id"]
+
+ @property
+ def params(self) -> dict[str, Any]:
+ return self.payload.get("params", {})
+
+ @property
+ def result(self) -> dict[str, Any]:
+ return self.payload.get("result", {})
+
+ @property
+ def error(self) -> dict[str, Any]:
+ return self.payload.get("error", {})
+
+ @property
+ def success(self) -> bool:
+ return not hasattr(self.payload, "error")
+
+
+class Transport:
+ r: asyncio.StreamReader
+ w: asyncio.StreamWriter
+
+ def __init__(self, r: asyncio.StreamReader, w: asyncio.StreamWriter):
+ self.r = r
+ self.w = w
+
+ def write(self, message: Message):
+ logger.info("--> %s", message)
+ self.w.write(message.encode())
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.w.close()
+
+ def __aiter__(self):
+ return self
+
+ async def __anext__(self):
+ line = await self.r.readline()
+ if line == b"":
+ raise StopAsyncIteration
+ return Message.decode(line)
+
+
+class Invoker:
+ name: str
+ message_type: MessageType
+ defaults: dict
+ transport: Optional[Transport] = None
+ handler: Optional["MessageHandler"] = None
+
+ def __init__(self, name, message_type: MessageType, defaults: dict = {}):
+ self.name = name
+ self.message_type = message_type
+ self.defaults = defaults
+
+ def __call__(self, **kwargs):
+ assert self.transport and self.handler
+ if self.message_type == MessageType.REQ:
+ return self.handler.request(self.name, params=self.defaults | kwargs)
+ elif self.message_type == MessageType.NOTE:
+ return self.handler.event(self.name, params=self.defaults | kwargs)
+
+
+class Handler:
+ name: str
+ message_type: MessageType
+
+ def __init__(self, name: str, message_type: MessageType):
+ self.name = name
+ self.message_type = message_type
+
+ def __call__(self):
+ def wrap(fn):
+ return RequestWrapper(self.name, fn)
+
+ return wrap
+
+
+Params = TypeVar("Params", bound=dict)
+Result = TypeVar("Result")
+
+
+class EventDescriptor(Generic[Params]):
+ invoker: Callable[Params, None]
+ handler: Callable[Params, None]
+
+ def __init__(self, name: str):
+ self.name = name
+
+ self.invoker = Invoker(name=name, message_type=MessageType.NOTE)
+ self.handler = Handler(name, MessageType.NOTE)
+
+
+class RequestDescriptor(Generic[Params, Result]):
+ invoker: Callable[Params, Awaitable[Result]]
+ handler: Callable[Params, Awaitable[Result]]
+
+ def __init__(self, name: str, defaults: Params = {}):
+ self.name = name
+
+ self.invoker = Invoker(name, MessageType.REQ, defaults)
+ self.handler = Handler(name, MessageType.REQ)
+
+
+class RequestWrapper:
+ name: str
+ fn: Callable
+ handler: "MessageHandler"
+
+ def __init__(self, name, fn):
+ self.name = name
+ self.fn = fn
+
+ def __call__(self, *args, **kwargs):
+ assert self.handler is not None
+ _ = kwargs.pop("_meta", None)
+ return self.fn(self.handler, *args, **kwargs)
+
+
+class MessageHandler:
+ seq: int = 0
+ handlers: dict[str, RequestWrapper] = {}
+ invokers: dict[str, Invoker] = {}
+ inflight: dict[int, asyncio.Future] = {}
+ transport: Transport
+
+ def __init_subclass__(cls):
+ super().__init_subclass__()
+ for i in dir(cls):
+ attr = getattr(cls, i)
+ if isinstance(attr, RequestWrapper):
+ cls.handlers[attr.name] = attr
+ if isinstance(attr, Invoker):
+ cls.invokers[attr.name] = attr
+
+ def __init__(self, transport: Transport):
+ self.transport = transport
+ for invoker in self.invokers.values():
+ invoker.transport = transport
+ invoker.handler = self
+ for handlers in self.handlers.values():
+ handlers.handler = self
+
+ _handler: Optional[asyncio.Task] = None
+
+ async def __aenter__(self):
+ self._handler = asyncio.create_task(self.run())
+ return self
+
+ async def run(self):
+ async for message in self.transport:
+ logger.info("<-- %s", message)
+ if message.message_type == MessageType.REQ:
+ handler = self.handlers.get(message.method)
+ if not handler:
+ self.transport.write(
+ Message.from_dict(
+ {
+ "id": message.id,
+ "error": {
+ "code": -32601,
+ "message": "Method not found",
+ },
+ }
+ )
+ )
+ continue
+ try:
+ result = await handler(**message.params)
+ self.transport.write(
+ Message.from_dict(
+ {
+ "id": message.id,
+ "result": result,
+ }
+ )
+ )
+ except Exception as e:
+ print("Internal error:", file=sys.stderr)
+ traceback.print_exc(file=sys.stderr)
+ self.transport.write(
+ Message.from_dict(
+ {
+ "id": message.id,
+ "error": {
+ "code": -32603,
+ "message": "Internal error",
+ },
+ }
+ )
+ )
+ elif message.message_type == MessageType.RESP:
+ future = self.inflight.pop(message.id, None)
+ if not future:
+ continue
+ future.set_result(message.result)
+ elif message.message_type == MessageType.NOTE:
+ fn = self.handlers.get(message.method)
+ if fn:
+ fn(**message.params)
+ else:
+ logger.info("no handler for %s", message.method)
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ if self._handler:
+ self._handler.cancel()
+ try:
+ await self._handler
+ except asyncio.CancelledError:
+ pass
+ self._handler = None
+
+ async def request(self, name: str, params: dict):
+ self.seq += 1
+ msg = Message.from_dict(
+ {
+ "id": self.seq,
+ "method": name,
+ "params": params,
+ }
+ )
+ resp_future = asyncio.get_running_loop().create_future()
+ self.inflight[self.seq] = resp_future
+ self.transport.write(msg)
+ return await resp_future
+
+ def event(self, name: str, params: dict):
+ msg = Message.from_dict(
+ {
+ "method": name,
+ "params": params,
+ }
+ )
+ self.transport.write(msg)
+
+ @classmethod
+ async def acceptClient(
+ cls,
+ reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter,
+ ):
+ try:
+ with Transport(reader, writer) as client:
+ server = cls(client)
+ await server.run()
+ except:
+ logger.exception("mcp client failed", exc_info=True)
>From d4e3bcdbcac6dd365660fa874b7090a4bf81f251 Mon Sep 17 00:00:00 2001
From: John Harrison <harjohn at google.com>
Date: Wed, 24 Sep 2025 17:14:04 -0700
Subject: [PATCH 2/3] Cleanups.
---
lldb/examples/mcp/README.md | 4 ++--
lldb/examples/mcp/server.py | 9 ++++-----
2 files changed, 6 insertions(+), 7 deletions(-)
diff --git a/lldb/examples/mcp/README.md b/lldb/examples/mcp/README.md
index 7296fc75db3b4..a2cea30dd4ef8 100644
--- a/lldb/examples/mcp/README.md
+++ b/lldb/examples/mcp/README.md
@@ -9,8 +9,8 @@ To load the backport use:
(lldb) start_mcp
```
-Then you can use the `lldb-mcp` script in this directory to launch a client for
-the running server.
+Then you can use the `./lldb-mcp` script in this directory to launch a client
+for the running server.
For example,
diff --git a/lldb/examples/mcp/server.py b/lldb/examples/mcp/server.py
index 59d55ac10ec9f..af05800a87e8f 100644
--- a/lldb/examples/mcp/server.py
+++ b/lldb/examples/mcp/server.py
@@ -126,9 +126,6 @@ def __init__(
super().__init__(transport)
self.tools = {tool.name: tool for tool in tools}
- def __del__(self):
- print("deleting MCPServer....")
-
@protocol.initialize.handler()
async def initialize(
self, **params: protocol.InitializeParams
@@ -192,8 +189,10 @@ async def run(opts: argparse.Namespace, notify: Optional[queue.Queue] = None):
await server.serve_forever()
-# A registration count, if this module is loaded for multiple debugger then we
-# should only stop the global server if all registrations have been removed
+# A registration count, if this module is loaded for multiple lldb.SBDebugger
+# instances then we should only stop the global server if all registrations have
+# been removed. This could happen with lldb-rpc-server or lldb-dap in server
+# mode.
registration_count = 0
>From 1213c6c80f3ff0c844335f3de61716bec73bff6e Mon Sep 17 00:00:00 2001
From: John Harrison <harjohn at google.com>
Date: Wed, 24 Sep 2025 17:20:13 -0700
Subject: [PATCH 3/3] Adding a development section to the readme.
---
lldb/examples/mcp/README.md | 20 ++++++++++++++++++++
1 file changed, 20 insertions(+)
diff --git a/lldb/examples/mcp/README.md b/lldb/examples/mcp/README.md
index a2cea30dd4ef8..2b8977c95951e 100644
--- a/lldb/examples/mcp/README.md
+++ b/lldb/examples/mcp/README.md
@@ -24,3 +24,23 @@ For example,
}
}
```
+
+## Development
+
+For getting started with making changes to this backport, use the
+[MCP Inspector](https://modelcontextprotocol.io/docs/tools/inspector) to run the
+binary.
+
+In one terminal, start the lldb server:
+
+```
+$ lldb
+(lldb) command script import --allow-reload server.py
+(lldb) start_mcp --log-file=/tmp/lldb-mcp-server.log
+```
+
+Then launch the inspector to run specific operations.
+
+```sh
+$ npx @modelcontextprotocol/inspector ./lldb-mcp --log-file=/tmp/lldb-mcp.log
+```
More information about the lldb-commits
mailing list