[llvm] [CI] Add queue size, running count metrics (PR #122714)
Nathan Gauër via llvm-commits
llvm-commits at lists.llvm.org
Tue Jan 14 02:25:43 PST 2025
https://github.com/Keenuts updated https://github.com/llvm/llvm-project/pull/122714
>From 5f79857a3cb94827e762c294d0637c3fab79cd04 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nathan=20Gau=C3=ABr?= <brioche at google.com>
Date: Mon, 13 Jan 2025 14:36:29 +0100
Subject: [PATCH 1/3] [CI] Add queue size, running count metrics
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
This commit allows the container to report 3 additional metrics at
every sampling event:
- a heartbeat
- the size of the workflow queue (filtered)
- the number of running workflows (filtered)
The heartbeat is a simple metric allowing us to monitor the metrics
health. Before this commit, a new metric was pushed only when a
workflow was completed. This meant we had to wait a few hours
before noticing if the metrics container was unable to push metrics.
In addition to this, this commit adds sampling of the workflow
queue size and running count. This should allow us to better understand
the load, and improve the autoscale values we pick for the cluster.
Signed-off-by: Nathan Gauër <brioche at google.com>
---
.ci/metrics/metrics.py | 97 ++++++++++++++++++++++++++++++++++++------
1 file changed, 83 insertions(+), 14 deletions(-)
diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 8edc00bc6bd377..c12924c4388126 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -26,7 +26,67 @@ class JobMetrics:
workflow_id: int
-def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]):
+ at dataclass
+class GaugeMetric:
+ name: str
+ value: int
+ time_ns: int
+
+
+def get_sampled_workflow_metrics(github_repo: github.Repository):
+ """Gets global statistics about the Github workflow queue
+
+ Args:
+ github_repo: A github repo object to use to query the relevant information.
+
+ Returns:
+ Returns a list of GaugeMetric objects, containing the relevant metrics about
+ the workflow
+ """
+
+ # Other states are available (pending, waiting, etc), but the meaning
+ # is not documented (See #70540).
+ # "queued" seems to be the info we want.
+ queued_workflow_count = len(
+ [
+ x
+ for x in github_repo.get_workflow_runs(status="queued")
+ if x.name in WORKFLOWS_TO_TRACK
+ ]
+ )
+ running_workflow_count = len(
+ [
+ x
+ for x in github_repo.get_workflow_runs(status="in_progress")
+ if x.name in WORKFLOWS_TO_TRACK
+ ]
+ )
+
+ workflow_metrics = []
+ workflow_metrics.append(
+ GaugeMetric(
+ "workflow_queue_size",
+ queued_workflow_count,
+ time.time_ns(),
+ )
+ )
+ workflow_metrics.append(
+ GaugeMetric(
+ "running_workflow_count",
+ running_workflow_count,
+ time.time_ns(),
+ )
+ )
+ # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
+ workflow_metrics.append(
+ GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+ )
+ return workflow_metrics
+
+
+def get_per_workflow_metrics(
+ github_repo: github.Repository, workflows_to_track: dict[str, int]
+):
"""Gets the metrics for specified Github workflows.
This function takes in a list of workflows to track, and optionally the
@@ -43,14 +103,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
Returns a list of JobMetrics objects, containing the relevant metrics about
the workflow.
"""
- workflow_runs = iter(github_repo.get_workflow_runs())
-
workflow_metrics = []
workflows_to_include = set(workflows_to_track.keys())
- while len(workflows_to_include) > 0:
- workflow_run = next(workflow_runs)
+ for workflow_run in iter(github_repo.get_workflow_runs()):
+ if len(workflows_to_include) == 0:
+ break
+
if workflow_run.status != "completed":
continue
@@ -141,10 +201,16 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
"""
metrics_batch = []
for workflow_metric in workflow_metrics:
- workflow_formatted_name = workflow_metric.job_name.lower().replace(" ", "_")
- metrics_batch.append(
- f"{workflow_formatted_name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
- )
+ if isinstance(workflow_metric, GaugeMetric):
+ name = workflow_metric.name.lower().replace(" ", "_")
+ metrics_batch.append(
+ f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
+ )
+ else:
+ name = workflow_metric.job_name.lower().replace(" ", "_")
+ metrics_batch.append(
+ f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
+ )
request_data = "\n".join(metrics_batch)
response = requests.post(
@@ -176,16 +242,19 @@ def main():
# Enter the main loop. Every five minutes we wake up and dump metrics for
# the relevant jobs.
while True:
- current_metrics = get_metrics(github_repo, workflows_to_track)
+ current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
+ current_metrics += get_sampled_workflow_metrics(github_repo)
if len(current_metrics) == 0:
- print("No metrics found to upload.", file=sys.stderr)
- continue
+ print("No metrics found to upload.", file=sys.stdout)
upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
- print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)
+ print(f"Uploaded {len(current_metrics)} metrics", file=sys.stdout)
for workflow_metric in reversed(current_metrics):
- workflows_to_track[workflow_metric.job_name] = workflow_metric.workflow_id
+ if isinstance(workflow_metric, JobMetrics):
+ workflows_to_track[
+ workflow_metric.job_name
+ ] = workflow_metric.workflow_id
time.sleep(SCRAPE_INTERVAL_SECONDS)
>From 07fb21bb198199ffbaae1f4a4154ea383144073b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nathan=20Gau=C3=ABr?= <brioche at google.com>
Date: Tue, 14 Jan 2025 11:19:48 +0100
Subject: [PATCH 2/3] pr-feedback
---
.ci/metrics/metrics.py | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index c12924c4388126..27923ccaf9f30d 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -199,6 +199,11 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
metrics_userid: The userid to use for the upload.
api_key: The API key to use for the upload.
"""
+
+ if len(workflow_metrics) == 0:
+ print("No metrics found to upload.", file=sys.stdout)
+ return
+
metrics_batch = []
for workflow_metric in workflow_metrics:
if isinstance(workflow_metric, GaugeMetric):
@@ -206,11 +211,13 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
metrics_batch.append(
f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
)
- else:
+ elif isinstance(workflow_metric, JobMetrics):
name = workflow_metric.job_name.lower().replace(" ", "_")
metrics_batch.append(
f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
)
+ else:
+ raise ValueError(f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}")
request_data = "\n".join(metrics_batch)
response = requests.post(
@@ -244,8 +251,8 @@ def main():
while True:
current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
current_metrics += get_sampled_workflow_metrics(github_repo)
- if len(current_metrics) == 0:
- print("No metrics found to upload.", file=sys.stdout)
+ # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
+ current_metrics.append(GaugeMetric("metrics_container_heartbeat", 1, time.time_ns()))
upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
print(f"Uploaded {len(current_metrics)} metrics", file=sys.stdout)
>From bbb3dcea807f909e08fc05bd1b04ff529630a763 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nathan=20Gau=C3=ABr?= <brioche at google.com>
Date: Tue, 14 Jan 2025 11:25:08 +0100
Subject: [PATCH 3/3] clang-format
---
.ci/metrics/metrics.py | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 27923ccaf9f30d..d8e2624a2de4e6 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -217,7 +217,9 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
)
else:
- raise ValueError(f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}")
+ raise ValueError(
+ f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}"
+ )
request_data = "\n".join(metrics_batch)
response = requests.post(
@@ -252,7 +254,9 @@ def main():
current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
current_metrics += get_sampled_workflow_metrics(github_repo)
# Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
- current_metrics.append(GaugeMetric("metrics_container_heartbeat", 1, time.time_ns()))
+ current_metrics.append(
+ GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+ )
upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
print(f"Uploaded {len(current_metrics)} metrics", file=sys.stdout)
More information about the llvm-commits
mailing list