[llvm] [CI] Rework github workflow processing (PR #130317)

Nathan Gauër via llvm-commits llvm-commits at lists.llvm.org
Tue Mar 11 06:07:00 PDT 2025


https://github.com/Keenuts updated https://github.com/llvm/llvm-project/pull/130317

From 56892b31bfd95378dfbc0d9eb9ed46e7823c1cf8 Mon Sep 17 00:00:00 2001
From: Nathan Gauër <brioche at google.com>
Date: Fri, 7 Mar 2025 18:57:49 +0100
Subject: [PATCH 1/5] [CI] Rework github workflow processing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Before this patch, the job/workflow name directly determined the metric
name, meaning a change in the workflow definition could break monitoring.
This patch adds a map that derives a stable metric name from the workflow
name.
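
For illustration, the lookup roughly works like this (a minimal sketch using
the constants this patch introduces; the helper function is hypothetical and
only exists for this example):

    GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"}
    GITHUB_JOB_TO_TRACK = {
        "github_llvm_premerge_checks": {
            "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux",
        }
    }

    def stable_metric_name(workflow_name: str, job_name: str) -> str:
        # Workflow name -> stable prefix, then job name -> stable suffix.
        prefix = GITHUB_WORKFLOW_TO_TRACK[workflow_name]
        suffix = GITHUB_JOB_TO_TRACK[prefix][job_name]
        return f"{prefix}_{suffix}"

    # stable_metric_name("LLVM Premerge Checks",
    #                    "Linux Premerge Checks (Test Only - Please Ignore Results)")
    # -> "github_llvm_premerge_checks_premerge_linux"
    # Renaming the job in the workflow definition only requires updating the
    # map, not the Grafana dashboards.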

In addition, it slightly reworks how we track the last processed workflow,
both to simplify the behavior and to work around a GitHub API issue that
returns bogus results when a filter is used.
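
Sketched, the per-iteration selection of runs looks roughly like the snippet
below (a rough sketch, not the actual implementation; later revisions in this
series track a set of completed workflow IDs instead of a single ID):

    def runs_to_process(github_repo, last_seen_id):
        # No status filter: filtered queries return bogus/incomplete results,
        # so iterate the unfiltered, most-recent-first list instead.
        new_runs = []
        for run in github_repo.get_workflow_runs():
            if run.id == last_seen_id:
                break  # Everything older was already processed.
            new_runs.append(run)
        return new_runs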

This PR is a first step toward bringing Buildkite metrics monitoring.

Signed-off-by: Nathan Gauër <brioche at google.com>
---
 .ci/metrics/metrics.py | 229 ++++++++++++++++++++---------------------
 1 file changed, 114 insertions(+), 115 deletions(-)

diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index bd2b51154768d..4f44dbdb1d7ec 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -1,4 +1,5 @@
 import requests
+import collections
 import time
 import os
 from dataclasses import dataclass
@@ -12,9 +13,29 @@
 GRAFANA_URL = (
     "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
 )
-GITHUB_PROJECT = "llvm/llvm-project"
-WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"]
-SCRAPE_INTERVAL_SECONDS = 5 * 60
+SCRAPE_INTERVAL_SECONDS = 60
+
+# Lists the Github workflows we want to track. Maps the Github workflow name to
+# the metric name prefix in grafana.
+# This prefix is also used as the key in the GITHUB_JOB_TO_TRACK map.
+GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"}
+
+# Lists the Github jobs to track for a given workflow. The key is the stable
+# name (metric name) of the workflow (see GITHUB_WORKFLOW_TO_TRACK).
+# Each value is a map to link the github job name to the corresponding metric
+# name.
+GITHUB_JOB_TO_TRACK = {
+    "github_llvm_premerge_checks": {
+        "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux",
+        "Windows Premerge Checks (Test Only - Please Ignore Results)": "premerge_windows",
+    }
+}
+
+# The number of workflows to pull when sampling queue size & running count.
+# Filtering at the query level doesn't work, and thus sampling workflow counts
+# cannot be done in a clean way.
+# If we miss running/queued workflows, we might want to bump this value.
+GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING = 200
 
 
 @dataclass
@@ -34,7 +55,6 @@ class GaugeMetric:
     value: int
     time_ns: int
 
-
 def get_sampled_workflow_metrics(github_repo: github.Repository):
     """Gets global statistics about the Github workflow queue
 
@@ -45,60 +65,48 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
       Returns a list of GaugeMetric objects, containing the relevant metrics about
       the workflow
     """
-    queued_job_counts = {}
-    running_job_counts = {}
-
-    # Other states are available (pending, waiting, etc), but the meaning
-    # is not documented (See #70540).
-    # "queued" seems to be the info we want.
-    for queued_workflow in github_repo.get_workflow_runs(status="queued"):
-        if queued_workflow.name not in WORKFLOWS_TO_TRACK:
-            continue
-        for queued_workflow_job in queued_workflow.jobs():
-            job_name = queued_workflow_job.name
-            # Workflows marked as queued can potentially only have some jobs
-            # queued, so make sure to also count jobs currently in progress.
-            if queued_workflow_job.status == "queued":
-                if job_name not in queued_job_counts:
-                    queued_job_counts[job_name] = 1
-                else:
-                    queued_job_counts[job_name] += 1
-            elif queued_workflow_job.status == "in_progress":
-                if job_name not in running_job_counts:
-                    running_job_counts[job_name] = 1
-                else:
-                    running_job_counts[job_name] += 1
-
-    for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
-        if running_workflow.name not in WORKFLOWS_TO_TRACK:
+    queued_count = collections.Counter()
+    running_count = collections.Counter()
+
+    # Do not apply any filters to this query.
+    # See https://github.com/orgs/community/discussions/86766
+    # Applying filters like `status=completed` will break pagination, and
+    # return a non-sorted and incomplete list of workflows.
+    i = 0
+    for task in iter(github_repo.get_workflow_runs()):
+        if i > GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING:
+            break
+        i += 1
+
+        if task.name not in GITHUB_WORKFLOW_TO_TRACK:
             continue
-        for running_workflow_job in running_workflow.jobs():
-            job_name = running_workflow_job.name
-            if running_workflow_job.status != "in_progress":
+
+        prefix_name = GITHUB_WORKFLOW_TO_TRACK[task.name]
+        for job in task.jobs():
+            if job.name not in GITHUB_JOB_TO_TRACK[prefix_name]:
                 continue
+            suffix_name = GITHUB_JOB_TO_TRACK[prefix_name][job.name]
+            metric_name = f"{prefix_name}_{suffix_name}"
+
+            # Other states are available (pending, waiting, etc), but the meaning
+            # is not documented (See #70540).
+            # "queued" seems to be the info we want.
+            if job.status == "queued":
+                queued_count[metric_name] += 1
+            elif job.status == "in_progress":
+                running_count[metric_name] += 1
 
-            if job_name not in running_job_counts:
-                running_job_counts[job_name] = 1
-            else:
-                running_job_counts[job_name] += 1
 
     workflow_metrics = []
-    for queued_job in queued_job_counts:
+    for name, value in queued_count.items():
         workflow_metrics.append(
-            GaugeMetric(
-                f"workflow_queue_size_{queued_job}",
-                queued_job_counts[queued_job],
-                time.time_ns(),
-            )
+            GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
         )
-    for running_job in running_job_counts:
+    for name, value in running_count.items():
         workflow_metrics.append(
-            GaugeMetric(
-                f"running_workflow_count_{running_job}",
-                running_job_counts[running_job],
-                time.time_ns(),
-            )
+            GaugeMetric(f"running_workflow_count_{name}", value, time.time_ns())
         )
+
     # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
     workflow_metrics.append(
         GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
@@ -106,70 +114,68 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
     return workflow_metrics
 
 
-def get_per_workflow_metrics(
-    github_repo: github.Repository, workflows_to_track: dict[str, int]
-):
+def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow: str):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
     workflow ID of the last tracked invocation. It grabs the relevant data
     from Github, returning it to the caller.
+    If the last_seen_workflow parameter is None, this returns no metrics, but
+    returns the id of the most recent workflow.
 
     Args:
       github_repo: A github repo object to use to query the relevant information.
-      workflows_to_track: A dictionary mapping workflow names to the last
-        invocation ID where metrics have been collected, or None to collect the
-        last five results.
+      last_seen_workflow: the last workflow this function processed.
 
     Returns:
-      Returns a list of JobMetrics objects, containing the relevant metrics about
-      the workflow.
+      Returns a tuple with 2 elements:
+        - a list of JobMetrics objects, one per processed job.
+        - the ID of the most recent processed workflow run.
     """
     workflow_metrics = []
+    most_recent_workflow_processed = None
+
+    # Do not apply any filters to this query.
+    # See https://github.com/orgs/community/discussions/86766
+    # Applying filters like `status=completed` will break pagination, and
+    # return a non-sorted and incomplete list of workflows.
+    for task in iter(github_repo.get_workflow_runs()):
+        # Ignoring non-completed workflows.
+        if task.status != "completed":
+            continue
 
-    workflows_to_include = set(workflows_to_track.keys())
+        # Record the most recent workflow we processed so this script
+        # only processes it once.
+        if most_recent_workflow_processed is None:
+            most_recent_workflow_processed = task.id
 
-    for workflow_run in iter(github_repo.get_workflow_runs()):
-        if len(workflows_to_include) == 0:
+        # This condition only happens when this script starts:
+        # this is used to determine a start point. Don't return any
+        # metrics, just the most recent workflow ID.
+        if last_seen_workflow is None:
             break
 
-        if workflow_run.status != "completed":
-            continue
-
-        # This workflow was already sampled for this run, or is not tracked at
-        # all. Ignoring.
-        if workflow_run.name not in workflows_to_include:
-            continue
+        # This workflow has already been processed. We can stop now.
+        if last_seen_workflow == task.id:
+            break
 
-        # There were no new workflow invocations since the previous scrape.
-        # The API returns a sorted list with the most recent invocations first,
-        # so we can stop looking for this particular workflow. Continue to grab
-        # information on the other workflows of interest, if present.
-        if workflows_to_track[workflow_run.name] == workflow_run.id:
-            workflows_to_include.remove(workflow_run.name)
+        # This workflow is not interesting to us.
+        if task.name not in GITHUB_WORKFLOW_TO_TRACK:
             continue
 
-        workflow_jobs = workflow_run.jobs()
-        if workflow_jobs.totalCount == 0:
-            continue
+        name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]
 
-        if (
-            workflows_to_track[workflow_run.name] is None
-            or workflows_to_track[workflow_run.name] == workflow_run.id
-        ):
-            workflows_to_include.remove(workflow_run.name)
-        if (
-            workflows_to_track[workflow_run.name] is not None
-            and len(workflows_to_include) == 0
-        ):
-            break
+        for job in task.jobs():
+            # This job is not interesting to us.
+            if job.name not in GITHUB_JOB_TO_TRACK[name_prefix]:
+                continue
 
-        for workflow_job in workflow_jobs:
-            created_at = workflow_job.created_at
-            started_at = workflow_job.started_at
-            completed_at = workflow_job.completed_at
+            name_suffix = GITHUB_JOB_TO_TRACK[name_prefix][job.name]
+            created_at = job.created_at
+            started_at = job.started_at
+            completed_at = job.completed_at
 
-            job_result = int(workflow_job.conclusion == "success")
+            job_result = int(job.conclusion == "success")
             if job_result:
                 # We still might want to mark the job as a failure if one of the steps
                 # failed. This is required due to use setting continue-on-error in
@@ -178,7 +184,7 @@ def get_per_workflow_metrics(
                 # TODO(boomanaiden154): Remove this once the premerge pipeline is no
                 # longer in a testing state and we can directly assert the workflow
                 # result.
-                for step in workflow_job.steps:
+                for step in job.steps:
                     if step.conclusion != "success" and step.conclusion != "skipped":
                         job_result = 0
                         break
@@ -191,25 +197,23 @@ def get_per_workflow_metrics(
 
             # The timestamp associated with the event is expected by Grafana to be
             # in nanoseconds.
-            created_at_ns = int(created_at.timestamp()) * 10**9
+            completed_at_ns = int(completed_at.timestamp()) * 10**9
 
-            logging.info(
-                f"Adding a job metric for job {workflow_job.id} in workflow {workflow_run.id}"
-            )
+            logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")
 
             workflow_metrics.append(
                 JobMetrics(
-                    workflow_run.name + "-" + workflow_job.name,
+                    name_prefix + "_" + name_suffix,
                     queue_time.seconds,
                     run_time.seconds,
                     job_result,
-                    created_at_ns,
+                    completed_at_ns,
                     workflow_run.id,
                     workflow_run.name,
                 )
             )
 
-    return workflow_metrics
+    return workflow_metrics, most_recent_workflow_processed
 
 
 def upload_metrics(workflow_metrics, metrics_userid, api_key):
@@ -259,32 +263,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
 
 def main():
     # Authenticate with Github
-    auth = Auth.Token(os.environ["GITHUB_TOKEN"])
-
+    github_auth = Auth.Token(os.environ["GITHUB_TOKEN"])
     grafana_api_key = os.environ["GRAFANA_API_KEY"]
     grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]
 
-    workflows_to_track = {}
-    for workflow_to_track in WORKFLOWS_TO_TRACK:
-        workflows_to_track[workflow_to_track] = None
+    # The last workflow this script processed.
+    github_last_seen_workflow = None
 
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
     while True:
-        github_object = Github(auth=auth)
+        github_object = Github(auth=github_auth)
         github_repo = github_object.get_repo("llvm/llvm-project")
 
-        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
-        current_metrics += get_sampled_workflow_metrics(github_repo)
-
-        upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
-        logging.info(f"Uploaded {len(current_metrics)} metrics")
+        github_metrics, github_last_seen_workflow = get_per_workflow_metrics(
+            github_repo, github_last_seen_workflow
+        )
+        sampled_metrics = get_sampled_workflow_metrics(github_repo)
+        metrics = github_metrics + sampled_metrics
 
-        for workflow_metric in reversed(current_metrics):
-            if isinstance(workflow_metric, JobMetrics):
-                workflows_to_track[
-                    workflow_metric.workflow_name
-                ] = workflow_metric.workflow_id
+        upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
+        logging.info(f"Uploaded {len(metrics)} metrics")
 
         time.sleep(SCRAPE_INTERVAL_SECONDS)
 

From fbf6505c827d6e2eef84079eb1c285120e58f9bd Mon Sep 17 00:00:00 2001
From: Nathan Gauër <brioche at google.com>
Date: Fri, 7 Mar 2025 19:06:41 +0100
Subject: [PATCH 2/5] format

---
 .ci/metrics/metrics.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 4f44dbdb1d7ec..4cb5deba65ac2 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -96,7 +96,6 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
             elif job.status == "in_progress":
                 running_count[metric_name] += 1
 
-
     workflow_metrics = []
     for name, value in queued_count.items():
         workflow_metrics.append(

From a952994dfa83424aa996b4327001d72118d0f163 Mon Sep 17 00:00:00 2001
From: Nathan Gauër <brioche at google.com>
Date: Mon, 10 Mar 2025 11:39:13 +0100
Subject: [PATCH 3/5] iterate over fixed depth

---
 .ci/metrics/metrics.py | 224 +++++++++++++++++++++--------------------
 1 file changed, 116 insertions(+), 108 deletions(-)

diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 4cb5deba65ac2..b2fef5b9da96c 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -1,19 +1,20 @@
-import requests
 import collections
-import time
+import datetime
+import github
+import logging
 import os
-from dataclasses import dataclass
+import requests
 import sys
-import logging
+import time
 
-import github
-from github import Github
+from dataclasses import dataclass
 from github import Auth
+from github import Github
 
 GRAFANA_URL = (
     "https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
 )
-SCRAPE_INTERVAL_SECONDS = 60
+SCRAPE_INTERVAL_SECONDS = 5 * 60
 
 # Lists the Github workflows we want to track. Maps the Github workflow name to
 # the metric name prefix in grafana.
@@ -31,12 +32,26 @@
     }
 }
 
-# The number of workflows to pull when sampling queue size & running count.
-# Filtering at the query level doesn't work, and thus sampling workflow counts
-# cannot be done in a clean way.
-# If we miss running/queued workflows, we might want to bump this value.
-GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING = 200
-
+# The number of workflows to pull when sampling Github workflows.
+# - Github API filtering is broken: we cannot apply any filters.
+# - See https://github.com/orgs/community/discussions/86766
+# - A workflow can complete before another workflow, even when starting later.
+# - We don't want to sample the same workflow twice.
+#
+# This means we essentially have a list of workflows sorted by creation date,
+# and that's all we can deduce from it. So for each iteration, we'll blindly
+# process the last N workflows.
+GITHUB_WORKFLOWS_MAX_PROCESS_COUNT = 1000
+# Second cutoff: stop once we reach a workflow older than X hours.
+# This means we will miss long-tails (exceptional jobs running for more than
+# X hours), but that's also the case with the count cutoff above.
+# The only way to avoid missing any workflow would be to process the complete
+# list, which is not possible.
+GITHUB_WORKFLOW_MAX_CREATED_AGE_HOURS = 8
+
+# Grafana will fail to insert any metric older than ~2 hours (value determined
+# by trial and error).
+GRAFANA_METRIC_MAX_AGE_MN = 120
 
 @dataclass
 class JobMetrics:
@@ -44,76 +59,20 @@ class JobMetrics:
     queue_time: int
     run_time: int
     status: int
-    created_at_ns: int
+    completed_at_ns: int
     workflow_id: int
     workflow_name: str
 
-
 @dataclass
 class GaugeMetric:
     name: str
     value: int
     time_ns: int
 
-def get_sampled_workflow_metrics(github_repo: github.Repository):
-    """Gets global statistics about the Github workflow queue
-
-    Args:
-      github_repo: A github repo object to use to query the relevant information.
-
-    Returns:
-      Returns a list of GaugeMetric objects, containing the relevant metrics about
-      the workflow
-    """
-    queued_count = collections.Counter()
-    running_count = collections.Counter()
-
-    # Do not apply any filters to this query.
-    # See https://github.com/orgs/community/discussions/86766
-    # Applying filters like `status=completed` will break pagination, and
-    # return a non-sorted and incomplete list of workflows.
-    i = 0
-    for task in iter(github_repo.get_workflow_runs()):
-        if i > GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING:
-            break
-        i += 1
-
-        if task.name not in GITHUB_WORKFLOW_TO_TRACK:
-            continue
-
-        prefix_name = GITHUB_WORKFLOW_TO_TRACK[task.name]
-        for job in task.jobs():
-            if job.name not in GITHUB_JOB_TO_TRACK[prefix_name]:
-                continue
-            suffix_name = GITHUB_JOB_TO_TRACK[prefix_name][job.name]
-            metric_name = f"{prefix_name}_{suffix_name}"
-
-            # Other states are available (pending, waiting, etc), but the meaning
-            # is not documented (See #70540).
-            # "queued" seems to be the info we want.
-            if job.status == "queued":
-                queued_count[metric_name] += 1
-            elif job.status == "in_progress":
-                running_count[metric_name] += 1
-
-    workflow_metrics = []
-    for name, value in queued_count.items():
-        workflow_metrics.append(
-            GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
-        )
-    for name, value in running_count.items():
-        workflow_metrics.append(
-            GaugeMetric(f"running_workflow_count_{name}", value, time.time_ns())
-        )
-
-    # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
-    workflow_metrics.append(
-        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
-    )
-    return workflow_metrics
-
 
-def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow: str):
+def github_get_metrics(
+    github_repo: github.Repository, last_workflows_seen_as_completed: set[int]
+):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
@@ -132,47 +91,65 @@ def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow:
         - the ID of the most recent processed workflow run.
     """
     workflow_metrics = []
-    most_recent_workflow_processed = None
+    queued_count = collections.Counter()
+    running_count = collections.Counter()
+
+    # The set of workflows this iteration saw as completed.
+    # Max size: GITHUB_WORKFLOWS_MAX_PROCESS_COUNT.
+    workflow_seen_as_completed = set()
+
+    # Since we process a fixed count of workflows, we want to know when
+    # the depth is too small and we start missing workflows.
+    # E.g.: if there were more than N workflows in the last 2 hours.
+    # To monitor this, we'll log the age of the oldest workflow processed,
+    # and set up alerting in Grafana to help us adjust this depth.
+    oldest_seen_workflow_age_mn = None
 
     # Do not apply any filters to this query.
     # See https://github.com/orgs/community/discussions/86766
     # Applying filters like `status=completed` will break pagination, and
     # return a non-sorted and incomplete list of workflows.
+    i = 0
     for task in iter(github_repo.get_workflow_runs()):
-        # Ignoring non-completed workflows.
-        if task.status != "completed":
-            continue
-
-        # Record the most recent workflow we processed so this script
-        # only processes it once.
-        if most_recent_workflow_processed is None:
-            most_recent_workflow_processed = task.id
-
-        # This condition only happens when this script starts:
-        # this is used to determine a start point. Don't return any
-        # metrics, just the most recent workflow ID.
-        if last_seen_workflow is None:
+        # Max depth reached, stopping.
+        if i >= GITHUB_WORKFLOWS_MAX_PROCESS_COUNT:
             break
+        i += 1
 
-        # This workflow has already been processed. We can stop now.
-        if last_seen_workflow == task.id:
+        workflow_age_mn = (
+            datetime.datetime.now(datetime.timezone.utc) - task.created_at
+        ).total_seconds() / 60
+        oldest_seen_workflow_age_mn = workflow_age_mn
+        # If we reach a workflow older than X, stop.
+        if workflow_age_mn > GITHUB_WORKFLOW_MAX_CREATED_AGE_HOURS * 60:
             break
 
         # This workflow is not interesting to us.
         if task.name not in GITHUB_WORKFLOW_TO_TRACK:
             continue
 
-        name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]
+        if task.status == "completed":
+            workflow_seen_as_completed.add(task.id)
+
+        # This workflow has already been seen completed in the previous run.
+        if task.id in last_workflows_seen_as_completed:
+            continue
 
+        name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]
         for job in task.jobs():
             # This job is not interesting to us.
             if job.name not in GITHUB_JOB_TO_TRACK[name_prefix]:
                 continue
 
             name_suffix = GITHUB_JOB_TO_TRACK[name_prefix][job.name]
-            created_at = job.created_at
-            started_at = job.started_at
-            completed_at = job.completed_at
+            metric_name = name_prefix + "_" + name_suffix
+
+            if task.status != "completed":
+                if job.status == "queued":
+                    queued_count[metric_name] += 1
+                elif job.status == "in_progress":
+                    running_count[metric_name] += 1
+                continue
 
             job_result = int(job.conclusion == "success")
             if job_result:
@@ -188,31 +165,63 @@ def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow:
                         job_result = 0
                         break
 
+            created_at = job.created_at
+            started_at = job.started_at
+            completed_at = job.completed_at
             queue_time = started_at - created_at
             run_time = completed_at - started_at
-
             if run_time.seconds == 0:
                 continue
 
+            # Grafana will refuse to ingest metrics older than ~2 hours, so we
+            # should avoid sending historical data.
+            metric_age_mn = (
+                datetime.datetime.now(datetime.timezone.utc) - completed_at
+            ).total_seconds() / 60
+            if metric_age_mn > GRAFANA_METRIC_MAX_AGE_MN:
+                continue
+
+            logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")
             # The timestamp associated with the event is expected by Grafana to be
             # in nanoseconds.
             completed_at_ns = int(completed_at.timestamp()) * 10**9
-
-            logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")
-
             workflow_metrics.append(
                 JobMetrics(
-                    name_prefix + "_" + name_suffix,
+                    metric_name,
                     queue_time.seconds,
                     run_time.seconds,
                     job_result,
                     completed_at_ns,
-                    workflow_run.id,
-                    workflow_run.name,
+                    task.id,
+                    task.name,
                 )
             )
 
-    return workflow_metrics, most_recent_workflow_processed
+    for name, value in queued_count.items():
+        workflow_metrics.append(
+            GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
+        )
+    for name, value in running_count.items():
+        workflow_metrics.append(
+            GaugeMetric(f"running_workflow_count_{name}", value, time.time_ns())
+        )
+
+    # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
+    workflow_metrics.append(
+        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+    )
+
+    # Log the oldest workflow we saw, allowing us to monitor if the processing
+    # depth is correctly set up.
+    if oldest_seen_workflow_age_mn is not None:
+        workflow_metrics.append(
+            GaugeMetric(
+                "github_oldest_processed_workflow_mn",
+                oldest_seen_workflow_age_mn,
+                time.time_ns(),
+            )
+        )
+    return workflow_metrics, workflow_seen_as_completed
 
 
 def upload_metrics(workflow_metrics, metrics_userid, api_key):
@@ -241,7 +250,7 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
         elif isinstance(workflow_metric, JobMetrics):
             name = workflow_metric.job_name.lower().replace(" ", "_")
             metrics_batch.append(
-                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
+                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.completed_at_ns}"
             )
         else:
             raise ValueError(
@@ -267,7 +276,9 @@ def main():
     grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]
 
     # The last workflow this script processed.
-    github_last_seen_workflow = None
+    # Because the Github queries are broken, we'll simply record a 'processed'
+    # bit for the last GITHUB_WORKFLOWS_MAX_PROCESS_COUNT workflows.
+    gh_last_workflows_seen_as_completed = set()
 
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
@@ -275,12 +286,9 @@ def main():
         github_object = Github(auth=github_auth)
         github_repo = github_object.get_repo("llvm/llvm-project")
 
-        github_metrics, github_last_seen_workflow = get_per_workflow_metrics(
-            github_repo, github_last_seen_workflow
+        metrics, gh_last_workflows_seen_as_completed = github_get_metrics(
+            github_repo, gh_last_workflows_seen_as_completed
         )
-        sampled_metrics = get_sampled_workflow_metrics(github_repo)
-        metrics = github_metrics + sampled_metrics
-
         upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
         logging.info(f"Uploaded {len(metrics)} metrics")
 

From 4c313350320117a850a89b85275a434ae8c2de7c Mon Sep 17 00:00:00 2001
From: Nathan Gauër <brioche at google.com>
Date: Mon, 10 Mar 2025 11:41:42 +0100
Subject: [PATCH 4/5] pr-feedback

---
 .ci/metrics/metrics.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index b2fef5b9da96c..3fb2bd122f53c 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -72,7 +72,7 @@ class GaugeMetric:
 
 def github_get_metrics(
     github_repo: github.Repository, last_workflows_seen_as_completed: set[int]
-):
+) -> tuple[list[JobMetrics], set[int]]:
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the

From 44d2967248d86fdf9c5cfe62bdb7917d139c524d Mon Sep 17 00:00:00 2001
From: Nathan Gauër <brioche at google.com>
Date: Tue, 11 Mar 2025 14:05:46 +0100
Subject: [PATCH 5/5] add logging when dropping stale metrics

---
 .ci/metrics/metrics.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 3fb2bd122f53c..e1712bd015151 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -179,6 +179,10 @@ def github_get_metrics(
                 datetime.datetime.now(datetime.timezone.utc) - completed_at
             ).total_seconds() / 60
             if metric_age_mn > GRAFANA_METRIC_MAX_AGE_MN:
+                logging.info(
+                    f"Job {job.id} from workflow {task.id} dropped due"
+                    + f" to staleness: {metric_age_mn}mn old."
+                )
                 continue
 
             logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")


