[llvm] [llvm][utils] Add a script to use PRs over pushing to main (PR #166473)

Aiden Grossman via llvm-commits llvm-commits at lists.llvm.org
Wed Nov 19 11:06:20 PST 2025


================
@@ -0,0 +1,696 @@
+#!/usr/bin/env python3
+"""A script to automate the creation and landing of a stack of Pull Requests."""
+
+import argparse
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+import urllib.error
+import urllib.request
+
+from typing import List, Optional
+from http.client import HTTPResponse
+from dataclasses import dataclass
+
+# --- Constants --- #
+# NOTE(review): the three names below are not referenced in the visible part
+# of this patch; presumably they are the default base branch and git remote
+# names -- confirm against the argument-parsing code.
+BASE_BRANCH = "main"
+GITHUB_REMOTE_NAME = "origin"
+UPSTREAM_REMOTE_NAME = "upstream"
+
+# Name of the environment variable through which the GitHub token is handed
+# to git's askpass command (see LLVMPRAutomator._get_git_env).
+LLVM_GITHUB_TOKEN_VAR = "LLVM_GITHUB_TOKEN"
+# "owner/name" slug interpolated into the /repos/... API endpoints.
+LLVM_REPO = "llvm/llvm-project"
+# Base URL that GitHubAPI._request prepends to every endpoint.
+GITHUB_API = "https://api.github.com"
+
+# Number of attempts GitHubAPI.merge_pr makes before giving up.
+MERGE_MAX_RETRIES = 10
+MERGE_RETRY_DELAY = 5  # seconds
+REQUEST_TIMEOUT = 30  # seconds
+
+
+class LlvmPrError(Exception):
+    """Custom exception for errors in the PR automator script.
+
+    Raised in this file when a command's executable cannot be found, when a
+    PR number cannot be parsed from a URL, and when a PR cannot be merged.
+    """
+
+
+@dataclass
+class PRAutomatorConfig:
+    """Configuration Data.
+
+    Every field is declared without a default value.
+    """
+
+    # NOTE(review): presumably the login of the authenticated GitHub user
+    # (cf. GitHubAPI.get_user_login) -- confirm against the caller.
+    user_login: str
+    # GitHub token; exported to git through the environment in
+    # LLVMPRAutomator._get_git_env.
+    token: str
+    # NOTE(review): the remaining fields are not read in the visible part of
+    # the patch; their meaning is inferred from their names only.
+    base_branch: str
+    upstream_remote: str
+    prefix: str
+    draft: bool
+    no_merge: bool
+    auto_merge: bool
+
+
+class CommandRunner:
+    """Handles command execution and output.
+    Supports dry runs and verbosity level."""
+
+    def __init__(
+        self, dry_run: bool = False, verbose: bool = False, quiet: bool = False
+    ):
+        self.dry_run = dry_run
+        self.verbose = verbose
+        self.quiet = quiet
+
+    def print(self, message: str, file=sys.stdout) -> None:
+        if self.quiet and file == sys.stdout:
+            return
+        print(message, file=file)
+
+    def verbose_print(self, message: str, file=sys.stdout) -> None:
+        if self.verbose:
+            print(message, file)
+
+    def run_command(
+        self,
+        command: List[str],
+        check: bool = True,
+        capture_output: bool = False,
+        text: bool = False,
+        stdin_input: Optional[str] = None,
+        read_only: bool = False,
+        env: Optional[dict] = None,
+    ) -> subprocess.CompletedProcess:
+        if self.dry_run and not read_only:
+            self.print(f"[Dry Run] Would run: {' '.join(command)}")
+            return subprocess.CompletedProcess(command, 0, "", "")
+
+        self.verbose_print(f"Running: {' '.join(command)}")
+
+        try:
+            return subprocess.run(
+                command,
+                check=check,
+                capture_output=capture_output,
+                text=text,
+                input=stdin_input,
+                env=env,
+            )
+        except FileNotFoundError as e:
+            raise LlvmPrError(
+                f"Command '{command[0]}' not found. Is it installed and in your PATH?"
+            ) from e
+        except subprocess.CalledProcessError as e:
+            self.print(f"Error running command: {' '.join(command)}", file=sys.stderr)
+            if e.stdout:
+                self.print(f"--- stdout ---\n{e.stdout}", file=sys.stderr)
+            if e.stderr:
+                self.print(f"--- stderr ---\n{e.stderr}", file=sys.stderr)
+            raise e
+
+
+class GitHubAPI:
+    """A wrapper for the GitHub API."""
+
+    def __init__(self, runner: CommandRunner, token: str):
+        self.runner = runner
+        self.headers = {
+            "Authorization": f"token {token}",
+            "Accept": "application/vnd.github.v3+json",
+            "User-Agent": "llvm-push-pr",
+        }
+        self.opener = urllib.request.build_opener(
+            urllib.request.HTTPHandler(), urllib.request.HTTPSHandler()
+        )
+
+    def _request(
+        self, method: str, endpoint: str, json_payload: Optional[dict] = None
+    ) -> HTTPResponse:
+        url = f"{GITHUB_API}{endpoint}"
+        self.runner.verbose_print(f"API Request: {method.upper()} {url}")
+        if json_payload:
+            self.runner.verbose_print(f"Payload: {json_payload}")
+
+        data = None
+        headers = self.headers.copy()
+        if json_payload:
+            data = json.dumps(json_payload).encode("utf-8")
+            headers["Content-Type"] = "application/json"
+
+        req = urllib.request.Request(
+            url, data=data, headers=headers, method=method.upper()
+        )
+
+        try:
+            return self.opener.open(req, timeout=REQUEST_TIMEOUT)
+        except urllib.error.HTTPError as e:
+            self.runner.print(
+                f"Error making API request to {url}: {e}", file=sys.stderr
+            )
+            self.runner.verbose_print(
+                f"Error response body: {e.read().decode()}", file=sys.stderr
+            )
+            raise e
+
+    def _request_and_parse_json(
+        self, method: str, endpoint: str, json_payload: Optional[dict] = None
+    ) -> dict:
+        with self._request(method, endpoint, json_payload) as response:
+            # Expect a 200 'OK' or 201 'Created' status on success and JSON body.
+            self._log_unexpected_status([200, 201], response.status)
+
+            response_text = response.read().decode("utf-8")
+            if response_text:
+                return json.loads(response_text)
+            return {}
+
+    def _request_no_content(
+        self, method: str, endpoint: str, json_payload: Optional[dict] = None
+    ) -> None:
+        with self._request(method, endpoint, json_payload) as response:
+            # Expected a 204 No Content status on success, indicating the
+            # operation was successful but there is no body.
+            self._log_unexpected_status([204], response.status)
+
+    def _log_unexpected_status(
+        self, expected_statuses: List[int], actual_status: int
+    ) -> None:
+        if actual_status not in expected_statuses:
+            self.runner.print(
+                f"Warning: Expected status {', '.join(map(str, expected_statuses))}, but got {actual_status}",
+                file=sys.stderr,
+            )
+
+    def get_user_login(self) -> str:
+        return self._request_and_parse_json("GET", "/user")["login"]
+
+    def create_pr(
+        self,
+        head_branch: str,
+        base_branch: str,
+        title: str,
+        body: str,
+        draft: bool,
+    ) -> Optional[str]:
+        self.runner.print(f"Creating pull request for '{head_branch}'...")
+        data = {
+            "title": title,
+            "body": body,
+            "head": head_branch,
+            "base": base_branch,
+            "draft": draft,
+        }
+        response_data = self._request_and_parse_json(
+            "POST", f"/repos/{LLVM_REPO}/pulls", json_payload=data
+        )
+        pr_url = response_data.get("html_url")
+        if not self.runner.dry_run:
+            self.runner.print(f"Pull request created: {pr_url}")
+        return pr_url
+
+    def get_repo_settings(self) -> dict:
+        return self._request_and_parse_json("GET", f"/repos/{LLVM_REPO}")
+
+    def _get_pr_details(self, pr_number: str) -> dict:
+        """Fetches the JSON details for a given pull request number."""
+        return self._request_and_parse_json(
+            "GET", f"/repos/{LLVM_REPO}/pulls/{pr_number}"
+        )
+
+    def _attempt_squash_merge(self, pr_number: str) -> bool:
+        """Attempts to squash merge a PR, returning True on success."""
+        try:
+            self._request_and_parse_json(
+                "PUT",
+                f"/repos/{LLVM_REPO}/pulls/{pr_number}/merge",
+                json_payload={"merge_method": "squash"},
+            )
+            return True
+        except urllib.error.HTTPError as e:
+            # A 405 status code means the PR is not in a mergeable state.
+            if e.code == 405:
+                return False
+            # Re-raise other HTTP errors.
+            raise e
+
+    def merge_pr(self, pr_url: str) -> Optional[str]:
+        if not pr_url:
+            return None
+
+        if self.runner.dry_run:
+            self.runner.print(f"[Dry Run] Would merge {pr_url}")
+            return None
+
+        pr_number_match = re.search(r"/pull/(\d+)", pr_url)
+        if not pr_number_match:
+            raise LlvmPrError(f"Could not extract PR number from URL: {pr_url}")
+        pr_number = pr_number_match.group(1)
+
+        for i in range(MERGE_MAX_RETRIES):
+            self.runner.print(
+                f"Attempting to merge {pr_url} (attempt {i + 1}/{MERGE_MAX_RETRIES})..."
+            )
+
+            pr_data = self._get_pr_details(pr_number)
+            head_branch = pr_data["head"]["ref"]
+
+            if pr_data.get("mergeable_state") == "dirty":
+                raise LlvmPrError("Merge conflict.")
+
+            if pr_data.get("mergeable"):
+                if self._attempt_squash_merge(pr_number):
+                    self.runner.print("Successfully merged.")
+                    time.sleep(2)  # Allow GitHub's API to update.
+                    return head_branch
+
+            self.runner.print(
+                f"PR not mergeable yet (state: {pr_data.get('mergeable_state', 'unknown')}). Retrying in {MERGE_RETRY_DELAY} seconds..."
+            )
+            time.sleep(MERGE_RETRY_DELAY)
+
+        raise LlvmPrError(f"PR was not mergeable after {MERGE_MAX_RETRIES} attempts.")
+
+    def enable_auto_merge(self, pr_url: str) -> None:
+        if not pr_url:
+            return
+
+        if self.runner.dry_run:
+            self.runner.print(f"[Dry Run] Would enable auto-merge for {pr_url}")
+            return
+
+        pr_number_match = re.search(r"/pull/(\d+)", pr_url)
+        if not pr_number_match:
+            raise LlvmPrError(f"Could not extract PR number from URL: {pr_url}")
+        pr_number = pr_number_match.group(1)
+
+        self.runner.print(f"Enabling auto-merge for {pr_url}...")
+        data = {
+            "enabled": True,
+            "merge_method": "squash",
+        }
+        self._request_no_content(
+            "PUT",
+            f"/repos/{LLVM_REPO}/pulls/{pr_number}/auto-merge",
+            json_payload=data,
+        )
+        self.runner.print("Auto-merge enabled.")
+
+    def delete_branch(
+        self, branch_name: str, default_branch: Optional[str] = None
+    ) -> None:
+        if default_branch and branch_name == default_branch:
+            self.runner.print(
+                f"Error: Refusing to delete the default branch '{branch_name}'.",
+                file=sys.stderr,
+            )
+            return
+        try:
+            self._request_no_content(
+                "DELETE", f"/repos/{LLVM_REPO}/git/refs/heads/{branch_name}"
+            )
+        except urllib.error.HTTPError as e:
+            if e.code == 422:
+                self.runner.print(
+                    f"Warning: Remote branch '{branch_name}' was already deleted, skipping deletion.",
+                    file=sys.stderr,
+                )
+            else:
+                raise e
+
+
+class LLVMPRAutomator:
+    """Automates the process of creating and landing a stack of GitHub Pull Requests."""
+
+    def __init__(
+        self,
+        runner: CommandRunner,
+        github_api: "GitHubAPI",
+        config: "PRAutomatorConfig",
+        remote: str,
+    ):
+        """Stores the collaborators and initialises empty bookkeeping state."""
+        self.runner = runner
+        self.github_api = github_api
+        self.config = config
+        self.remote = remote
+        self.original_branch: str = ""
+        self.created_branches: List[str] = []
+        self.repo_settings: dict = {}
+        # Command that prints the token read from the LLVM_GITHUB_TOKEN_VAR
+        # environment variable; installed as GIT_ASKPASS by _get_git_env.
+        # NOTE(review): this is a command line with arguments rather than a
+        # program path -- confirm that git accepts that form for GIT_ASKPASS.
+        self._git_askpass_cmd = (
+            f"python3 -c \"import os; print(os.environ['{LLVM_GITHUB_TOKEN_VAR}'])\""
+        )
+
+    def _get_git_env(self) -> dict:
+        """Returns a copy of the process environment prepared for git.
+
+        The copy sets GIT_ASKPASS to the token-printing command, exports the
+        configured token under LLVM_GITHUB_TOKEN_VAR, and sets
+        GIT_TERMINAL_PROMPT to "0". os.environ itself is not modified.
+        """
+        git_env = os.environ.copy()
+        git_env["GIT_ASKPASS"] = self._git_askpass_cmd
+        git_env[LLVM_GITHUB_TOKEN_VAR] = self.config.token
+        git_env["GIT_TERMINAL_PROMPT"] = "0"
+        return git_env
+
+    def _run_cmd(
----------------
boomanaiden154 wrote:

Is an extra function here really necessary?

https://github.com/llvm/llvm-project/pull/166473


More information about the llvm-commits mailing list