[llvm] [CI] Rework github workflow processing (PR #130317)
Nathan Gauër via llvm-commits
llvm-commits at lists.llvm.org
Fri Mar 7 10:07:02 PST 2025
https://github.com/Keenuts updated https://github.com/llvm/llvm-project/pull/130317
From 56892b31bfd95378dfbc0d9eb9ed46e7823c1cf8 Mon Sep 17 00:00:00 2001
From: Nathan Gauër <brioche at google.com>
Date: Fri, 7 Mar 2025 18:57:49 +0100
Subject: [PATCH 1/2] [CI] Rework github workflow processing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Before this patch, the job/workflow name determined the metric name, meaning
a change in the workflow definition could break monitoring. This patch adds a
map which derives a stable metric name from the workflow name.
In addition, it reworks how we track the last processed workflow to simplify
the behavior, and works around an API issue which returns bogus results when
a filter is used (a standalone sketch of this loop follows the first patch
below).
This PR is a first step toward bringing Buildkite metrics monitoring.
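As an illustration (not part of the patch), here is a minimal sketch of how
the two new maps combine into a stable metric name. The metric_name helper is
hypothetical; the patch inlines this lookup:

GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"}
GITHUB_JOB_TO_TRACK = {
    "github_llvm_premerge_checks": {
        "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux",
    }
}

def metric_name(workflow_name: str, job_name: str):
    # Resolve the stable prefix for the workflow, then the suffix for the
    # job. Returns None when the workflow or job is not tracked.
    prefix = GITHUB_WORKFLOW_TO_TRACK.get(workflow_name)
    if prefix is None:
        return None
    suffix = GITHUB_JOB_TO_TRACK[prefix].get(job_name)
    if suffix is None:
        return None
    return f"{prefix}_{suffix}"

Renaming a Github job or workflow then only requires updating the maps; the
resulting metric name, e.g. "github_llvm_premerge_checks_premerge_linux",
stays stable.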
Signed-off-by: Nathan Gauër <brioche at google.com>
---
.ci/metrics/metrics.py | 229 ++++++++++++++++++++---------------------
1 file changed, 114 insertions(+), 115 deletions(-)
diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index bd2b51154768d..4f44dbdb1d7ec 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -1,4 +1,5 @@
import requests
+import collections
import time
import os
from dataclasses import dataclass
@@ -12,9 +13,29 @@
GRAFANA_URL = (
"https://influx-prod-13-prod-us-east-0.grafana.net/api/v1/push/influx/write"
)
-GITHUB_PROJECT = "llvm/llvm-project"
-WORKFLOWS_TO_TRACK = ["LLVM Premerge Checks"]
-SCRAPE_INTERVAL_SECONDS = 5 * 60
+SCRAPE_INTERVAL_SECONDS = 60
+
+# Lists the Github workflows we want to track. Maps the Github workflow name
+# to the metric name prefix used in grafana.
+# This prefix is also used as a key in the GITHUB_JOB_TO_TRACK map below.
+GITHUB_WORKFLOW_TO_TRACK = {"LLVM Premerge Checks": "github_llvm_premerge_checks"}
+
+# Lists the Github jobs to track for a given workflow. The key is the stable
+# name (metric name prefix) of the workflow (see GITHUB_WORKFLOW_TO_TRACK).
+# Each value is a map linking the Github job name to the corresponding metric
+# name suffix.
+GITHUB_JOB_TO_TRACK = {
+ "github_llvm_premerge_checks": {
+ "Linux Premerge Checks (Test Only - Please Ignore Results)": "premerge_linux",
+ "Windows Premerge Checks (Test Only - Please Ignore Results)": "premerge_windows",
+ }
+}
+
+# The number of workflows to pull when sampling queue size & running count.
+# Filtering at the query level doesn't work, and thus sampling workflow counts
+# cannot be done in a clean way.
+# If we miss running/queued workflows, we might want to bump this value.
+GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING = 200
@dataclass
@@ -34,7 +55,6 @@ class GaugeMetric:
value: int
time_ns: int
-
def get_sampled_workflow_metrics(github_repo: github.Repository):
"""Gets global statistics about the Github workflow queue
@@ -45,60 +65,48 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
Returns a list of GaugeMetric objects, containing the relevant metrics about
the workflow
"""
- queued_job_counts = {}
- running_job_counts = {}
-
- # Other states are available (pending, waiting, etc), but the meaning
- # is not documented (See #70540).
- # "queued" seems to be the info we want.
- for queued_workflow in github_repo.get_workflow_runs(status="queued"):
- if queued_workflow.name not in WORKFLOWS_TO_TRACK:
- continue
- for queued_workflow_job in queued_workflow.jobs():
- job_name = queued_workflow_job.name
- # Workflows marked as queued can potentially only have some jobs
- # queued, so make sure to also count jobs currently in progress.
- if queued_workflow_job.status == "queued":
- if job_name not in queued_job_counts:
- queued_job_counts[job_name] = 1
- else:
- queued_job_counts[job_name] += 1
- elif queued_workflow_job.status == "in_progress":
- if job_name not in running_job_counts:
- running_job_counts[job_name] = 1
- else:
- running_job_counts[job_name] += 1
-
- for running_workflow in github_repo.get_workflow_runs(status="in_progress"):
- if running_workflow.name not in WORKFLOWS_TO_TRACK:
+ queued_count = collections.Counter()
+ running_count = collections.Counter()
+
+ # Do not apply any filters to this query.
+ # See https://github.com/orgs/community/discussions/86766
+ # Applying filters like `status=completed` will break pagination, and
+ # return a non-sorted and incomplete list of workflows.
+ for i, task in enumerate(github_repo.get_workflow_runs()):
+ if i >= GITHUB_WORKFLOWS_COUNT_FOR_SAMPLING:
+ break
+
+ if task.name not in GITHUB_WORKFLOW_TO_TRACK:
continue
- for running_workflow_job in running_workflow.jobs():
- job_name = running_workflow_job.name
- if running_workflow_job.status != "in_progress":
+
+ prefix_name = GITHUB_WORKFLOW_TO_TRACK[task.name]
+ for job in task.jobs():
+ if job.name not in GITHUB_JOB_TO_TRACK[prefix_name]:
continue
+ suffix_name = GITHUB_JOB_TO_TRACK[prefix_name][job.name]
+ metric_name = f"{prefix_name}_{suffix_name}"
+
+ # Other states are available (pending, waiting, etc), but the meaning
+ # is not documented (See #70540).
+ # "queued" seems to be the info we want.
+ if job.status == "queued":
+ queued_count[metric_name] += 1
+ elif job.status == "in_progress":
+ running_count[metric_name] += 1
- if job_name not in running_job_counts:
- running_job_counts[job_name] = 1
- else:
- running_job_counts[job_name] += 1
workflow_metrics = []
- for queued_job in queued_job_counts:
+ for name, value in queued_count.items():
workflow_metrics.append(
- GaugeMetric(
- f"workflow_queue_size_{queued_job}",
- queued_job_counts[queued_job],
- time.time_ns(),
- )
+ GaugeMetric(f"workflow_queue_size_{name}", value, time.time_ns())
)
- for running_job in running_job_counts:
+ for name, value in running_count.items():
workflow_metrics.append(
- GaugeMetric(
- f"running_workflow_count_{running_job}",
- running_job_counts[running_job],
- time.time_ns(),
- )
+ GaugeMetric(f"running_workflow_count_{name}", value, time.time_ns())
)
+
# Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
workflow_metrics.append(
GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
@@ -106,70 +114,68 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
return workflow_metrics
-def get_per_workflow_metrics(
- github_repo: github.Repository, workflows_to_track: dict[str, int]
-):
+def get_per_workflow_metrics(github_repo: github.Repository, last_seen_workflow: str):
"""Gets the metrics for specified Github workflows.
This function takes the ID of the last processed workflow run, and grabs
the data of every newer completed run from Github, returning it to the
caller.
+ If the last_seen_workflow parameter is None, this returns no metrics, but
+ still returns the ID of the most recent workflow run.
Args:
github_repo: A github repo object to use to query the relevant information.
- workflows_to_track: A dictionary mapping workflow names to the last
- invocation ID where metrics have been collected, or None to collect the
- last five results.
+ last_seen_workflow: the ID of the last workflow run this function processed.
Returns:
- Returns a list of JobMetrics objects, containing the relevant metrics about
- the workflow.
+ Returns a tuple with 2 elements:
+ - a list of JobMetrics objects, one per processed job.
+ - the ID of the most recent processed workflow run.
"""
workflow_metrics = []
+ most_recent_workflow_processed = None
+
+ # Do not apply any filters to this query.
+ # See https://github.com/orgs/community/discussions/86766
+ # Applying filters like `status=completed` will break pagination, and
+ # return a non-sorted and incomplete list of workflows.
+ for task in iter(github_repo.get_workflow_runs()):
+ # Ignoring non-completed workflows.
+ if task.status != "completed":
+ continue
- workflows_to_include = set(workflows_to_track.keys())
+ # Record the most recent workflow we processed so this script
+ # only processes it once.
+ if most_recent_workflow_processed is None:
+ most_recent_workflow_processed = task.id
- for workflow_run in iter(github_repo.get_workflow_runs()):
- if len(workflows_to_include) == 0:
+ # This condition only happens when this script starts:
+ # this is used to determine a start point. Don't return any
+ # metrics, just the most recent workflow ID.
+ if last_seen_workflow is None:
break
- if workflow_run.status != "completed":
- continue
-
- # This workflow was already sampled for this run, or is not tracked at
- # all. Ignoring.
- if workflow_run.name not in workflows_to_include:
- continue
+ # This workflow has already been processed. We can stop now.
+ if last_seen_workflow == task.id:
+ break
- # There were no new workflow invocations since the previous scrape.
- # The API returns a sorted list with the most recent invocations first,
- # so we can stop looking for this particular workflow. Continue to grab
- # information on the other workflows of interest, if present.
- if workflows_to_track[workflow_run.name] == workflow_run.id:
- workflows_to_include.remove(workflow_run.name)
+ # This workflow is not interesting to us.
+ if task.name not in GITHUB_WORKFLOW_TO_TRACK:
continue
- workflow_jobs = workflow_run.jobs()
- if workflow_jobs.totalCount == 0:
- continue
+ name_prefix = GITHUB_WORKFLOW_TO_TRACK[task.name]
- if (
- workflows_to_track[workflow_run.name] is None
- or workflows_to_track[workflow_run.name] == workflow_run.id
- ):
- workflows_to_include.remove(workflow_run.name)
- if (
- workflows_to_track[workflow_run.name] is not None
- and len(workflows_to_include) == 0
- ):
- break
+ for job in task.jobs():
+ # This job is not interesting to us.
+ if job.name not in GITHUB_JOB_TO_TRACK[name_prefix]:
+ continue
- for workflow_job in workflow_jobs:
- created_at = workflow_job.created_at
- started_at = workflow_job.started_at
- completed_at = workflow_job.completed_at
+ name_suffix = GITHUB_JOB_TO_TRACK[name_prefix][job.name]
+ created_at = job.created_at
+ started_at = job.started_at
+ completed_at = job.completed_at
- job_result = int(workflow_job.conclusion == "success")
+ job_result = int(job.conclusion == "success")
if job_result:
# We still might want to mark the job as a failure if one of the steps
# failed. This is required due to use setting continue-on-error in
@@ -178,7 +184,7 @@ def get_per_workflow_metrics(
# TODO(boomanaiden154): Remove this once the premerge pipeline is no
# longer in a testing state and we can directly assert the workflow
# result.
- for step in workflow_job.steps:
+ for step in job.steps:
if step.conclusion != "success" and step.conclusion != "skipped":
job_result = 0
break
@@ -191,25 +197,23 @@ def get_per_workflow_metrics(
# The timestamp associated with the event is expected by Grafana to be
# in nanoseconds.
- created_at_ns = int(created_at.timestamp()) * 10**9
+ completed_at_ns = int(completed_at.timestamp()) * 10**9
- logging.info(
- f"Adding a job metric for job {workflow_job.id} in workflow {workflow_run.id}"
- )
+ logging.info(f"Adding a job metric for job {job.id} in workflow {task.id}")
workflow_metrics.append(
JobMetrics(
- workflow_run.name + "-" + workflow_job.name,
+ name_prefix + "_" + name_suffix,
queue_time.seconds,
run_time.seconds,
job_result,
- created_at_ns,
+ completed_at_ns,
task.id,
task.name,
)
)
- return workflow_metrics
+ return workflow_metrics, most_recent_workflow_processed
def upload_metrics(workflow_metrics, metrics_userid, api_key):
@@ -259,32 +263,27 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
def main():
# Authenticate with Github
- auth = Auth.Token(os.environ["GITHUB_TOKEN"])
-
+ github_auth = Auth.Token(os.environ["GITHUB_TOKEN"])
grafana_api_key = os.environ["GRAFANA_API_KEY"]
grafana_metrics_userid = os.environ["GRAFANA_METRICS_USERID"]
- workflows_to_track = {}
- for workflow_to_track in WORKFLOWS_TO_TRACK:
- workflows_to_track[workflow_to_track] = None
+ # The last workflow this script processed.
+ github_last_seen_workflow = None
# Enter the main loop. Every SCRAPE_INTERVAL_SECONDS we wake up and dump
# metrics for the relevant jobs.
while True:
- github_object = Github(auth=auth)
+ github_object = Github(auth=github_auth)
github_repo = github_object.get_repo("llvm/llvm-project")
- current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
- current_metrics += get_sampled_workflow_metrics(github_repo)
-
- upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
- logging.info(f"Uploaded {len(current_metrics)} metrics")
+ github_metrics, github_last_seen_workflow = get_per_workflow_metrics(
+ github_repo, github_last_seen_workflow
+ )
+ sampled_metrics = get_sampled_workflow_metrics(github_repo)
+ metrics = github_metrics + sampled_metrics
- for workflow_metric in reversed(current_metrics):
- if isinstance(workflow_metric, JobMetrics):
- workflows_to_track[
- workflow_metric.workflow_name
- ] = workflow_metric.workflow_id
+ upload_metrics(metrics, grafana_metrics_userid, grafana_api_key)
+ logging.info(f"Uploaded {len(metrics)} metrics")
time.sleep(SCRAPE_INTERVAL_SECONDS)
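Not part of the patch, but for reviewers, a minimal standalone sketch of the
reworked incremental scraping loop in get_per_workflow_metrics. The run_ids
list stands in for github_repo.get_workflow_runs() (most recent first), and
scrape_once is a hypothetical helper:

def scrape_once(run_ids, last_seen):
    # Walk the runs from newest to oldest, stop at the previously processed
    # run, and report the newest run ID as the new start point.
    metrics = []
    most_recent = None
    for run_id in run_ids:
        if most_recent is None:
            most_recent = run_id
        if last_seen is None:
            # First scrape: only record a start point, emit no metrics.
            break
        if run_id == last_seen:
            # Everything older was handled by a previous scrape.
            break
        metrics.append(run_id)  # the real code emits JobMetrics here
    return metrics, most_recent

# First call records a start point: scrape_once([5, 4, 3], None) == ([], 5)
# Later calls process only newer runs: scrape_once([7, 6, 5, 4], 5) == ([7, 6], 7)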
From fbf6505c827d6e2eef84079eb1c285120e58f9bd Mon Sep 17 00:00:00 2001
From: Nathan Gauër <brioche at google.com>
Date: Fri, 7 Mar 2025 19:06:41 +0100
Subject: [PATCH 2/2] format
---
.ci/metrics/metrics.py | 1 -
1 file changed, 1 deletion(-)
diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 4f44dbdb1d7ec..4cb5deba65ac2 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -96,7 +96,6 @@ def get_sampled_workflow_metrics(github_repo: github.Repository):
elif job.status == "in_progress":
running_count[metric_name] += 1
-
workflow_metrics = []
for name, value in queued_count.items():
workflow_metrics.append(