[llvm] [llvm][utils] Add a script to use PRs over pushing to main (PR #166473)

Paul Kirth via llvm-commits llvm-commits at lists.llvm.org
Tue Nov 18 11:05:27 PST 2025


https://github.com/ilovepi updated https://github.com/llvm/llvm-project/pull/166473

>From 5f6f156b268837bea0a7d69649f711967713e5e6 Mon Sep 17 00:00:00 2001
From: Paul Kirth <paulkirth at google.com>
Date: Tue, 4 Nov 2025 15:55:32 -0800
Subject: [PATCH 1/9] [llvm][utils] Add a script to use PRs over pushing to
 main

As discussed on Discourse:
https://discourse.llvm.org/t/rfc-require-pull-requests-for-all-llvm-project-commits/88164

This is an attempt at a script to automatically make and land PRs.

It creates a branch (or a series of branches for stacks), and lands them
one by one, without waiting for review or checks to pass.

It supports --auto-merge, for single PRs. For stacks, use Graphite or
some other tool.

It can work with GitHub tokens for https or ssh keys.

Example:
```console
GITHUB_TOKEN=$(gh auth token) python3 llvm_push_pr.py --upstream-remote origin
```
---
 llvm/utils/llvm_push_pr.py | 653 +++++++++++++++++++++++++++++++++++++
 1 file changed, 653 insertions(+)
 create mode 100644 llvm/utils/llvm_push_pr.py

diff --git a/llvm/utils/llvm_push_pr.py b/llvm/utils/llvm_push_pr.py
new file mode 100644
index 0000000000000..260dd8c03970e
--- /dev/null
+++ b/llvm/utils/llvm_push_pr.py
@@ -0,0 +1,653 @@
+#!/usr/bin/env python3
+"""A script to automate the creation and landing of a stack of Pull Requests."""
+
+import argparse
+import os
+import re
+import subprocess
+import sys
+import time
+from typing import List, Optional
+
+import requests
+
+
+class Printer:
+    """Handles all output and command execution, with options for dry runs and verbosity."""
+
+    def __init__(
+        self, dry_run: bool = False, verbose: bool = False, quiet: bool = False
+    ):
+        """Initializes the Printer with dry_run, verbose, and quiet settings."""
+        self.dry_run = dry_run
+        self.verbose = verbose
+        self.quiet = quiet
+
+    def print(self, message: str, file=sys.stdout):
+        """Prints a message to the specified file, respecting quiet mode."""
+        if self.quiet and file == sys.stdout:
+            return
+        print(message, file=file)
+
+    def run_command(
+        self,
+        command: List[str],
+        check: bool = True,
+        capture_output: bool = False,
+        text: bool = False,
+        stdin_input: Optional[str] = None,
+        read_only: bool = False,
+    ) -> subprocess.CompletedProcess:
+        """Runs a shell command, handling dry runs, verbosity, and errors."""
+        if self.dry_run and not read_only:
+            self.print(f"[Dry Run] Would run: {' '.join(command)}")
+            return subprocess.CompletedProcess(command, 0, "", "")
+
+        if self.verbose:
+            self.print(f"Running: {' '.join(command)}")
+
+        try:
+            return subprocess.run(
+                command,
+                check=check,
+                capture_output=capture_output,
+                text=text,
+                input=stdin_input,
+            )
+        except FileNotFoundError:
+            self.print(
+                f"Error: Command '{command[0]}' not found. Is it installed and in your PATH?",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+        except subprocess.CalledProcessError as e:
+            if check:
+                self.print(
+                    f"Error running command: {' '.join(command)}", file=sys.stderr
+                )
+                if e.stdout:
+                    self.print(f"--- stdout ---\n{e.stdout}", file=sys.stderr)
+                if e.stderr:
+                    self.print(f"--- stderr ---\n{e.stderr}", file=sys.stderr)
+                raise e
+            return e
+
+
+class GitHubAPI:
+    """A wrapper for the GitHub API."""
+
+    BASE_URL = "https://api.github.com"
+
+    def __init__(self, repo_slug: str, printer: Printer, token: str):
+        self.repo_slug = repo_slug
+        self.printer = printer
+        self.headers = {
+            "Authorization": f"token {token}",
+            "Accept": "application/vnd.github.v3+json",
+        }
+
+    def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
+        url = f"{self.BASE_URL}{endpoint}"
+        if self.printer.verbose:
+            self.printer.print(f"API Request: {method.upper()} {url}")
+            if "json" in kwargs:
+                self.printer.print(f"Payload: {kwargs['json']}")
+
+        try:
+            response = requests.request(
+                method, url, headers=self.headers, timeout=30, **kwargs
+            )
+            response.raise_for_status()
+            return response
+        except requests.exceptions.RequestException as e:
+            self.printer.print(
+                f"Error making API request to {url}: {e}", file=sys.stderr
+            )
+            if e.response is not None:
+                self.printer.print(f"Response: {e.response.text}", file=sys.stderr)
+            raise
+
+    def get_user_login(self) -> str:
+        """Gets the current user's login name."""
+        response = self._request("get", "/user")
+        return response.json()["login"]
+
+    def create_pr(
+        self,
+        head_branch: str,
+        base_branch: str,
+        title: str,
+        body: str,
+        draft: bool,
+    ) -> Optional[str]:
+        """Creates a GitHub Pull Request."""
+        self.printer.print(f"Creating pull request for '{head_branch}'...")
+        data = {
+            "title": title,
+            "body": body,
+            "head": head_branch,
+            "base": base_branch,
+            "draft": draft,
+        }
+        response = self._request("post", f"/repos/{self.repo_slug}/pulls", json=data)
+        pr_url = response.json().get("html_url")
+        if not self.printer.dry_run:
+            self.printer.print(f"Pull request created: {pr_url}")
+        return pr_url
+
+    def get_repo_settings(self) -> dict:
+        """Gets repository settings."""
+        response = self._request("get", f"/repos/{self.repo_slug}")
+        return response.json()
+
+    def merge_pr(self, pr_url: str):
+        """Merges a PR, retrying if it's not yet mergeable."""
+        if not pr_url:
+            return
+
+        if self.printer.dry_run:
+            self.printer.print(f"[Dry Run] Would merge {pr_url}")
+            return
+
+        pr_number_match = re.search(r"/pull/(\d+)", pr_url)
+        if not pr_number_match:
+            self.printer.print(
+                f"Could not extract PR number from URL: {pr_url}",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+        pr_number = pr_number_match.group(1)
+
+        head_branch = ""
+        max_retries = 10
+        retry_delay = 5  # seconds
+        for i in range(max_retries):
+            self.printer.print(
+                f"Attempting to merge {pr_url} (attempt {i+1}/{max_retries})..."
+            )
+
+            pr_data_response = self._request(
+                "get", f"/repos/{self.repo_slug}/pulls/{pr_number}"
+            )
+            pr_data = pr_data_response.json()
+            head_branch = pr_data["head"]["ref"]
+
+            if pr_data["mergeable"]:
+                merge_data = {
+                    "commit_title": f"{pr_data['title']} (#{pr_number})",
+                    "merge_method": "squash",
+                }
+                try:
+                    self._request(
+                        "put",
+                        f"/repos/{self.repo_slug}/pulls/{pr_number}/merge",
+                        json=merge_data,
+                    )
+                    self.printer.print("Successfully merged.")
+                    time.sleep(2)
+                    return head_branch
+                except requests.exceptions.RequestException as e:
+                    if e.response and e.response.status_code == 405:
+                        self.printer.print(
+                            "PR not mergeable yet. Retrying in "
+                            f"{retry_delay} seconds..."
+                        )
+                        time.sleep(retry_delay)
+                    else:
+                        raise e
+            elif pr_data["mergeable_state"] == "dirty":
+                self.printer.print("Error: Merge conflict.", file=sys.stderr)
+                sys.exit(1)
+            else:
+                self.printer.print(
+                    f"PR not mergeable yet ({pr_data['mergeable_state']}). "
+                    f"Retrying in {retry_delay} seconds..."
+                )
+                time.sleep(retry_delay)
+
+        self.printer.print(
+            f"Error: PR was not mergeable after {max_retries} attempts.",
+            file=sys.stderr,
+        )
+        sys.exit(1)
+
+    def enable_auto_merge(self, pr_url: str):
+        """Enables auto-merge for a pull request."""
+        if not pr_url:
+            return
+
+        if self.printer.dry_run:
+            self.printer.print(f"[Dry Run] Would enable auto-merge for {pr_url}")
+            return
+
+        pr_number_match = re.search(r"/pull/(\d+)", pr_url)
+        if not pr_number_match:
+            self.printer.print(
+                f"Could not extract PR number from URL: {pr_url}",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+        pr_number = pr_number_match.group(1)
+
+        self.printer.print(f"Enabling auto-merge for {pr_url}...")
+        data = {
+            "enabled": True,
+            "merge_method": "squash",
+        }
+        self._request(
+            "put",
+            f"/repos/{self.repo_slug}/pulls/{pr_number}/auto-merge",
+            json=data,
+        )
+        self.printer.print("Auto-merge enabled.")
+
+    def delete_branch(self, branch_name: str, default_branch: Optional[str] = None):
+        """Deletes a remote branch."""
+        if default_branch and branch_name == default_branch:
+            self.printer.print(
+                f"Error: Refusing to delete the default branch '{branch_name}'.",
+                file=sys.stderr,
+            )
+            return
+        self.printer.print(f"Deleting remote branch '{branch_name}'")
+        try:
+            self._request(
+                "delete", f"/repos/{self.repo_slug}/git/refs/heads/{branch_name}"
+            )
+        except requests.exceptions.RequestException as e:
+            if (
+                e.response is not None
+                and e.response.status_code == 422
+                and "Reference does not exist" in e.response.text
+            ):
+                if self.printer.verbose:
+                    self.printer.print(
+                        f"Warning: Remote branch '{branch_name}' was already deleted, skipping deletion.",
+                        file=sys.stderr,
+                    )
+                return
+            self.printer.print(
+                f"Could not delete remote branch '{branch_name}': {e}",
+                file=sys.stderr,
+            )
+            raise
+
+
+class LLVMPRAutomator:
+    """Automates the process of creating and landing a stack of GitHub Pull Requests."""
+
+    def __init__(
+        self,
+        args: argparse.Namespace,
+        printer: Printer,
+        github_api: "GitHubAPI",
+    ):
+        self.args = args
+        self.printer = printer
+        self.github_api = github_api
+        self.original_branch: str = ""
+        self.repo_slug: str = ""
+        self.created_branches: List[str] = []
+        self.repo_settings: dict = {}
+
+    def _run_cmd(self, command: List[str], read_only: bool = False, **kwargs):
+        """Wrapper for run_command that passes the dry_run flag."""
+        return self.printer.run_command(command, read_only=read_only, **kwargs)
+
+    def _get_repo_slug(self) -> str:
+        """Gets the GitHub repository slug from the remote URL."""
+        result = self._run_cmd(
+            ["git", "remote", "get-url", self.args.remote],
+            capture_output=True,
+            text=True,
+            read_only=True,
+        )
+        url = result.stdout.strip()
+        match = re.search(r"github\.com[/:]([\w.-]+/[\w.-]+)", url)
+        if not match:
+            self.printer.print(
+                f"Error: Could not parse repository slug from remote URL: {url}",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+        return match.group(1).replace(".git", "")
+
+    def _get_current_branch(self) -> str:
+        """Gets the current git branch."""
+        result = self._run_cmd(
+            ["git", "rev-parse", "--abbrev-ref", "HEAD"],
+            capture_output=True,
+            text=True,
+            read_only=True,
+        )
+        return result.stdout.strip()
+
+    def _check_work_tree_is_clean(self):
+        """Exits if the git work tree has uncommitted or unstaged changes."""
+        result = self._run_cmd(
+            ["git", "status", "--porcelain"],
+            capture_output=True,
+            text=True,
+            read_only=True,
+        )
+        if result.stdout.strip():
+            self.printer.print(
+                "Error: Your working tree is dirty. Please stash or commit your changes.",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+
+    def _rebase_current_branch(self):
+        """Rebases the current branch on top of the upstream base."""
+        self._check_work_tree_is_clean()
+
+        target = f"{self.args.upstream_remote}/{self.args.base}"
+        self.printer.print(
+            f"Fetching from '{self.args.upstream_remote}' and rebasing '{self.original_branch}' on top of '{target}'..."
+        )
+        self._run_cmd(["git", "fetch", self.args.upstream_remote, self.args.base])
+
+        try:
+            self._run_cmd(["git", "rebase", target])
+        except subprocess.CalledProcessError as e:
+            self.printer.print(
+                "Error: The rebase operation failed, likely due to a merge conflict.",
+                file=sys.stderr,
+            )
+            if e.stdout:
+                self.printer.print(f"--- stdout ---\n{e.stdout}", file=sys.stderr)
+            if e.stderr:
+                self.printer.print(f"--- stderr ---\n{e.stderr}", file=sys.stderr)
+
+            # Check if rebase is in progress before aborting
+            rebase_status_result = self._run_cmd(
+                ["git", "status", "--verify-status=REBASE_HEAD"],
+                check=False,
+                capture_output=True,
+                text=True,
+                read_only=True,
+            )
+            if (
+                rebase_status_result.returncode == 0
+            ):  # REBASE_HEAD exists, so rebase is in progress
+                self.printer.print("Aborting rebase...", file=sys.stderr)
+                self._run_cmd(["git", "rebase", "--abort"], check=False)
+            sys.exit(1)
+
+    def _get_commit_stack(self) -> List[str]:
+        """Gets the stack of commits between the current branch's HEAD and its merge base with upstream."""
+        target = f"{self.args.upstream_remote}/{self.args.base}"
+        merge_base_result = self._run_cmd(
+            ["git", "merge-base", "HEAD", target],
+            capture_output=True,
+            text=True,
+            read_only=True,
+        )
+        merge_base = merge_base_result.stdout.strip()
+        if not merge_base:
+            self.printer.print(
+                f"Error: Could not find a merge base between HEAD and {target}.",
+                file=sys.stderr,
+            )
+            sys.exit(1)
+
+        result = self._run_cmd(
+            ["git", "rev-list", "--reverse", f"{merge_base}..HEAD"],
+            capture_output=True,
+            text=True,
+            read_only=True,
+        )
+        commits = result.stdout.strip().split("\n")
+        return [c for c in commits if c]
+
+    def _get_commit_details(self, commit_hash: str) -> tuple[str, str]:
+        """Gets the title and body of a commit."""
+        result = self._run_cmd(
+            ["git", "show", "-s", "--format=%s%n%n%b", commit_hash],
+            capture_output=True,
+            text=True,
+            read_only=True,
+        )
+        parts = result.stdout.strip().split("\n\n", 1)
+        title = parts[0]
+        body = parts[1] if len(parts) > 1 else ""
+        return title, body
+
+    def _sanitize_for_branch_name(self, text: str) -> str:
+        """Sanitizes a string to be used as a git branch name."""
+        sanitized = re.sub(r"[^\w\s-]", "", text).strip().lower()
+        sanitized = re.sub(r"[-\s]+", "-", sanitized)
+        # Use "auto-pr" as a fallback.
+        return sanitized or "auto-pr"
+
+    def _create_and_push_branch_for_commit(
+        self, commit_hash: str, base_branch_name: str, index: int
+    ) -> str:
+        """Creates and pushes a temporary branch pointing to a specific commit."""
+        branch_name = f"{self.args.prefix}{base_branch_name}-{index + 1}"
+        commit_title, _ = self._get_commit_details(commit_hash)
+        self.printer.print(f"Processing commit {commit_hash[:7]}: {commit_title}")
+        self.printer.print(f"Creating and pushing temporary branch '{branch_name}'")
+
+        self._run_cmd(["git", "branch", "-f", branch_name, commit_hash])
+        push_command = ["git", "push", self.args.remote, branch_name]
+        self._run_cmd(push_command)
+        self.created_branches.append(branch_name)
+        return branch_name
+
+    def run(self):
+        """Main entry point for the automator, orchestrates the PR creation and merging process."""
+        self.repo_slug = self._get_repo_slug()
+        self.repo_settings = self.github_api.get_repo_settings()
+        self.original_branch = self._get_current_branch()
+        self.printer.print(f"On branch: {self.original_branch}")
+
+        try:
+            self._rebase_current_branch()
+            initial_commits = self._get_commit_stack()
+
+            if not initial_commits:
+                self.printer.print("No new commits to process.")
+                return
+
+            if self.args.auto_merge and len(initial_commits) > 1:
+                self.printer.print(
+                    "Error: --auto-merge is only supported for a single commit.",
+                    file=sys.stderr,
+                )
+                sys.exit(1)
+
+            if self.args.no_merge and len(initial_commits) > 1:
+                self.printer.print(
+                    "Error: --no-merge is only supported for a single commit. "
+                    "For stacks, the script must merge sequentially.",
+                    file=sys.stderr,
+                )
+                sys.exit(1)
+
+            self.printer.print(f"Found {len(initial_commits)} commit(s) to process.")
+            branch_base_name = self.original_branch
+            if self.original_branch in ["main", "master"]:
+                first_commit_title, _ = self._get_commit_details(initial_commits[0])
+                branch_base_name = self._sanitize_for_branch_name(first_commit_title)
+
+            for i in range(len(initial_commits)):
+                if i > 0:
+                    self._rebase_current_branch()
+
+                commits = self._get_commit_stack()
+                if not commits:
+                    self.printer.print("Success! All commits have been landed.")
+                    break
+
+                commit_to_process = commits[0]
+                commit_title, commit_body = self._get_commit_details(commit_to_process)
+
+                temp_branch = self._create_and_push_branch_for_commit(
+                    commit_to_process, branch_base_name, i
+                )
+                pr_url = self.github_api.create_pr(
+                    head_branch=temp_branch,
+                    base_branch=self.args.base,
+                    title=commit_title,
+                    body=commit_body,
+                    draft=self.args.draft,
+                )
+
+                if not self.args.no_merge:
+                    if self.args.auto_merge:
+                        self.github_api.enable_auto_merge(pr_url)
+                    else:
+                        merged_branch = self.github_api.merge_pr(pr_url)
+                        if merged_branch and not self.repo_settings.get(
+                            "delete_branch_on_merge"
+                        ):
+                            self.github_api.delete_branch(
+                                merged_branch, self.repo_settings.get("default_branch")
+                            )
+
+                    if temp_branch in self.created_branches:
+                        self.created_branches.remove(temp_branch)
+
+        finally:
+            self._cleanup()
+
+    def _cleanup(self):
+        """Cleans up by returning to the original branch and deleting all temporary branches."""
+        self.printer.print(f"Returning to original branch: {self.original_branch}")
+        self._run_cmd(["git", "checkout", self.original_branch], capture_output=True)
+        if self.created_branches:
+            self.printer.print("Cleaning up temporary local branches...")
+            self._run_cmd(["git", "branch", "-D"] + self.created_branches)
+            self.printer.print("Cleaning up temporary remote branches...")
+            self._run_cmd(
+                ["git", "push", self.args.remote, "--delete"] + self.created_branches,
+                check=False,
+            )
+
+
+def check_prerequisites(printer: Printer):
+    """Checks if git is installed and if inside a git repository."""
+    printer.print("Checking prerequisites...")
+    printer.run_command(["git", "--version"], capture_output=True, read_only=True)
+    if not os.getenv("GITHUB_TOKEN"):
+        printer.print(
+            "Error: GITHUB_TOKEN environment variable not set.", file=sys.stderr
+        )
+        sys.exit(1)
+
+    result = printer.run_command(
+        ["git", "rev-parse", "--is-inside-work-tree"],
+        check=False,
+        capture_output=True,
+        text=True,
+        read_only=True,
+    )
+    if result.returncode != 0 or result.stdout.strip() != "true":
+        printer.print(
+            "Error: This script must be run inside a git repository.", file=sys.stderr
+        )
+        sys.exit(1)
+    printer.print("Prerequisites met.")
+
+
+def main():
+    """main entry point"""
+    parser = argparse.ArgumentParser(
+        description="Create and land a stack of Pull Requests."
+    )
+    GITHUB_REMOTE_NAME = "origin"
+    UPSTREAM_REMOTE_NAME = "upstream"
+    BASE_BRANCH = "main"
+
+    printer = Printer()
+    token = os.getenv("GITHUB_TOKEN")
+    default_prefix = "dev/"
+    if token:
+        # Create a temporary API client to get the user login
+        # We don't know the repo slug yet, so pass a dummy value.
+        temp_api = GitHubAPI("", printer, token)
+        try:
+            user_login = temp_api.get_user_login()
+            default_prefix = f"{user_login}/"
+        except requests.exceptions.RequestException as e:
+            printer.print(
+                f"Could not fetch user login from GitHub: {e}", file=sys.stderr
+            )
+
+    parser.add_argument(
+        "--base",
+        default=BASE_BRANCH,
+        help=f"Base branch to target (default: {BASE_BRANCH})",
+    )
+    parser.add_argument(
+        "--remote",
+        default=GITHUB_REMOTE_NAME,
+        help=f"Remote for your fork to push to (default: {GITHUB_REMOTE_NAME})",
+    )
+    parser.add_argument(
+        "--upstream-remote",
+        default=UPSTREAM_REMOTE_NAME,
+        help=f"Remote for the upstream repository (default: {UPSTREAM_REMOTE_NAME})",
+    )
+    parser.add_argument(
+        "--prefix",
+        default=default_prefix,
+        help=f"Prefix for temporary branches (default: {default_prefix})",
+    )
+    parser.add_argument(
+        "--draft", action="store_true", help="Create pull requests as drafts."
+    )
+    parser.add_argument(
+        "--no-merge", action="store_true", help="Create PRs but do not merge them."
+    )
+    parser.add_argument(
+        "--auto-merge",
+        action="store_true",
+        help="Enable auto-merge for each PR instead of attempting to merge immediately.",
+    )
+    parser.add_argument(
+        "--dry-run", action="store_true", help="Print commands without executing them."
+    )
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument(
+        "-v", "--verbose", action="store_true", help="Print all commands being run."
+    )
+    group.add_argument(
+        "-q",
+        "--quiet",
+        action="store_true",
+        help="Print only essential output and errors.",
+    )
+
+    args = parser.parse_args()
+    if args.prefix and not args.prefix.endswith("/"):
+        args.prefix += "/"
+
+    printer = Printer(dry_run=args.dry_run, verbose=args.verbose, quiet=args.quiet)
+    check_prerequisites(printer)
+
+    # Get repo slug from git remote url
+    result = printer.run_command(
+        ["git", "remote", "get-url", args.remote],
+        capture_output=True,
+        text=True,
+        read_only=True,
+    )
+    url = result.stdout.strip()
+    match = re.search(r"github\.com[/:]([\w.-]+/[\w.-]+)", url)
+    if not match:
+        printer.print(
+            f"Error: Could not parse repository slug from remote URL: {url}",
+            file=sys.stderr,
+        )
+        sys.exit(1)
+    repo_slug = match.group(1).replace(".git", "")
+
+    github_api = GitHubAPI(repo_slug, printer, token)
+    automator = LLVMPRAutomator(args, printer, github_api)
+    automator.run()
+
+
+if __name__ == "__main__":
+    main()

>From 1bd011a2c4de23023b4bb9a6c5d5d674e7aee619 Mon Sep 17 00:00:00 2001
From: Paul Kirth <paulkirth at google.com>
Date: Thu, 13 Nov 2025 18:23:04 -0800
Subject: [PATCH 2/9] Rewrite script based on review feedback

After starting to address a few comments, I basically realized I needed
a new approach. I by and large kept the structure the same, but have
revised the library usage to just use things from the standard library.

I've slightly modified some of the initial classes to provide more
useful abstractions. There's probably room for more improvements, but
I wanted to keep this as simple as possible.
---
 llvm/utils/llvm_push_pr.py | 745 ++++++++++++++++++++-----------------
 1 file changed, 399 insertions(+), 346 deletions(-)

diff --git a/llvm/utils/llvm_push_pr.py b/llvm/utils/llvm_push_pr.py
index 260dd8c03970e..15c663318050a 100644
--- a/llvm/utils/llvm_push_pr.py
+++ b/llvm/utils/llvm_push_pr.py
@@ -2,33 +2,71 @@
 """A script to automate the creation and landing of a stack of Pull Requests."""
 
 import argparse
+import json
 import os
 import re
 import subprocess
 import sys
 import time
+import urllib.error
+import urllib.request
+
 from typing import List, Optional
+from http.client import HTTPResponse
+from dataclasses import dataclass
+
+# --- Constants --- #
+BASE_BRANCH = "main"
+GITHUB_REMOTE_NAME = "origin"
+UPSTREAM_REMOTE_NAME = "upstream"
+
+LLVM_GITHUB_TOKEN_VAR = "LLVM_GITHUB_TOKEN"
+LLVM_REPO = "llvm/llvm-project"
+GITHUB_API = "https://api.github.com"
+
+MERGE_MAX_RETRIES = 10
+MERGE_RETRY_DELAY = 5  # seconds
+REQUEST_TIMEOUT = 30  # seconds
+
+
+class LlvmPrError(Exception):
+    """Custom exception for errors in the PR automator script."""
+
+
+@dataclass
+class PRAutomatorConfig:
+    """Configuration Data."""
 
-import requests
+    user_login: str
+    token: str
+    base_branch: str
+    upstream_remote: str
+    prefix: str
+    draft: bool
+    no_merge: bool
+    auto_merge: bool
 
 
-class Printer:
-    """Handles all output and command execution, with options for dry runs and verbosity."""
+class CommandRunner:
+    """Handles command execution and output.
+    Supports dry runs and verbosity level."""
 
     def __init__(
         self, dry_run: bool = False, verbose: bool = False, quiet: bool = False
     ):
-        """Initializes the Printer with dry_run, verbose, and quiet settings."""
         self.dry_run = dry_run
         self.verbose = verbose
         self.quiet = quiet
 
-    def print(self, message: str, file=sys.stdout):
-        """Prints a message to the specified file, respecting quiet mode."""
+    def print(self, message: str, file=sys.stdout) -> None:
         if self.quiet and file == sys.stdout:
             return
         print(message, file=file)
 
+    def verbose_print(self, message: str, file=sys.stdout) -> None:
+        if self.verbose:
+            print(message, file)
+
     def run_command(
         self,
         command: List[str],
@@ -38,13 +76,11 @@ def run_command(
         stdin_input: Optional[str] = None,
         read_only: bool = False,
     ) -> subprocess.CompletedProcess:
-        """Runs a shell command, handling dry runs, verbosity, and errors."""
         if self.dry_run and not read_only:
             self.print(f"[Dry Run] Would run: {' '.join(command)}")
             return subprocess.CompletedProcess(command, 0, "", "")
 
-        if self.verbose:
-            self.print(f"Running: {' '.join(command)}")
+        self.verbose_print(f"Running: {' '.join(command)}")
 
         try:
             return subprocess.run(
@@ -54,63 +90,93 @@ def run_command(
                 text=text,
                 input=stdin_input,
             )
-        except FileNotFoundError:
-            self.print(
-                f"Error: Command '{command[0]}' not found. Is it installed and in your PATH?",
-                file=sys.stderr,
-            )
-            sys.exit(1)
+        except FileNotFoundError as e:
+            raise LlvmPrError(
+                f"Command '{command[0]}' not found. Is it installed and in your PATH?"
+            ) from e
         except subprocess.CalledProcessError as e:
-            if check:
-                self.print(
-                    f"Error running command: {' '.join(command)}", file=sys.stderr
-                )
-                if e.stdout:
-                    self.print(f"--- stdout ---\n{e.stdout}", file=sys.stderr)
-                if e.stderr:
-                    self.print(f"--- stderr ---\n{e.stderr}", file=sys.stderr)
-                raise e
-            return e
+            self.print(f"Error running command: {' '.join(command)}", file=sys.stderr)
+            if e.stdout:
+                self.print(f"--- stdout ---\n{e.stdout}", file=sys.stderr)
+            if e.stderr:
+                self.print(f"--- stderr ---\n{e.stderr}", file=sys.stderr)
+            raise e
 
 
 class GitHubAPI:
     """A wrapper for the GitHub API."""
 
-    BASE_URL = "https://api.github.com"
-
-    def __init__(self, repo_slug: str, printer: Printer, token: str):
-        self.repo_slug = repo_slug
-        self.printer = printer
+    def __init__(self, runner: CommandRunner, token: str):
+        self.runner = runner
         self.headers = {
             "Authorization": f"token {token}",
             "Accept": "application/vnd.github.v3+json",
+            "User-Agent": "llvm-push-pr",
         }
+        self.opener = urllib.request.build_opener(
+            urllib.request.HTTPHandler(), urllib.request.HTTPSHandler()
+        )
 
-    def _request(self, method: str, endpoint: str, **kwargs) -> requests.Response:
-        url = f"{self.BASE_URL}{endpoint}"
-        if self.printer.verbose:
-            self.printer.print(f"API Request: {method.upper()} {url}")
-            if "json" in kwargs:
-                self.printer.print(f"Payload: {kwargs['json']}")
+    def _request(
+        self, method: str, endpoint: str, json_payload: Optional[dict] = None
+    ) -> HTTPResponse:
+        url = f"{GITHUB_API}{endpoint}"
+        self.runner.verbose_print(f"API Request: {method.upper()} {url}")
+        if json_payload:
+            self.runner.verbose_print(f"Payload: {json_payload}")
+
+        data = None
+        headers = self.headers.copy()
+        if json_payload:
+            data = json.dumps(json_payload).encode("utf-8")
+            headers["Content-Type"] = "application/json"
+
+        req = urllib.request.Request(
+            url, data=data, headers=headers, method=method.upper()
+        )
 
         try:
-            response = requests.request(
-                method, url, headers=self.headers, timeout=30, **kwargs
-            )
-            response.raise_for_status()
-            return response
-        except requests.exceptions.RequestException as e:
-            self.printer.print(
+            return self.opener.open(req, timeout=REQUEST_TIMEOUT)
+        except urllib.error.HTTPError as e:
+            self.runner.print(
                 f"Error making API request to {url}: {e}", file=sys.stderr
             )
-            if e.response is not None:
-                self.printer.print(f"Response: {e.response.text}", file=sys.stderr)
-            raise
+            self.runner.verbose_print(
+                f"Error response body: {e.read().decode()}", file=sys.stderr
+            )
+            raise e
+
+    def _request_and_parse_json(
+        self, method: str, endpoint: str, json_payload: Optional[dict] = None
+    ) -> dict:
+        with self._request(method, endpoint, json_payload) as response:
+            # expect a 200 'OK' or 201 'Created' status on success and JSON body.
+            self._log_unexpected_status([200, 201], response.status)
+
+            response_text = response.read().decode("utf-8")
+            if response_text:
+                return json.loads(response_text)
+            return {}
+
+    def _request_no_content(
+        self, method: str, endpoint: str, json_payload: Optional[dict] = None
+    ) -> None:
+        with self._request(method, endpoint, json_payload) as response:
+            # expected a 204 No Content status on success,
+            # indicating the operation was successful but there is no body.
+            self._log_unexpected_status([204], response.status)
+
+    def _log_unexpected_status(
+        self, expected_statuses: List[int], actual_status: int
+    ) -> None:
+        if actual_status not in expected_statuses:
+            self.runner.print(
+                f"Warning: Expected status {', '.join(map(str, expected_statuses))}, but got {actual_status}",
+                file=sys.stderr,
+            )
 
     def get_user_login(self) -> str:
-        """Gets the current user's login name."""
-        response = self._request("get", "/user")
-        return response.json()["login"]
+        return self._request_and_parse_json("GET", "/user")["login"]
 
     def create_pr(
         self,
@@ -120,8 +186,7 @@ def create_pr(
         body: str,
         draft: bool,
     ) -> Optional[str]:
-        """Creates a GitHub Pull Request."""
-        self.printer.print(f"Creating pull request for '{head_branch}'...")
+        self.runner.print(f"Creating pull request for '{head_branch}'...")
         data = {
             "title": title,
             "body": body,
@@ -129,148 +194,124 @@ def create_pr(
             "base": base_branch,
             "draft": draft,
         }
-        response = self._request("post", f"/repos/{self.repo_slug}/pulls", json=data)
-        pr_url = response.json().get("html_url")
-        if not self.printer.dry_run:
-            self.printer.print(f"Pull request created: {pr_url}")
+        response_data = self._request_and_parse_json(
+            "POST", f"/repos/{LLVM_REPO}/pulls", json_payload=data
+        )
+        pr_url = response_data.get("html_url")
+        if not self.runner.dry_run:
+            self.runner.print(f"Pull request created: {pr_url}")
         return pr_url
 
     def get_repo_settings(self) -> dict:
-        """Gets repository settings."""
-        response = self._request("get", f"/repos/{self.repo_slug}")
-        return response.json()
+        return self._request_and_parse_json("GET", f"/repos/{LLVM_REPO}")
+
+    def _get_pr_details(self, pr_number: str) -> dict:
+        """Fetches the JSON details for a given pull request number."""
+        return self._request_and_parse_json(
+            "GET", f"/repos/{LLVM_REPO}/pulls/{pr_number}"
+        )
 
-    def merge_pr(self, pr_url: str):
-        """Merges a PR, retrying if it's not yet mergeable."""
+    def _attempt_squash_merge(self, pr_number: str) -> bool:
+        """Attempts to squash merge a PR, returning True on success."""
+        try:
+            self._request_and_parse_json(
+                "PUT",
+                f"/repos/{LLVM_REPO}/pulls/{pr_number}/merge",
+                json_payload={"merge_method": "squash"},
+            )
+            return True
+        except urllib.error.HTTPError as e:
+            # A 405 status code means the PR is not in a mergeable state.
+            if e.code == 405:
+                return False
+            # Re-raise other HTTP errors.
+            raise e
+
+    def merge_pr(self, pr_url: str) -> Optional[str]:
         if not pr_url:
-            return
+            return None
 
-        if self.printer.dry_run:
-            self.printer.print(f"[Dry Run] Would merge {pr_url}")
-            return
+        if self.runner.dry_run:
+            self.runner.print(f"[Dry Run] Would merge {pr_url}")
+            return None
 
         pr_number_match = re.search(r"/pull/(\d+)", pr_url)
         if not pr_number_match:
-            self.printer.print(
-                f"Could not extract PR number from URL: {pr_url}",
-                file=sys.stderr,
-            )
-            sys.exit(1)
+            raise LlvmPrError(f"Could not extract PR number from URL: {pr_url}")
         pr_number = pr_number_match.group(1)
 
-        head_branch = ""
-        max_retries = 10
-        retry_delay = 5  # seconds
-        for i in range(max_retries):
-            self.printer.print(
-                f"Attempting to merge {pr_url} (attempt {i+1}/{max_retries})..."
+        for i in range(MERGE_MAX_RETRIES):
+            self.runner.print(
+                f"Attempting to merge {pr_url} (attempt {i + 1}/{MERGE_MAX_RETRIES})..."
             )
 
-            pr_data_response = self._request(
-                "get", f"/repos/{self.repo_slug}/pulls/{pr_number}"
-            )
-            pr_data = pr_data_response.json()
+            pr_data = self._get_pr_details(pr_number)
             head_branch = pr_data["head"]["ref"]
 
-            if pr_data["mergeable"]:
-                merge_data = {
-                    "commit_title": f"{pr_data['title']} (#{pr_number})",
-                    "merge_method": "squash",
-                }
-                try:
-                    self._request(
-                        "put",
-                        f"/repos/{self.repo_slug}/pulls/{pr_number}/merge",
-                        json=merge_data,
-                    )
-                    self.printer.print("Successfully merged.")
-                    time.sleep(2)
+            if pr_data.get("mergeable_state") == "dirty":
+                raise LlvmPrError("Merge conflict.")
+
+            if pr_data.get("mergeable"):
+                if self._attempt_squash_merge(pr_number):
+                    self.runner.print("Successfully merged.")
+                    time.sleep(2)  # Allow GitHub's API to update.
                     return head_branch
-                except requests.exceptions.RequestException as e:
-                    if e.response and e.response.status_code == 405:
-                        self.printer.print(
-                            "PR not mergeable yet. Retrying in "
-                            f"{retry_delay} seconds..."
-                        )
-                        time.sleep(retry_delay)
-                    else:
-                        raise e
-            elif pr_data["mergeable_state"] == "dirty":
-                self.printer.print("Error: Merge conflict.", file=sys.stderr)
-                sys.exit(1)
-            else:
-                self.printer.print(
-                    f"PR not mergeable yet ({pr_data['mergeable_state']}). "
-                    f"Retrying in {retry_delay} seconds..."
-                )
-                time.sleep(retry_delay)
 
-        self.printer.print(
-            f"Error: PR was not mergeable after {max_retries} attempts.",
-            file=sys.stderr,
+            self.runner.print(
+                f"PR not mergeable yet (state: {pr_data.get('mergeable_state', 'unknown')}). Retrying in {MERGE_RETRY_DELAY} seconds..."
+            )
+            time.sleep(MERGE_RETRY_DELAY)
+
+        raise LlvmPrError(
+            f"PR was not mergeable after {MERGE_MAX_RETRIES} attempts."
         )
-        sys.exit(1)
 
-    def enable_auto_merge(self, pr_url: str):
-        """Enables auto-merge for a pull request."""
+    def enable_auto_merge(self, pr_url: str) -> None:
         if not pr_url:
             return
 
-        if self.printer.dry_run:
-            self.printer.print(f"[Dry Run] Would enable auto-merge for {pr_url}")
+        if self.runner.dry_run:
+            self.runner.print(f"[Dry Run] Would enable auto-merge for {pr_url}")
             return
 
         pr_number_match = re.search(r"/pull/(\d+)", pr_url)
         if not pr_number_match:
-            self.printer.print(
-                f"Could not extract PR number from URL: {pr_url}",
-                file=sys.stderr,
-            )
-            sys.exit(1)
+            raise LlvmPrError(f"Could not extract PR number from URL: {pr_url}")
         pr_number = pr_number_match.group(1)
 
-        self.printer.print(f"Enabling auto-merge for {pr_url}...")
+        self.runner.print(f"Enabling auto-merge for {pr_url}...")
         data = {
             "enabled": True,
             "merge_method": "squash",
         }
-        self._request(
-            "put",
-            f"/repos/{self.repo_slug}/pulls/{pr_number}/auto-merge",
-            json=data,
+        self._request_no_content(
+            "PUT",
+            f"/repos/{LLVM_REPO}/pulls/{pr_number}/auto-merge",
+            json_payload=data,
         )
-        self.printer.print("Auto-merge enabled.")
+        self.runner.print("Auto-merge enabled.")
 
-    def delete_branch(self, branch_name: str, default_branch: Optional[str] = None):
-        """Deletes a remote branch."""
+    def delete_branch(
+        self, branch_name: str, default_branch: Optional[str] = None
+    ) -> None:
         if default_branch and branch_name == default_branch:
-            self.printer.print(
+            self.runner.print(
                 f"Error: Refusing to delete the default branch '{branch_name}'.",
                 file=sys.stderr,
             )
             return
-        self.printer.print(f"Deleting remote branch '{branch_name}'")
         try:
-            self._request(
-                "delete", f"/repos/{self.repo_slug}/git/refs/heads/{branch_name}"
-            )
-        except requests.exceptions.RequestException as e:
-            if (
-                e.response is not None
-                and e.response.status_code == 422
-                and "Reference does not exist" in e.response.text
-            ):
-                if self.printer.verbose:
-                    self.printer.print(
-                        f"Warning: Remote branch '{branch_name}' was already deleted, skipping deletion.",
-                        file=sys.stderr,
-                    )
-                return
-            self.printer.print(
-                f"Could not delete remote branch '{branch_name}': {e}",
-                file=sys.stderr,
+            self._request_no_content(
+                "DELETE", f"/repos/{LLVM_REPO}/git/refs/heads/{branch_name}"
             )
-            raise
+        except urllib.error.HTTPError as e:
+            if e.code == 422:
+                self.runner.print(
+                    f"Warning: Remote branch '{branch_name}' was already deleted, skipping deletion.",
+                    file=sys.stderr,
+                )
+            else:
+                raise e
 
 
 class LLVMPRAutomator:
@@ -278,42 +319,25 @@ class LLVMPRAutomator:
 
     def __init__(
         self,
-        args: argparse.Namespace,
-        printer: Printer,
+        runner: CommandRunner,
         github_api: "GitHubAPI",
+        config: "PRAutomatorConfig",
+        remote: str,
     ):
-        self.args = args
-        self.printer = printer
+        self.runner = runner
         self.github_api = github_api
+        self.config = config
+        self.remote = remote
         self.original_branch: str = ""
-        self.repo_slug: str = ""
         self.created_branches: List[str] = []
         self.repo_settings: dict = {}
 
-    def _run_cmd(self, command: List[str], read_only: bool = False, **kwargs):
-        """Wrapper for run_command that passes the dry_run flag."""
-        return self.printer.run_command(command, read_only=read_only, **kwargs)
-
-    def _get_repo_slug(self) -> str:
-        """Gets the GitHub repository slug from the remote URL."""
-        result = self._run_cmd(
-            ["git", "remote", "get-url", self.args.remote],
-            capture_output=True,
-            text=True,
-            read_only=True,
-        )
-        url = result.stdout.strip()
-        match = re.search(r"github\.com[/:]([\w.-]+/[\w.-]+)", url)
-        if not match:
-            self.printer.print(
-                f"Error: Could not parse repository slug from remote URL: {url}",
-                file=sys.stderr,
-            )
-            sys.exit(1)
-        return match.group(1).replace(".git", "")
+    def _run_cmd(
+        self, command: List[str], read_only: bool = False, **kwargs
+    ) -> subprocess.CompletedProcess:
+        return self.runner.run_command(command, read_only=read_only, **kwargs)
 
     def _get_current_branch(self) -> str:
-        """Gets the current git branch."""
         result = self._run_cmd(
             ["git", "rev-parse", "--abbrev-ref", "HEAD"],
             capture_output=True,
@@ -322,8 +346,7 @@ def _get_current_branch(self) -> str:
         )
         return result.stdout.strip()
 
-    def _check_work_tree_is_clean(self):
-        """Exits if the git work tree has uncommitted or unstaged changes."""
+    def _check_work_tree(self) -> None:
         result = self._run_cmd(
             ["git", "status", "--porcelain"],
             capture_output=True,
@@ -331,33 +354,38 @@ def _check_work_tree_is_clean(self):
             read_only=True,
         )
         if result.stdout.strip():
-            self.printer.print(
-                "Error: Your working tree is dirty. Please stash or commit your changes.",
-                file=sys.stderr,
+            raise LlvmPrError(
+                "Your working tree is dirty. Please stash or commit your changes."
             )
-            sys.exit(1)
 
-    def _rebase_current_branch(self):
-        """Rebases the current branch on top of the upstream base."""
-        self._check_work_tree_is_clean()
+    def _rebase_current_branch(self) -> None:
+        self._check_work_tree()
 
-        target = f"{self.args.upstream_remote}/{self.args.base}"
-        self.printer.print(
-            f"Fetching from '{self.args.upstream_remote}' and rebasing '{self.original_branch}' on top of '{target}'..."
+        target = f"{self.config.upstream_remote}/{self.config.base_branch}"
+        self.runner.print(
+            f"Fetching from '{self.config.upstream_remote}' and rebasing '{self.original_branch}' on top of '{target}'..."
         )
-        self._run_cmd(["git", "fetch", self.args.upstream_remote, self.args.base])
+
+        authenticated_url = self._get_authenticated_remote_url(
+            self.config.upstream_remote
+        )
+        # Use a refspec to explicitly update the local remote-tracking branch (e.g., origin/main)
+        # when fetching from an authenticated URL. This ensures that 'git rebase origin/main'
+        # operates on the most up-to-date remote state.
+        refspec = f"refs/heads/{self.config.base_branch}:refs/remotes/{self.config.upstream_remote}/{self.config.base_branch}"
+        self._run_cmd(["git", "fetch", authenticated_url, refspec])
 
         try:
             self._run_cmd(["git", "rebase", target])
         except subprocess.CalledProcessError as e:
-            self.printer.print(
+            self.runner.print(
                 "Error: The rebase operation failed, likely due to a merge conflict.",
                 file=sys.stderr,
             )
             if e.stdout:
-                self.printer.print(f"--- stdout ---\n{e.stdout}", file=sys.stderr)
+                self.runner.print(f"--- stdout ---\n{e.stdout}", file=sys.stderr)
             if e.stderr:
-                self.printer.print(f"--- stderr ---\n{e.stderr}", file=sys.stderr)
+                self.runner.print(f"--- stderr ---\n{e.stderr}", file=sys.stderr)
 
             # Check if rebase is in progress before aborting
             rebase_status_result = self._run_cmd(
@@ -367,16 +395,37 @@ def _rebase_current_branch(self):
                 text=True,
                 read_only=True,
             )
-            if (
-                rebase_status_result.returncode == 0
-            ):  # REBASE_HEAD exists, so rebase is in progress
-                self.printer.print("Aborting rebase...", file=sys.stderr)
+
+            # REBASE_HEAD exists, so rebase is in progress
+            if rebase_status_result.returncode == 0:
+                self.runner.print("Aborting rebase...", file=sys.stderr)
                 self._run_cmd(["git", "rebase", "--abort"], check=False)
-            sys.exit(1)
+            raise LlvmPrError("rebase operation failed.") from e
+
+    def _get_authenticated_remote_url(self, remote_name: str) -> str:
+        """
+        Generates an authenticated URL to use for all operations. This includes
+        local operations, like rebasing after merging a PR in a stack.
+        This allows the script to avoid reauthenticating (e.g. via ssh), since
+        the token can be reused for all operations.
+        """
+        remote_url_result = self._run_cmd(
+            ["git", "remote", "get-url", remote_name],
+            capture_output=True,
+            text=True,
+            read_only=True,
+        )
+        remote_url = remote_url_result.stdout.strip()
+        if remote_url.startswith("git@github.com:"):
+            return remote_url.replace(
+                "git@github.com:", f"https://{self.config.token}@github.com/"
+            )
+        if remote_url.startswith("https://github.com/"):
+            return remote_url.replace("https://", f"https://{self.config.token}@")
+        raise LlvmPrError(f"Unsupported remote URL format: {remote_url}")
 
     def _get_commit_stack(self) -> List[str]:
-        """Gets the stack of commits between the current branch's HEAD and its merge base with upstream."""
-        target = f"{self.args.upstream_remote}/{self.args.base}"
+        target = f"{self.config.upstream_remote}/{self.config.base_branch}"
         merge_base_result = self._run_cmd(
             ["git", "merge-base", "HEAD", target],
             capture_output=True,
@@ -385,11 +434,7 @@ def _get_commit_stack(self) -> List[str]:
         )
         merge_base = merge_base_result.stdout.strip()
         if not merge_base:
-            self.printer.print(
-                f"Error: Could not find a merge base between HEAD and {target}.",
-                file=sys.stderr,
-            )
-            sys.exit(1)
+            raise LlvmPrError(f"Could not find a merge base between HEAD and {target}.")
 
         result = self._run_cmd(
             ["git", "rev-list", "--reverse", f"{merge_base}..HEAD"],
@@ -397,11 +442,9 @@ def _get_commit_stack(self) -> List[str]:
             text=True,
             read_only=True,
         )
-        commits = result.stdout.strip().split("\n")
-        return [c for c in commits if c]
+        return result.stdout.strip().splitlines()
 
     def _get_commit_details(self, commit_hash: str) -> tuple[str, str]:
-        """Gets the title and body of a commit."""
         result = self._run_cmd(
             ["git", "show", "-s", "--format=%s%n%n%b", commit_hash],
             capture_output=True,
@@ -413,167 +456,155 @@ def _get_commit_details(self, commit_hash: str) -> tuple[str, str]:
         body = parts[1] if len(parts) > 1 else ""
         return title, body
 
-    def _sanitize_for_branch_name(self, text: str) -> str:
-        """Sanitizes a string to be used as a git branch name."""
+    def _sanitize_branch_name(self, text: str) -> str:
         sanitized = re.sub(r"[^\w\s-]", "", text).strip().lower()
         sanitized = re.sub(r"[-\s]+", "-", sanitized)
         # Use "auto-pr" as a fallback.
         return sanitized or "auto-pr"
 
+    def _validate_merge_config(self, num_commits: int) -> None:
+        if num_commits > 1:
+            if self.config.auto_merge:
+                raise LlvmPrError("--auto-merge is only supported for a single commit.")
+
+            if self.config.no_merge:
+                raise LlvmPrError(
+                    "--no-merge is only supported for a single commit. "
+                    "For stacks, the script must merge sequentially."
+                )
+
+        self.runner.print(f"Found {num_commits} commit(s) to process.")
+
     def _create_and_push_branch_for_commit(
         self, commit_hash: str, base_branch_name: str, index: int
     ) -> str:
-        """Creates and pushes a temporary branch pointing to a specific commit."""
-        branch_name = f"{self.args.prefix}{base_branch_name}-{index + 1}"
+        branch_name = f"{self.config.prefix}{base_branch_name}-{index + 1}"
         commit_title, _ = self._get_commit_details(commit_hash)
-        self.printer.print(f"Processing commit {commit_hash[:7]}: {commit_title}")
-        self.printer.print(f"Creating and pushing temporary branch '{branch_name}'")
-
-        self._run_cmd(["git", "branch", "-f", branch_name, commit_hash])
-        push_command = ["git", "push", self.args.remote, branch_name]
+        self.runner.print(f"Processing commit {commit_hash[:7]}: {commit_title}")
+        self.runner.print(f"Pushing commit to temporary branch '{branch_name}'")
+
+        push_url = self._get_authenticated_remote_url(self.remote)
+        push_command = [
+            "git",
+            "push",
+            push_url,
+            f"{commit_hash}:refs/heads/{branch_name}",
+        ]
         self._run_cmd(push_command)
         self.created_branches.append(branch_name)
         return branch_name
 
-    def run(self):
-        """Main entry point for the automator, orchestrates the PR creation and merging process."""
-        self.repo_slug = self._get_repo_slug()
+    def _process_commit(
+        self, commit_hash: str, base_branch_name: str, index: int
+    ) -> None:
+        commit_title, commit_body = self._get_commit_details(commit_hash)
+
+        temp_branch = self._create_and_push_branch_for_commit(
+            commit_hash, base_branch_name, index
+        )
+        pr_url = self.github_api.create_pr(
+            head_branch=f"{self.config.user_login}:{temp_branch}",
+            base_branch=self.config.base_branch,
+            title=commit_title,
+            body=commit_body,
+            draft=self.config.draft,
+        )
+
+        if not self.config.no_merge:
+            if self.config.auto_merge:
+                self.github_api.enable_auto_merge(pr_url)
+            else:
+                merged_branch = self.github_api.merge_pr(pr_url)
+                if merged_branch and not self.repo_settings.get(
+                    "delete_branch_on_merge"
+                ):
+                    # After a merge, the temporary branch should be deleted from
+                    # the user's fork.
+                    delete_url = self._get_authenticated_remote_url(self.remote)
+                    self._run_cmd(
+                        ["git", "push", delete_url, "--delete", merged_branch],
+                        check=False,
+                    )
+            if temp_branch in self.created_branches:
+                # If the branch was successfully merged, it should not be deleted
+                # again during cleanup.
+                self.created_branches.remove(temp_branch)
+
+    def run(self) -> None:
         self.repo_settings = self.github_api.get_repo_settings()
         self.original_branch = self._get_current_branch()
-        self.printer.print(f"On branch: {self.original_branch}")
+        self.runner.print(f"On branch: {self.original_branch}")
 
         try:
+            # Rebase on top of the upstream to make sure we have an accurate
+            # stack of commits. If we don't, running the script again (e.g.,
+            # after partially landing a stack) may make a PR against the
+            # upstream that is empty.
             self._rebase_current_branch()
-            initial_commits = self._get_commit_stack()
+            commits = self._get_commit_stack()
 
-            if not initial_commits:
-                self.printer.print("No new commits to process.")
+            if not commits:
+                self.runner.print("No new commits to process.")
                 return
 
-            if self.args.auto_merge and len(initial_commits) > 1:
-                self.printer.print(
-                    "Error: --auto-merge is only supported for a single commit.",
-                    file=sys.stderr,
-                )
-                sys.exit(1)
-
-            if self.args.no_merge and len(initial_commits) > 1:
-                self.printer.print(
-                    "Error: --no-merge is only supported for a single commit. "
-                    "For stacks, the script must merge sequentially.",
-                    file=sys.stderr,
-                )
-                sys.exit(1)
-
-            self.printer.print(f"Found {len(initial_commits)} commit(s) to process.")
+            self._validate_merge_config(len(commits))
             branch_base_name = self.original_branch
-            if self.original_branch in ["main", "master"]:
-                first_commit_title, _ = self._get_commit_details(initial_commits[0])
-                branch_base_name = self._sanitize_for_branch_name(first_commit_title)
+            if self.original_branch == "main":
+                first_commit_title, _ = self._get_commit_details(commits[0])
+                branch_base_name = self._sanitize_branch_name(first_commit_title)
 
-            for i in range(len(initial_commits)):
+            for i in range(len(commits)):
                 if i > 0:
                     self._rebase_current_branch()
 
+                # After a rebase, the commit hashes change, so we need to get
+                # the latest commit stack.
                 commits = self._get_commit_stack()
                 if not commits:
-                    self.printer.print("Success! All commits have been landed.")
+                    self.runner.print("Success! All commits have been landed.")
                     break
-
-                commit_to_process = commits[0]
-                commit_title, commit_body = self._get_commit_details(commit_to_process)
-
-                temp_branch = self._create_and_push_branch_for_commit(
-                    commit_to_process, branch_base_name, i
-                )
-                pr_url = self.github_api.create_pr(
-                    head_branch=temp_branch,
-                    base_branch=self.args.base,
-                    title=commit_title,
-                    body=commit_body,
-                    draft=self.args.draft,
-                )
-
-                if not self.args.no_merge:
-                    if self.args.auto_merge:
-                        self.github_api.enable_auto_merge(pr_url)
-                    else:
-                        merged_branch = self.github_api.merge_pr(pr_url)
-                        if merged_branch and not self.repo_settings.get(
-                            "delete_branch_on_merge"
-                        ):
-                            self.github_api.delete_branch(
-                                merged_branch, self.repo_settings.get("default_branch")
-                            )
-
-                    if temp_branch in self.created_branches:
-                        self.created_branches.remove(temp_branch)
+                self._process_commit(commits[0], branch_base_name, i)
 
         finally:
             self._cleanup()
 
-    def _cleanup(self):
-        """Cleans up by returning to the original branch and deleting all temporary branches."""
-        self.printer.print(f"Returning to original branch: {self.original_branch}")
+    def _cleanup(self) -> None:
+        self.runner.print(f"Returning to original branch: {self.original_branch}")
         self._run_cmd(["git", "checkout", self.original_branch], capture_output=True)
         if self.created_branches:
-            self.printer.print("Cleaning up temporary local branches...")
-            self._run_cmd(["git", "branch", "-D"] + self.created_branches)
-            self.printer.print("Cleaning up temporary remote branches...")
+            self.runner.print("Cleaning up temporary remote branches...")
+            delete_url = self._get_authenticated_remote_url(self.remote)
             self._run_cmd(
-                ["git", "push", self.args.remote, "--delete"] + self.created_branches,
+                ["git", "push", delete_url, "--delete"] + self.created_branches,
                 check=False,
             )
 
 
-def check_prerequisites(printer: Printer):
-    """Checks if git is installed and if inside a git repository."""
-    printer.print("Checking prerequisites...")
-    printer.run_command(["git", "--version"], capture_output=True, read_only=True)
-    if not os.getenv("GITHUB_TOKEN"):
-        printer.print(
-            "Error: GITHUB_TOKEN environment variable not set.", file=sys.stderr
-        )
-        sys.exit(1)
-
-    result = printer.run_command(
+def check_prerequisites(runner: CommandRunner) -> None:
+    runner.print("Checking prerequisites...")
+    runner.run_command(["git", "--version"], capture_output=True, read_only=True)
+    result = runner.run_command(
         ["git", "rev-parse", "--is-inside-work-tree"],
         check=False,
         capture_output=True,
         text=True,
         read_only=True,
     )
+
     if result.returncode != 0 or result.stdout.strip() != "true":
-        printer.print(
-            "Error: This script must be run inside a git repository.", file=sys.stderr
-        )
-        sys.exit(1)
-    printer.print("Prerequisites met.")
+        raise LlvmPrError("This script must be run inside a git repository.")
+    runner.print("Prerequisites met.")
 
 
-def main():
-    """main entry point"""
+def main() -> None:
     parser = argparse.ArgumentParser(
         description="Create and land a stack of Pull Requests."
     )
-    GITHUB_REMOTE_NAME = "origin"
-    UPSTREAM_REMOTE_NAME = "upstream"
-    BASE_BRANCH = "main"
-
-    printer = Printer()
-    token = os.getenv("GITHUB_TOKEN")
-    default_prefix = "dev/"
-    if token:
-        # Create a temporary API client to get the user login
-        # We don't know the repo slug yet, so pass a dummy value.
-        temp_api = GitHubAPI("", printer, token)
-        try:
-            user_login = temp_api.get_user_login()
-            default_prefix = f"{user_login}/"
-        except requests.exceptions.RequestException as e:
-            printer.print(
-                f"Could not fetch user login from GitHub: {e}", file=sys.stderr
-            )
+
+    command_runner = CommandRunner()
+    token = os.getenv(LLVM_GITHUB_TOKEN_VAR)
+    if not token:
+        raise LlvmPrError(f"{LLVM_GITHUB_TOKEN_VAR} environment variable not set.")
 
     parser.add_argument(
         "--base",
@@ -590,18 +621,24 @@ def main():
         default=UPSTREAM_REMOTE_NAME,
         help=f"Remote for the upstream repository (default: {UPSTREAM_REMOTE_NAME})",
     )
+    parser.add_argument(
+        "--login",
+        default=None,
+        help="The user login to use. If not provided this will be queried from the TOKEN",
+    )
     parser.add_argument(
         "--prefix",
-        default=default_prefix,
-        help=f"Prefix for temporary branches (default: {default_prefix})",
+        default=None,
+        help="Prefix for temporary branches (default: users/<username>)",
     )
     parser.add_argument(
         "--draft", action="store_true", help="Create pull requests as drafts."
     )
-    parser.add_argument(
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument(
         "--no-merge", action="store_true", help="Create PRs but do not merge them."
     )
-    parser.add_argument(
+    group.add_argument(
         "--auto-merge",
         action="store_true",
         help="Enable auto-merge for each PR instead of attempting to merge immediately.",
@@ -621,32 +658,48 @@ def main():
     )
 
     args = parser.parse_args()
-    if args.prefix and not args.prefix.endswith("/"):
-        args.prefix += "/"
-
-    printer = Printer(dry_run=args.dry_run, verbose=args.verbose, quiet=args.quiet)
-    check_prerequisites(printer)
 
-    # Get repo slug from git remote url
-    result = printer.run_command(
-        ["git", "remote", "get-url", args.remote],
-        capture_output=True,
-        text=True,
-        read_only=True,
+    command_runner = CommandRunner(
+        dry_run=args.dry_run, verbose=args.verbose, quiet=args.quiet
     )
-    url = result.stdout.strip()
-    match = re.search(r"github\.com[/:]([\w.-]+/[\w.-]+)", url)
-    if not match:
-        printer.print(
-            f"Error: Could not parse repository slug from remote URL: {url}",
-            file=sys.stderr,
-        )
-        sys.exit(1)
-    repo_slug = match.group(1).replace(".git", "")
+    check_prerequisites(command_runner)
 
-    github_api = GitHubAPI(repo_slug, printer, token)
-    automator = LLVMPRAutomator(args, printer, github_api)
-    automator.run()
+    github_api = GitHubAPI(command_runner, token)
+    if not args.login:
+        # Create a temporary API client to get the user login.
+        # We need the user login for the branch prefix and for creating PRs
+        # from a fork.
+        try:
+            args.login = github_api.get_user_login()
+        except urllib.error.HTTPError as e:
+            raise LlvmPrError(f"Could not fetch user login from GitHub: {e}") from e
+
+    if not args.prefix:
+        args.prefix = f"users/{args.login}/"
+
+    if not args.prefix.endswith("/"):
+        args.prefix += "/"
+
+    try:
+        config = PRAutomatorConfig(
+            user_login=args.login,
+            token=token,
+            base_branch=args.base,
+            upstream_remote=args.upstream_remote,
+            prefix=args.prefix,
+            draft=args.draft,
+            no_merge=args.no_merge,
+            auto_merge=args.auto_merge,
+        )
+        automator = LLVMPRAutomator(
+            runner=command_runner,
+            github_api=github_api,
+            config=config,
+            remote=args.remote,
+        )
+        automator.run()
+    except LlvmPrError as e:
+        sys.exit(f"Error: {e}")
 
 
 if __name__ == "__main__":

>From 5ba2c5f052a55818b1098d5c8021ca92ae84d8fd Mon Sep 17 00:00:00 2001
From: Paul Kirth <paulkirth at google.com>
Date: Thu, 13 Nov 2025 18:41:23 -0800
Subject: [PATCH 3/9] Fix formatting

---
 llvm/utils/llvm_push_pr.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/llvm/utils/llvm_push_pr.py b/llvm/utils/llvm_push_pr.py
index 15c663318050a..a915064041c9e 100644
--- a/llvm/utils/llvm_push_pr.py
+++ b/llvm/utils/llvm_push_pr.py
@@ -262,9 +262,7 @@ def merge_pr(self, pr_url: str) -> Optional[str]:
             )
             time.sleep(MERGE_RETRY_DELAY)
 
-        raise LlvmPrError(
-            f"PR was not mergeable after {MERGE_MAX_RETRIES} attempts."
-        )
+        raise LlvmPrError(f"PR was not mergeable after {MERGE_MAX_RETRIES} attempts.")
 
     def enable_auto_merge(self, pr_url: str) -> None:
         if not pr_url:

>From 844b964c11e7a8a1984863a121f304debad27dbf Mon Sep 17 00:00:00 2001
From: Paul Kirth <paulkirth at google.com>
Date: Fri, 14 Nov 2025 11:09:18 -0800
Subject: [PATCH 4/9] Use API for branch deletion

---
 llvm/utils/llvm_push_pr.py | 13 +++----------
 1 file changed, 3 insertions(+), 10 deletions(-)

diff --git a/llvm/utils/llvm_push_pr.py b/llvm/utils/llvm_push_pr.py
index a915064041c9e..4e3bf7fe32405 100644
--- a/llvm/utils/llvm_push_pr.py
+++ b/llvm/utils/llvm_push_pr.py
@@ -518,11 +518,7 @@ def _process_commit(
                 ):
                     # After a merge, the temporary branch should be deleted from
                     # the user's fork.
-                    delete_url = self._get_authenticated_remote_url(self.remote)
-                    self._run_cmd(
-                        ["git", "push", delete_url, "--delete", merged_branch],
-                        check=False,
-                    )
+                    self.github_api.delete_branch(merged_branch)
             if temp_branch in self.created_branches:
                 # If the branch was successfully merged, it should not be deleted
                 # again during cleanup.
@@ -571,11 +567,8 @@ def _cleanup(self) -> None:
         self._run_cmd(["git", "checkout", self.original_branch], capture_output=True)
         if self.created_branches:
             self.runner.print("Cleaning up temporary remote branches...")
-            delete_url = self._get_authenticated_remote_url(self.remote)
-            self._run_cmd(
-                ["git", "push", delete_url, "--delete"] + self.created_branches,
-                check=False,
-            )
+            for branch in self.created_branches:
+                self.github_api.delete_branch(branch)
 
 
 def check_prerequisites(runner: CommandRunner) -> None:

>From d2557048ffe9ae3a7e36eace483622b2d73a28b0 Mon Sep 17 00:00:00 2001
From: Paul Kirth <paulkirth at google.com>
Date: Fri, 14 Nov 2025 13:33:23 -0800
Subject: [PATCH 5/9] Use askpass over embedding the token in the URL

---
 llvm/utils/llvm_push_pr.py | 35 +++++++++++++++++++++++------------
 1 file changed, 23 insertions(+), 12 deletions(-)

diff --git a/llvm/utils/llvm_push_pr.py b/llvm/utils/llvm_push_pr.py
index 4e3bf7fe32405..6d4ddeb5a8fbf 100644
--- a/llvm/utils/llvm_push_pr.py
+++ b/llvm/utils/llvm_push_pr.py
@@ -75,6 +75,7 @@ def run_command(
         text: bool = False,
         stdin_input: Optional[str] = None,
         read_only: bool = False,
+        env: Optional[dict] = None,
     ) -> subprocess.CompletedProcess:
         if self.dry_run and not read_only:
             self.print(f"[Dry Run] Would run: {' '.join(command)}")
@@ -89,6 +90,7 @@ def run_command(
                 capture_output=capture_output,
                 text=text,
                 input=stdin_input,
+                env=env,
             )
         except FileNotFoundError as e:
             raise LlvmPrError(
@@ -329,6 +331,9 @@ def __init__(
         self.original_branch: str = ""
         self.created_branches: List[str] = []
         self.repo_settings: dict = {}
+        self._git_askpass_cmd = (
+            f"python3 -c \"import os; print(os.environ['{LLVM_GITHUB_TOKEN_VAR}'])\""
+        )
 
     def _run_cmd(
         self, command: List[str], read_only: bool = False, **kwargs
@@ -364,17 +369,18 @@ def _rebase_current_branch(self) -> None:
             f"Fetching from '{self.config.upstream_remote}' and rebasing '{self.original_branch}' on top of '{target}'..."
         )
 
-        authenticated_url = self._get_authenticated_remote_url(
-            self.config.upstream_remote
+        git_env = os.environ.copy()
+        git_env["GIT_ASKPASS"] = self._git_askpass_cmd
+        git_env[LLVM_GITHUB_TOKEN_VAR] = self.config.token
+        git_env["GIT_TERMINAL_PROMPT"] = "0"
+
+        self._run_cmd(
+            ["git", "fetch", self.config.upstream_remote, self.config.base_branch],
+            env=git_env,
         )
-        # Use a refspec to explicitly update the local remote-tracking branch (e.g., origin/main)
-        # when fetching from an authenticated URL. This ensures that 'git rebase origin/main'
-        # operates on the most up-to-date remote state.
-        refspec = f"refs/heads/{self.config.base_branch}:refs/remotes/{self.config.upstream_remote}/{self.config.base_branch}"
-        self._run_cmd(["git", "fetch", authenticated_url, refspec])
 
         try:
-            self._run_cmd(["git", "rebase", target])
+            self._run_cmd(["git", "rebase", target], env=git_env)
         except subprocess.CalledProcessError as e:
             self.runner.print(
                 "Error: The rebase operation failed, likely due to a merge conflict.",
@@ -392,12 +398,13 @@ def _rebase_current_branch(self) -> None:
                 capture_output=True,
                 text=True,
                 read_only=True,
+                env=git_env,
             )
 
             # REBASE_HEAD exists, so rebase is in progress
             if rebase_status_result.returncode == 0:
                 self.runner.print("Aborting rebase...", file=sys.stderr)
-                self._run_cmd(["git", "rebase", "--abort"], check=False)
+                self._run_cmd(["git", "rebase", "--abort"], check=False, env=git_env)
             raise LlvmPrError("rebase operation failed.") from e
 
     def _get_authenticated_remote_url(self, remote_name: str) -> str:
@@ -481,14 +488,18 @@ def _create_and_push_branch_for_commit(
         self.runner.print(f"Processing commit {commit_hash[:7]}: {commit_title}")
         self.runner.print(f"Pushing commit to temporary branch '{branch_name}'")
 
-        push_url = self._get_authenticated_remote_url(self.remote)
+        git_env = os.environ.copy()
+        git_env["GIT_ASKPASS"] = self._git_askpass_cmd
+        git_env[LLVM_GITHUB_TOKEN_VAR] = self.config.token
+        git_env["GIT_TERMINAL_PROMPT"] = "0"
+
         push_command = [
             "git",
             "push",
-            push_url,
+            self.remote,
             f"{commit_hash}:refs/heads/{branch_name}",
         ]
-        self._run_cmd(push_command)
+        self._run_cmd(push_command, env=git_env)
         self.created_branches.append(branch_name)
         return branch_name
 

>From 25b6fc92271dc24875cf517389776fc7d393aeb4 Mon Sep 17 00:00:00 2001
From: Paul Kirth <paulkirth at google.com>
Date: Fri, 14 Nov 2025 15:57:39 -0800
Subject: [PATCH 6/9] Fix git remote operations no longer using the token

Also fix up some comments
---
 llvm/utils/llvm_push_pr.py | 75 ++++++++++++++++++--------------------
 1 file changed, 35 insertions(+), 40 deletions(-)

diff --git a/llvm/utils/llvm_push_pr.py b/llvm/utils/llvm_push_pr.py
index 6d4ddeb5a8fbf..c0aade37953a7 100644
--- a/llvm/utils/llvm_push_pr.py
+++ b/llvm/utils/llvm_push_pr.py
@@ -152,7 +152,7 @@ def _request_and_parse_json(
         self, method: str, endpoint: str, json_payload: Optional[dict] = None
     ) -> dict:
         with self._request(method, endpoint, json_payload) as response:
-            # expect a 200 'OK' or 201 'Created' status on success and JSON body.
+            # Expect a 200 'OK' or 201 'Created' status on success and JSON body.
             self._log_unexpected_status([200, 201], response.status)
 
             response_text = response.read().decode("utf-8")
@@ -164,8 +164,8 @@ def _request_no_content(
         self, method: str, endpoint: str, json_payload: Optional[dict] = None
     ) -> None:
         with self._request(method, endpoint, json_payload) as response:
-            # expected a 204 No Content status on success,
-            # indicating the operation was successful but there is no body.
+            # Expected a 204 No Content status on success, indicating the
+            # operation was successful but there is no body.
             self._log_unexpected_status([204], response.status)
 
     def _log_unexpected_status(
@@ -374,10 +374,9 @@ def _rebase_current_branch(self) -> None:
         git_env[LLVM_GITHUB_TOKEN_VAR] = self.config.token
         git_env["GIT_TERMINAL_PROMPT"] = "0"
 
-        self._run_cmd(
-            ["git", "fetch", self.config.upstream_remote, self.config.base_branch],
-            env=git_env,
-        )
+        https_upstream_url = self._get_https_url_for_remote(self.config.upstream_remote)
+        refspec = f"refs/heads/{self.config.base_branch}:refs/remotes/{self.config.upstream_remote}/{self.config.base_branch}"
+        self._run_cmd(["git", "fetch", https_upstream_url, refspec], env=git_env)
 
         try:
             self._run_cmd(["git", "rebase", target], env=git_env)
@@ -407,13 +406,8 @@ def _rebase_current_branch(self) -> None:
                 self._run_cmd(["git", "rebase", "--abort"], check=False, env=git_env)
             raise LlvmPrError("rebase operation failed.") from e
 
-    def _get_authenticated_remote_url(self, remote_name: str) -> str:
-        """
-        Generates an authenticated URL to use for all operations. This includes
-        for local operations, like rebasing after merging a PR in a stack.
-        This allows the script to avoid reauthenticating (e.g. via ssh), since
-        the token can be reused for all operations.
-        """
+    def _get_https_url_for_remote(self, remote_name: str) -> str:
+        """Gets the URL for a remote and converts it to HTTPS if necessary."""
         remote_url_result = self._run_cmd(
             ["git", "remote", "get-url", remote_name],
             capture_output=True,
@@ -422,12 +416,12 @@ def _get_authenticated_remote_url(self, remote_name: str) -> str:
         )
         remote_url = remote_url_result.stdout.strip()
         if remote_url.startswith("git@github.com:"):
-            return remote_url.replace(
-                "git@github.com:", f"https://{self.config.token}@github.com/"
-            )
+            return remote_url.replace("git@github.com:", "https://github.com/")
         if remote_url.startswith("https://github.com/"):
-            return remote_url.replace("https://", f"https://{self.config.token}@")
-        raise LlvmPrError(f"Unsupported remote URL format: {remote_url}")
+            return remote_url
+        raise LlvmPrError(
+            f"Unsupported remote URL format for {remote_name}: {remote_url}"
+        )
 
     def _get_commit_stack(self) -> List[str]:
         target = f"{self.config.upstream_remote}/{self.config.base_branch}"
@@ -450,6 +444,8 @@ def _get_commit_stack(self) -> List[str]:
         return result.stdout.strip().splitlines()
 
     def _get_commit_details(self, commit_hash: str) -> tuple[str, str]:
+        # Get the subject and body from git show. Insert "\n\n" between to make
+        # parsing simple to do w/ split.
         result = self._run_cmd(
             ["git", "show", "-s", "--format=%s%n%n%b", commit_hash],
             capture_output=True,
@@ -493,10 +489,12 @@ def _create_and_push_branch_for_commit(
         git_env[LLVM_GITHUB_TOKEN_VAR] = self.config.token
         git_env["GIT_TERMINAL_PROMPT"] = "0"
 
+        https_remote_url = self._get_https_url_for_remote(self.remote)
+
         push_command = [
             "git",
             "push",
-            self.remote,
+            https_remote_url,
             f"{commit_hash}:refs/heads/{branch_name}",
         ]
         self._run_cmd(push_command, env=git_env)
@@ -519,21 +517,21 @@ def _process_commit(
             draft=self.config.draft,
         )
 
-        if not self.config.no_merge:
-            if self.config.auto_merge:
-                self.github_api.enable_auto_merge(pr_url)
-            else:
-                merged_branch = self.github_api.merge_pr(pr_url)
-                if merged_branch and not self.repo_settings.get(
-                    "delete_branch_on_merge"
-                ):
-                    # After a merge, the temporary branch should be deleted from
-                    # the user's fork.
-                    self.github_api.delete_branch(merged_branch)
-            if temp_branch in self.created_branches:
-                # If the branch was successfully merged, it should not be deleted
-                # again during cleanup.
-                self.created_branches.remove(temp_branch)
+        if self.config.no_merge:
+            return
+
+        if self.config.auto_merge:
+            self.github_api.enable_auto_merge(pr_url)
+        else:
+            merged_branch = self.github_api.merge_pr(pr_url)
+            if merged_branch and not self.repo_settings.get("delete_branch_on_merge"):
+                # After a merge, the branch should be deleted.
+                self.github_api.delete_branch(merged_branch)
+
+        if temp_branch in self.created_branches:
+            # If the branch was successfully merged, it should not be deleted
+            # again during cleanup.
+            self.created_branches.remove(temp_branch)
 
     def run(self) -> None:
         self.repo_settings = self.github_api.get_repo_settings()
@@ -562,8 +560,8 @@ def run(self) -> None:
                 if i > 0:
                     self._rebase_current_branch()
 
-                # After a rebase, the commit hashes change, so we need to get
-                # the latest commit stack.
+                # After a rebase, the commit hashes can change, so we need to
+                # get the latest commit stack.
                 commits = self._get_commit_stack()
                 if not commits:
                     self.runner.print("Success! All commits have been landed.")
@@ -668,9 +666,6 @@ def main() -> None:
 
     github_api = GitHubAPI(command_runner, token)
     if not args.login:
-        # Create a temporary API client to get the user login.
-        # We need the user login for the branch prefix and for creating PRs
-        # from a fork.
         try:
             args.login = github_api.get_user_login()
         except urllib.error.HTTPError as e:

>From 9afca9b00a750ba44f4589eaeef267c06f8f0a8d Mon Sep 17 00:00:00 2001
From: Paul Kirth <paulkirth at google.com>
Date: Fri, 14 Nov 2025 16:28:19 -0800
Subject: [PATCH 7/9] Simplify rebase logic to keep base branch up to date

---
 llvm/utils/llvm_push_pr.py | 10 ++++------
 1 file changed, 4 insertions(+), 6 deletions(-)

diff --git a/llvm/utils/llvm_push_pr.py b/llvm/utils/llvm_push_pr.py
index c0aade37953a7..9e43227177a5b 100644
--- a/llvm/utils/llvm_push_pr.py
+++ b/llvm/utils/llvm_push_pr.py
@@ -557,16 +557,14 @@ def run(self) -> None:
                 branch_base_name = self._sanitize_branch_name(first_commit_title)
 
             for i in range(len(commits)):
-                if i > 0:
-                    self._rebase_current_branch()
-
-                # After a rebase, the commit hashes can change, so we need to
-                # get the latest commit stack.
-                commits = self._get_commit_stack()
                 if not commits:
                     self.runner.print("Success! All commits have been landed.")
                     break
                 self._process_commit(commits[0], branch_base_name, i)
+                self._rebase_current_branch()
+                # After a rebase, the commit hashes can change, so we need to
+                # get the latest commit stack.
+                commits = self._get_commit_stack()
 
         finally:
             self._cleanup()

>From 420e37903eb9eddaec6a5a0a8715bab34f1877e5 Mon Sep 17 00:00:00 2001
From: Paul Kirth <paulkirth at google.com>
Date: Tue, 18 Nov 2025 11:03:05 -0800
Subject: [PATCH 8/9] Remove initial rebase

This isn't needed anymore, since we reliably rebase after every PR lands.
---
 llvm/utils/llvm_push_pr.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/llvm/utils/llvm_push_pr.py b/llvm/utils/llvm_push_pr.py
index 9e43227177a5b..cc2e9c8cc403e 100644
--- a/llvm/utils/llvm_push_pr.py
+++ b/llvm/utils/llvm_push_pr.py
@@ -539,13 +539,7 @@ def run(self) -> None:
         self.runner.print(f"On branch: {self.original_branch}")
 
         try:
-            # Rebase on top of the upstream to make sure we have an accurate
-            # stack of commits. If we don't, running the script again (e.g.,
-            # due to partially landing a stack may make a PR against the
-            # upstream that is empty.
-            self._rebase_current_branch()
             commits = self._get_commit_stack()
-
             if not commits:
                 self.runner.print("No new commits to process.")
                 return

>From 78292640369e3992ad4078322d86fd67e3fb4cc7 Mon Sep 17 00:00:00 2001
From: Paul Kirth <paulkirth at google.com>
Date: Tue, 18 Nov 2025 11:03:45 -0800
Subject: [PATCH 9/9] Prefer %B over existing pattern

This was less trouble to adapt than I thought.
---
 llvm/utils/llvm_push_pr.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/llvm/utils/llvm_push_pr.py b/llvm/utils/llvm_push_pr.py
index cc2e9c8cc403e..3bae256300840 100644
--- a/llvm/utils/llvm_push_pr.py
+++ b/llvm/utils/llvm_push_pr.py
@@ -447,12 +447,12 @@ def _get_commit_details(self, commit_hash: str) -> tuple[str, str]:
         # Get the subject and body from git show. Insert "\n\n" between to make
         # parsing simple to do w/ split.
         result = self._run_cmd(
-            ["git", "show", "-s", "--format=%s%n%n%b", commit_hash],
+            ["git", "show", "-s", "--format=%B", commit_hash],
             capture_output=True,
             text=True,
             read_only=True,
         )
-        parts = result.stdout.strip().split("\n\n", 1)
+        parts = [item.strip() for item in result.stdout.split("\n", 1)]
         title = parts[0]
         body = parts[1] if len(parts) > 1 else ""
         return title, body



More information about the llvm-commits mailing list