[llvm] [CI] Add queue size, running count metrics (PR #122714)

Nathan Gauër via llvm-commits llvm-commits at lists.llvm.org
Tue Jan 14 02:25:43 PST 2025


https://github.com/Keenuts updated https://github.com/llvm/llvm-project/pull/122714

>From 5f79857a3cb94827e762c294d0637c3fab79cd04 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nathan=20Gau=C3=ABr?= <brioche at google.com>
Date: Mon, 13 Jan 2025 14:36:29 +0100
Subject: [PATCH 1/3] [CI] Add queue size, running count metrics
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This commit allows the container to report 3 additional metrics at
every sampling event:
- a heartbeat
- the size of the workflow queue (filtered)
- the number of running workflows (filtered)

The heartbeat is a simple metric allowing us to monitor the metrics
health. Before this commit, a new metric was pushed only when a
workflow was completed. This meant we had to wait a few hours
before noticing if the metrics container was unable to push metrics.

In addition to this, this commit adds a sampling of the workflow
queue size and running count. This should allow us to better understand
the load, and improve the autoscale values we pick for the cluster.

Signed-off-by: Nathan Gauër <brioche at google.com>
---
 .ci/metrics/metrics.py | 97 ++++++++++++++++++++++++++++++++++++------
 1 file changed, 83 insertions(+), 14 deletions(-)

diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 8edc00bc6bd377..c12924c4388126 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -26,7 +26,67 @@ class JobMetrics:
     workflow_id: int
 
 
-def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, int]):
+ at dataclass
+class GaugeMetric:
+    name: str
+    value: int
+    time_ns: int
+
+
+def get_sampled_workflow_metrics(github_repo: github.Repository):
+    """Gets global statistics about the Github workflow queue
+
+    Args:
+      github_repo: A github repo object to use to query the relevant information.
+
+    Returns:
+      Returns a list of GaugeMetric objects, containing the relevant metrics about
+      the workflow
+    """
+
+    # Other states are available (pending, waiting, etc), but the meaning
+    # is not documented (See #70540).
+    # "queued" seems to be the info we want.
+    queued_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="queued")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+    running_workflow_count = len(
+        [
+            x
+            for x in github_repo.get_workflow_runs(status="in_progress")
+            if x.name in WORKFLOWS_TO_TRACK
+        ]
+    )
+
+    workflow_metrics = []
+    workflow_metrics.append(
+        GaugeMetric(
+            "workflow_queue_size",
+            queued_workflow_count,
+            time.time_ns(),
+        )
+    )
+    workflow_metrics.append(
+        GaugeMetric(
+            "running_workflow_count",
+            running_workflow_count,
+            time.time_ns(),
+        )
+    )
+    # Always send a heartbeat metric so we can monitor if this container is still able to log to Grafana.
+    workflow_metrics.append(
+        GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+    )
+    return workflow_metrics
+
+
+def get_per_workflow_metrics(
+    github_repo: github.Repository, workflows_to_track: dict[str, int]
+):
     """Gets the metrics for specified Github workflows.
 
     This function takes in a list of workflows to track, and optionally the
@@ -43,14 +103,14 @@ def get_metrics(github_repo: github.Repository, workflows_to_track: dict[str, in
       Returns a list of JobMetrics objects, containing the relevant metrics about
       the workflow.
     """
-    workflow_runs = iter(github_repo.get_workflow_runs())
-
     workflow_metrics = []
 
     workflows_to_include = set(workflows_to_track.keys())
 
-    while len(workflows_to_include) > 0:
-        workflow_run = next(workflow_runs)
+    for workflow_run in iter(github_repo.get_workflow_runs()):
+        if len(workflows_to_include) == 0:
+            break
+
         if workflow_run.status != "completed":
             continue
 
