[llvm] [CI] Rework github workflow processing (PR #130317)

Nathan Gauër via llvm-commits llvm-commits at lists.llvm.org
Fri Mar 7 10:07:02 PST 2025


https://github.com/Keenuts updated https://github.com/llvm/llvm-project/pull/130317

>From 56892b31bfd95378dfbc0d9eb9ed46e7823c1cf8 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nathan=20Gau=C3=ABr?= <brioche at google.com>
Date: Fri, 7 Mar 2025 18:57:49 +0100
Subject: [PATCH 1/2] [CI] Rework github workflow processing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Before this patch, the job/workflow name impacted the metric name,
meaning a change in the workflow definition could break monitoring.
This patch adds a map to get a stable name on metrics from a workflow
name.

In addition, it slightly reworks how we track the last processed workflow
to simplify the behavior, and works around an API issue which returns
bogus results if a filter is used.

This PR is a first step towards adding Buildkite metrics monitoring.

Signed-off-by: Nathan Gauër <brioche at google.com>
---
 .ci/metrics/metrics.py | 229 ++++++++++++++++++++---------------------
 1 file changed, 114 insertions(+), 115 deletions(-)

diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index bd2b51154768d..4f44dbdb1d7ec 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -1,4 +1,5 @@
 import requests
+import collections
 import time
 import os
 from dataclasses import dataclass
@@ -12,9 +13,29 @@
 GRAFANA_URL = (
     "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
 )
-GITHUB_PROJECT = "llvm/llvm-project"
-WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"]
-SCRAPE_INTERVAL_SECONDS = 5 * 60
+SCRAPE_INTERVAL_SECONDS = 60
+
+# Lists the Github workflows we want to track. Maps the Github workflow name
+# to the metric name prefix in grafana.
+# This metric name prefix is also used as a key in GITHUB_JOB_TO_TRACK.
+GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"}
+
+# Lists the Github jobs to track for a given workflow. The key is the stable
+# name (metric name) of the workflow (see GITHUB_WORKFLOW_TO_TRACK).
+# Each value is a map to link the github job name to the corresponding metric
+# name.
+GITHUB_JOB_TO_TRACK = {
+    "github_llvm_premerge_checks": {
+        "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux",
+        "Windows Premerge Checks (Test Only - Please Ignore Results)": "premerge_windows",
+    }
+}
+
+# The number of workflows to pull when sampling queue size & running count.
+# Filtering at the query level doesn't work, and thus sampling workflow counts
+# cannot be done in a clean way.
+# If we miss running/queued workflows, we might want to bump this value.
+GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING = 200
 
 
 @dataclass
@@ -34,7 +55,6 @@ class GaugeMetric:
     value: int
     time_ns: int
 
-
 def get_sampled_workflow_metrics(github_repo: github.Repository):
     """Gets global statistics about the Github workflow queue
 
@@ -45,60 +65,48 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
       Returns a list of GaugeMetric objects, containing the relevant metrics about
       the workflow
     """
-    queued_job_counts = {}
-    running_job_counts = {}
-
-    # Other states are available (pending, waiting, etc), but the meaning
-    # is not documented (See #70540).
-    # "queued" seems to be the info we want.
-    for queued_workflow in github_repo.get_workflow_runs(status="queued"):
-        if queued_workflow.name not in WORKFLOWS_TO_TRACK:
-            continue
-        for queued_workflow_job in queued_workflow.jobs():
-            job_name = queued_workflow_job.name
-            # Workflows marked as queued can potentially only have some jobs
-            # queued, so make sure to also count jobs currently in progress.
-            if queued_workflow_job.status == "queued":
-                if job_name not in queued_job_counts:
-                    queued_job_counts[job_name] = 1
-                else:
-                    queued_job_counts[job_name] += 1
-            elif queued_workflow_job.status == "in_progress":
-                if job_name not in running_job_counts:
-                    running_job_counts[job_name] = 1
-                else:
-                    running_job_counts[job_name] += 1
-
-    for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
-        if running_workflow.name not in WORKFLOWS_TO_TRACK:
+    queued_count = collections.Counter()
+    running_count = collections.Counter()
+
+    # Do not apply any filters to this query.
+    # See https://github.com/orgs/community/discussions/86766
+    # Applying filters like `status=completed` will break pagination, and
+    # return a non-sorted and incomplete list of workflows.
+    i = 0
+    for task in iter(github_repo.get_workflow_runs()):
+        if i > GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING:
+            break
+        i += 1
+
+        if task.name not in GITHUB_WORKFLOW_TO_TRACK:
             continue
-        for running_workflow_job in running_workflow.jobs():
-            job_name = running_workflow_job.name
-            if running_workflow_job.status != "in_progress":
+
+        prefix_name = GITHUB_WORKFLOW_TO_TRACK[task.name]
+        for job in task.jobs():
+            if job.name not in GITHUB_JOB_TO_TRACK[prefix_name]:
                 continue
+            suffix_name = GITHUB_JOB_TO_TRACK[prefix_name][job.name]
+            metric_name = f"{prefix_name}_{suffix_name}"
+
+            # Other states are available (pending, waiting, etc), but the meaning
+            # is not documented (See #70540).
+            # "queued" seems to be the info we want.
+            if job.status == "queued":
+                queued_count[metric_name] += 1
+            elif job.status == "in_progress":
+                running_count[metric_name] += 1
 
-            if job_name not in running_job_counts:
-                running_job_counts[job_name] = 1
-            else:
-                running_job_counts[job_name] += 1
 
     workflow_metrics = []
-    for queued_job in queued_job_counts:
+    for name, value in queued_count.items():
         workflow_metrics.append(
-            GaugeMetric(
-                f"workflow_queue_size_{queued_job}",
-                queued_job_counts[queued_job],
-                time.time_ns(),
-            )
+            GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
         )
-    for running_job in running_job_counts:
+    for name, value in running_count.items():
         workflow_metrics.append(
-            GaugeMetric(
-                f"running_workflow_count_{running_job}",
-                running_job_counts[running_job],
-                time.time_ns(),
-            )
+            GaugeMetric(f"running_workflow_count_{name}", value, time.time_ns())
         )
+
     # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
     workflow_metrics.append(
         GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
@@ -106,70 +114,68 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
     return workflow_metrics
 
 
-def get_per_workflow_metrics(
-    github_repo: github.Repository, workflows_to_track: dict[str, int]
-):
+def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow: str):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
     workflow ID of the last tracked invocation. It grabs the relevant data
     from Github, returning it to the caller.
+    If the last_seen_workflow parameter is None, this returns no metrics, but
+    returns the id of the most recent workflow.
 
     Args:
       github_repo: A github repo object to use to query the relevant information.
-      workflows_to_track: A dictionary mapping workflow names to the last
-        invocation ID where metrics have been collected, or None to collect the
-        last five results.
+      last_seen_workflow: the last workflow this function processed.
 
     Returns:
-      Returns a list of JobMetrics objects, containing the relevant metrics about
-      the workflow.
+      Returns a tuple with 2 elements:
+        - a list of JobMetrics objects, one per processed job.
+        - the ID of the most recent processed workflow run.
     """
     workflow_metrics = []
+    most_recent_workflow_processed = None
+
+    # Do not apply any filters to this query.
+    # See https://github.com/orgs/community/discussions/86766
+    # Applying filters like `status=completed` will break pagination, and
+    # return a non-sorted and incomplete list of workflows.
+    for task in iter(github_repo.get_workflow_runs()):
+        # Ignoring non-completed workflows.
+        if task.status != "completed":
+            continue
 
-    workflows_to_include = set(workflows_to_track.keys())
+        # Record the most recent workflow we processed so this script
+        # only processes it once.
+        if most_recent_workflow_processed is None:
+            most_recent_workflow_processed = task.id
 
-    for workflow_run in iter(github_repo.get_workflow_runs()):
-        if len(workflows_to_include) == 0:
+        # This condition only happens when this script starts:
+        # this is used to determine a start point. Don't return any
+        # metrics, just the most recent workflow ID.
+        if last_seen_workflow is None:
             break
 
-        if workflow_run.status != "completed":
-            continue
-
-        # This workflow was already sampled for this run, or is not tracked at
-        # all. Ignoring.
-        if workflow_run.name not in workflows_to_include:
-            continue
+        # This workflow has already been processed. We can stop now.
+        if last_seen_workflow == task.id:
+            break
 
-        # There were no new workflow invocations since the previous scrape.
-        # The API returns a sorted list with the most recent invocations first,
-        # so we can stop looking for this particular workflow. Continue to grab
-        # information on the other workflows of interest, if present.
-        if workflows_to_track[workflow_run.name] == workflow_run.id:
-            workflows_to_include.remove(workflow_run.name)
+        # This workflow is not interesting to us.
+        if task.name not in GITHUB_WORKFLOW_TO_TRACK:
             continue
 
-        workflow_jobs = workflow_run.jobs()
-        if workflow_jobs.totalCount == 0:
-            continue
+        name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]
 
-        if (
-            workflows_to_track[workflow_run.name] is None
-            or workflows_to_track[workflow_run.name] == workflow_run.id
-        ):
-            workflows_to_include.remove(workflow_run.name)
-        if (
-            workflows_to_track[workflow_run.name] is not None
-            and len(workflows_to_include) == 0
-        ):
-            break
+        for job in task.jobs():
+            # This job is not interesting to us.
+            if job.name not in GITHUB_JOB_TO_TRACK[name_prefix]:
+                continue
 
-        for workflow_job in workflow_jobs:
-            created_at = workflow_job.created_at
-            started_at = workflow_job.started_at
-            completed_at = workflow_job.completed_at
+            name_suffix = GITHUB_JOB_TO_TRACK[name_prefix][job.name]
+            created_at = job.created_at
+            started_at = job.started_at
+            completed_at = job.completed_at
 
-            job_result = int(workflow_job.conclusion == "success")
+            job_result = int(job.conclusion == "success")
             if job_result:
                 # We still might want to mark the job as a failure if one of the steps
                 # failed. This is required due to use setting continue-on-error in
@@ -178,7 +184,7 @@ def get_per_workflow_metrics(
                 # TODO(boomanaiden154): Remove this once the premerge pipeline is no
                 # longer in a testing state and we can directly assert the workflow
                 # result.
-                for step in workflow_job.steps:
+                for step in job.steps:
                     if step.conclusion != "success" and step.conclusion != "skipped":
                         job_result = 0
                         break
@@ -191,25 +197,23 @@ def get_per_workflow_metrics(
 
             # The timestamp associated with the event is expected by Grafana to be
             # in nanoseconds.
-            created_at_ns = int(created_at.timestamp()) * 10**9
+            completed_at_ns = int(completed_at.timestamp()) * 10**9
 
-            logging.info(
-                f"Adding a job metric for job {workflow_job.id} in workflow {workflow_run.id}"
-            )
+            logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")
 
             workflow_metrics.append(
                 JobMetrics(
-                    workflow_run.name + "-" + workflow_job.name,
+                    name_prefix + "_" + name_suffix,
                     queue_time.seconds,
                     run_time.seconds,
                     job_result,
-                    created_at_ns,
+                    completed_at_ns,
                     workflow_run.id,
                     workflow_run.name,
                 )
             )
 
-    return workflow_metrics
+    return workflow_metrics, most_recent_workflow_processed
 
 
 def upload_metrics(workflow_metrics, metrics_userid, api_key):
@@ -259,32 +263,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
 
 def main():
     # Authenticate with Github
-    auth = Auth.Token(os.environ["GITHUB_TOKEN"])
-
+    github_auth = Auth.Token(os.environ["GITHUB_TOKEN"])
     grafana_api_key = os.environ["GRAFANA_API_KEY"]
     grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]
 
-    workflows_to_track = {}
-    for workflow_to_track in WORKFLOWS_TO_TRACK:
-        workflows_to_track[workflow_to_track] = None
+    # The last workflow this script processed.
+    github_last_seen_workflow = None
 
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
     while True:
-        github_object = Github(auth=auth)
+        github_object = Github(auth=github_auth)
         github_repo = github_object.get_repo("llvm/llvm-project")
 
-        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
-        current_metrics += get_sampled_workflow_metrics(github_repo)
-
-        upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
-        logging.info(f"Uploaded {len(current_metrics)} metrics")
+        github_metrics, github_last_seen_workflow = get_per_workflow_metrics(
+            github_repo, github_last_seen_workflow
+        )
+        sampled_metrics = get_sampled_workflow_metrics(github_repo)
+        metrics = github_metrics + sampled_metrics
 
-        for workflow_metric in reversed(current_metrics):
-            if isinstance(workflow_metric, JobMetrics):
-                workflows_to_track[
-                    workflow_metric.workflow_name
-                ] = workflow_metric.workflow_id
+        upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
+        logging.info(f"Uploaded {len(metrics)} metrics")
 
         time.sleep(SCRAPE_INTERVAL_SECONDS)
 

>From fbf6505c827d6e2eef84079eb1c285120e58f9bd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nathan=20Gau=C3=ABr?= <brioche at google.com>
Date: Fri, 7 Mar 2025 19:06:41 +0100
Subject: [PATCH 2/2] format

---
 .ci/metrics/metrics.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 4f44dbdb1d7ec..4cb5deba65ac2 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -96,7 +96,6 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
             elif job.status == "in_progress":
                 running_count[metric_name] += 1
 
-
     workflow_metrics = []
     for name, value in queued_count.items():
         workflow_metrics.append(



More information about the llvm-commits mailing list