@@ -141,10 +201,16 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
     """
     metrics_batch = []
     for workflow_metric in workflow_metrics:
-        workflow_formatted_name = workflow_metric.job_name.lower().replace(" ", "_")
-        metrics_batch.append(
-            f"{workflow_formatted_name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
-        )
+        if isinstance(workflow_metric, GaugeMetric):
+            name = workflow_metric.name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
+            )
+        else:
+            name = workflow_metric.job_name.lower().replace(" ", "_")
+            metrics_batch.append(
+                f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
+            )
 
     request_data = "\n".join(metrics_batch)
     response = requests.post(
@@ -176,16 +242,19 @@ def main():
     # Enter the main loop. Every five minutes we wake up and dump metrics for
     # the relevant jobs.
     while True:
-        current_metrics = get_metrics(github_repo, workflows_to_track)
+        current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
+        current_metrics += get_sampled_workflow_metrics(github_repo)
         if len(current_metrics) == 0:
-            print("No metrics found to upload.", file=sys.stderr)
-            continue
+            print("No metrics found to upload.", file=sys.stdout)
 
         upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
-        print(f"Uploaded {len(current_metrics)} metrics", file=sys.stderr)
+        print(f"Uploaded {len(current_metrics)} metrics", file=sys.stdout)
 
         for workflow_metric in reversed(current_metrics):
-            workflows_to_track[workflow_metric.job_name] = workflow_metric.workflow_id
+            if isinstance(workflow_metric, JobMetrics):
+                workflows_to_track[
+                    workflow_metric.job_name
+                ] = workflow_metric.workflow_id
 
         time.sleep(SCRAPE_INTERVAL_SECONDS)
 

>From 07fb21bb198199ffbaae1f4a4154ea383144073b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nathan=20Gau=C3=ABr?= <brioche at google.com>
Date: Tue, 14 Jan 2025 11:19:48 +0100
Subject: [PATCH 2/3] pr-feedback

---
 .ci/metrics/metrics.py | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index c12924c4388126..27923ccaf9f30d 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -199,6 +199,11 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
       metrics_userid: The userid to use for the upload.
       api_key: The API key to use for the upload.
     """
+
+    if len(workflow_metrics) == 0:
+        print("No metrics found to upload.", file=sys.stdout)
+        return
+
     metrics_batch = []
     for workflow_metric in workflow_metrics:
         if isinstance(workflow_metric, GaugeMetric):
@@ -206,11 +211,13 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
             metrics_batch.append(
                 f"{name} value={workflow_metric.value} {workflow_metric.time_ns}"
             )
-        else:
+        elif isinstance(workflow_metric, JobMetrics):
             name = workflow_metric.job_name.lower().replace(" ", "_")
             metrics_batch.append(
                 f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
             )
+        else:
+          raise ValueError(f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}")
 
     request_data = "\n".join(metrics_batch)
     response = requests.post(
@@ -244,8 +251,8 @@ def main():
     while True:
         current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
         current_metrics += get_sampled_workflow_metrics(github_repo)
-        if len(current_metrics) == 0:
-            print("No metrics found to upload.", file=sys.stdout)
+        # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
+        current_metrics.append(GaugeMetric("metrics_container_heartbeat", 1, time.time_ns()))
 
         upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
         print(f"Uploaded {len(current_metrics)} metrics", file=sys.stdout)

>From bbb3dcea807f909e08fc05bd1b04ff529630a763 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Nathan=20Gau=C3=ABr?= <brioche at google.com>
Date: Tue, 14 Jan 2025 11:25:08 +0100
Subject: [PATCH 3/3] clang-format

---
 .ci/metrics/metrics.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/.ci/metrics/metrics.py b/.ci/metrics/metrics.py
index 27923ccaf9f30d..d8e2624a2de4e6 100644
--- a/.ci/metrics/metrics.py
+++ b/.ci/metrics/metrics.py
@@ -217,7 +217,9 @@ def upload_metrics(workflow_metrics, metrics_userid, api_key):
                 f"{name} queue_time={workflow_metric.queue_time},run_time={workflow_metric.run_time},status={workflow_metric.status} {workflow_metric.created_at_ns}"
             )
         else:
-          raise ValueError(f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}")
+            raise ValueError(
+                f"Unsupported object type {type(workflow_metric)}: {str(workflow_metric)}"
+            )
 
     request_data = "\n".join(metrics_batch)
     response = requests.post(
@@ -252,7 +254,9 @@ def main():
         current_metrics = get_per_workflow_metrics(github_repo, workflows_to_track)
         current_metrics += get_sampled_workflow_metrics(github_repo)
         # Always send a hearbeat metric so we can monitor is this container is still able to log to Grafana.
-        current_metrics.append(GaugeMetric("metrics_container_heartbeat", 1, time.time_ns()))
+        current_metrics.append(
+            GaugeMetric("metrics_container_heartbeat", 1, time.time_ns())
+        )
 
         upload_metrics(current_metrics, grafana_metrics_userid, grafana_api_key)
         print(f"Uploaded {len(current_metrics)} metrics", file=sys.stdout)



More information about the llvm-commits mailing list