[llvm] [tools] LLVM Advisor - optimization analysis and performance guidance tool (PR #147451)
via llvm-commits
llvm-commits at lists.llvm.org
Mon Jul 7 20:00:29 PDT 2025
Miguel Cárdenas <miguelecsx at gmail.com>
Message-ID:
In-Reply-To: <llvm.org/llvm/llvm-project/pull/147451 at github.com>
github-actions[bot] wrote:
<!--LLVM CODE FORMAT COMMENT: {darker}-->
:warning: The Python code formatter, darker, found issues in your code. :warning:
<details>
<summary>
You can test this locally with the following command:
</summary>
``````````bash
darker --check --diff -r HEAD~1...HEAD llvm/tools/llvm-advisor/view/parser/__init__.py llvm/tools/llvm-advisor/view/parser/analyzer.py llvm/tools/llvm-advisor/view/parser/models.py llvm/tools/llvm-advisor/view/parser/profile_parser.py llvm/tools/llvm-advisor/view/parser/yaml_parser.py
``````````
</details>
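If you want darker to apply these changes for you, running the same command without `--check --diff` rewrites the listed files in place (a sketch of the typical workflow, using the same revision range and file list as the check command above):

``````````bash
# Apply darker's suggested formatting in place; darker only reformats
# lines that changed since HEAD~1, leaving the rest of each file untouched.
darker -r HEAD~1...HEAD \
  llvm/tools/llvm-advisor/view/parser/__init__.py \
  llvm/tools/llvm-advisor/view/parser/analyzer.py \
  llvm/tools/llvm-advisor/view/parser/models.py \
  llvm/tools/llvm-advisor/view/parser/profile_parser.py \
  llvm/tools/llvm-advisor/view/parser/yaml_parser.py
``````````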
<details>
<summary>
View the diff from darker here.
</summary>
``````````diff
--- __init__.py 2025-07-08 02:57:37.000000 +0000
+++ __init__.py 2025-07-08 02:59:58.577994 +0000
@@ -1,14 +1,21 @@
from .yaml_parser import OptimizationRecordParser
-from .models import OptimizationRemark, CompilationUnit, Project, RemarkType, DebugLocation, RemarkArgument
+from .models import (
+ OptimizationRemark,
+ CompilationUnit,
+ Project,
+ RemarkType,
+ DebugLocation,
+ RemarkArgument,
+)
from .analyzer import RemarkAnalyzer
__all__ = [
- 'OptimizationRecordParser',
- 'OptimizationRemark',
- 'CompilationUnit',
- 'Project',
- 'RemarkType',
- 'DebugLocation',
- 'RemarkArgument',
- 'RemarkAnalyzer'
+ "OptimizationRecordParser",
+ "OptimizationRemark",
+ "CompilationUnit",
+ "Project",
+ "RemarkType",
+ "DebugLocation",
+ "RemarkArgument",
+ "RemarkAnalyzer",
]
--- analyzer.py 2025-07-08 02:57:37.000000 +0000
+++ analyzer.py 2025-07-08 02:59:58.876561 +0000
@@ -1,17 +1,17 @@
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
#
# LLVM Advisor Analysis Engine - Comprehensive analysis of optimization
# remarks, profiling data, and performance insights.
#
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
from typing import Dict, List, Any, Optional
from collections import defaultdict, Counter
from .models import Project, RemarkType, OptimizationRemark
from .profile_parser import ProfileParser, ProfileData
@@ -24,742 +24,964 @@
self.raw_trace_data = None
if profile_path:
parser = ProfileParser(profile_path)
self.profile_data = parser.parse()
self.raw_trace_data = parser.get_raw_data()
-
+
def analyze_optimization_opportunities(self) -> Dict[str, Any]:
missed_by_pass = defaultdict(list)
-
+
for unit in self.project.compilation_units:
for remark in unit.remarks:
if remark.remark_type == RemarkType.MISSED:
missed_by_pass[remark.pass_name].append(remark)
-
+
opportunities = []
for pass_name, remarks in missed_by_pass.items():
# Count unique files affected by this pass
unique_files = set()
for remark in remarks:
for unit in self.project.compilation_units:
if remark in unit.remarks:
unique_files.add(unit.source_file)
break
-
- opportunities.append({
- "pass": pass_name,
- "missed_count": len(remarks),
- "files_affected": len(unique_files),
- "impact": "high" if len(remarks) > 10 else "medium" if len(remarks) > 5 else "low"
- })
-
- return {
- "optimization_opportunities": sorted(opportunities, key=lambda x: x["missed_count"], reverse=True),
- "total_missed": sum(len(remarks) for remarks in missed_by_pass.values())
- }
-
+
+ opportunities.append(
+ {
+ "pass": pass_name,
+ "missed_count": len(remarks),
+ "files_affected": len(unique_files),
+ "impact": "high"
+ if len(remarks) > 10
+ else "medium"
+ if len(remarks) > 5
+ else "low",
+ }
+ )
+
+ return {
+ "optimization_opportunities": sorted(
+ opportunities, key=lambda x: x["missed_count"], reverse=True
+ ),
+ "total_missed": sum(len(remarks) for remarks in missed_by_pass.values()),
+ }
+
def analyze_performance_hotspots(self) -> Dict[str, Any]:
function_remarks = defaultdict(list)
-
+
for unit in self.project.compilation_units:
for remark in unit.remarks:
function_remarks[remark.function].append(remark)
-
+
hotspots = []
for function, remarks in function_remarks.items():
if len(remarks) > 3: # Functions with many remarks are potential hotspots
passed = sum(1 for r in remarks if r.remark_type == RemarkType.PASSED)
missed = sum(1 for r in remarks if r.remark_type == RemarkType.MISSED)
-
- hotspots.append({
- "function": function,
- "total_remarks": len(remarks),
- "passed": passed,
- "missed": missed,
- "optimization_ratio": passed / len(remarks) if remarks else 0
- })
-
- return {
- "hotspots": sorted(hotspots, key=lambda x: x["total_remarks"], reverse=True)[:10],
- "total_functions_analyzed": len(function_remarks)
- }
-
+
+ hotspots.append(
+ {
+ "function": function,
+ "total_remarks": len(remarks),
+ "passed": passed,
+ "missed": missed,
+ "optimization_ratio": passed / len(remarks) if remarks else 0,
+ }
+ )
+
+ return {
+ "hotspots": sorted(
+ hotspots, key=lambda x: x["total_remarks"], reverse=True
+ )[:10],
+ "total_functions_analyzed": len(function_remarks),
+ }
+
def analyze_offloading_efficiency(self) -> Dict[str, Any]:
offloading_remarks = []
-
+
for unit in self.project.compilation_units:
offloading_remarks.extend(unit.get_offloading_remarks())
-
+
if not offloading_remarks:
return {"offloading_remarks": 0, "efficiency": "N/A"}
-
- passed = sum(1 for r in offloading_remarks if r.remark_type == RemarkType.PASSED)
- missed = sum(1 for r in offloading_remarks if r.remark_type == RemarkType.MISSED)
-
+
+ passed = sum(
+ 1 for r in offloading_remarks if r.remark_type == RemarkType.PASSED
+ )
+ missed = sum(
+ 1 for r in offloading_remarks if r.remark_type == RemarkType.MISSED
+ )
+
efficiency = passed / len(offloading_remarks) if offloading_remarks else 0
-
+
return {
"offloading_remarks": len(offloading_remarks),
"passed": passed,
"missed": missed,
"efficiency": efficiency,
- "efficiency_rating": "excellent" if efficiency > 0.8 else "good" if efficiency > 0.6 else "needs_improvement"
+ "efficiency_rating": "excellent"
+ if efficiency > 0.8
+ else "good"
+ if efficiency > 0.6
+ else "needs_improvement",
}
def analyze_profiling_data(self) -> Dict[str, Any]:
"""Analyze performance-related remarks for profiling insights"""
static_analysis = {
"loop_analysis": self._analyze_loops(),
"vectorization_analysis": self._analyze_vectorization(),
"inlining_analysis": self._analyze_inlining(),
"memory_analysis": self._analyze_memory_operations(),
"kernel_analysis": self._analyze_kernels(),
- "hotspot_files": self._analyze_file_hotspots()
- }
-
+ "hotspot_files": self._analyze_file_hotspots(),
+ }
+
if self.profile_data:
runtime_analysis = self._analyze_runtime_profile()
static_analysis.update(runtime_analysis)
-
+
return static_analysis
-
+
def analyze_optimization_insights(self) -> Dict[str, Any]:
"""Detailed optimization analysis for optimization tab"""
return {
"vectorization_opportunities": self._get_vectorization_opportunities(),
"loop_optimization": self._analyze_loop_optimizations(),
"function_optimization": self._analyze_function_optimizations(),
"memory_optimization": self._analyze_memory_optimizations(),
"parallelization_opportunities": self._get_parallelization_opportunities(),
- "compiler_recommendations": self._generate_compiler_recommendations()
- }
-
+ "compiler_recommendations": self._generate_compiler_recommendations(),
+ }
+
def analyze_hardware_insights(self) -> Dict[str, Any]:
"""Hardware-specific analysis for hardware tab"""
return {
"gpu_utilization": self._analyze_gpu_utilization(),
"memory_hierarchy": self._analyze_memory_hierarchy(),
"compute_patterns": self._analyze_compute_patterns(),
"offloading_patterns": self._analyze_offloading_patterns(),
- "architecture_recommendations": self._generate_architecture_recommendations()
- }
-
+ "architecture_recommendations": self._generate_architecture_recommendations(),
+ }
+
def _analyze_loops(self) -> Dict[str, Any]:
"""Analyze loop-related remarks"""
loop_remarks = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
- if any(keyword in remark.get_message().lower() for keyword in ['loop', 'unroll', 'vectorize']):
+ if any(
+ keyword in remark.get_message().lower()
+ for keyword in ["loop", "unroll", "vectorize"]
+ ):
loop_remarks.append(remark)
-
+
loop_stats = Counter([r.pass_name for r in loop_remarks])
return {
"total_loops": len(loop_remarks),
"by_pass": dict(loop_stats),
- "vectorized": len([r for r in loop_remarks if 'vectorize' in r.pass_name and r.remark_type == RemarkType.PASSED]),
- "failed_vectorization": len([r for r in loop_remarks if 'vectorize' in r.pass_name and r.remark_type == RemarkType.MISSED])
- }
-
+ "vectorized": len(
+ [
+ r
+ for r in loop_remarks
+ if "vectorize" in r.pass_name and r.remark_type == RemarkType.PASSED
+ ]
+ ),
+ "failed_vectorization": len(
+ [
+ r
+ for r in loop_remarks
+ if "vectorize" in r.pass_name and r.remark_type == RemarkType.MISSED
+ ]
+ ),
+ }
+
def _analyze_vectorization(self) -> Dict[str, Any]:
"""Analyze vectorization performance"""
vec_remarks = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
- if 'vectorize' in remark.pass_name.lower():
+ if "vectorize" in remark.pass_name.lower():
vec_remarks.append(remark)
-
+
successful = [r for r in vec_remarks if r.remark_type == RemarkType.PASSED]
failed = [r for r in vec_remarks if r.remark_type == RemarkType.MISSED]
-
+
return {
"total_vectorization_attempts": len(vec_remarks),
"successful": len(successful),
"failed": len(failed),
"success_rate": len(successful) / len(vec_remarks) if vec_remarks else 0,
- "common_failures": self._get_common_failure_reasons(failed)
- }
-
+ "common_failures": self._get_common_failure_reasons(failed),
+ }
+
def _analyze_inlining(self) -> Dict[str, Any]:
"""Analyze function inlining performance"""
inline_remarks = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
- if remark.pass_name == 'inline':
+ if remark.pass_name == "inline":
inline_remarks.append(remark)
-
+
successful = [r for r in inline_remarks if r.remark_type == RemarkType.PASSED]
failed = [r for r in inline_remarks if r.remark_type == RemarkType.MISSED]
-
+
return {
"total_inline_attempts": len(inline_remarks),
"successful": len(successful),
"failed": len(failed),
- "success_rate": len(successful) / len(inline_remarks) if inline_remarks else 0
- }
-
+ "success_rate": len(successful) / len(inline_remarks)
+ if inline_remarks
+ else 0,
+ }
+
def _analyze_memory_operations(self) -> Dict[str, Any]:
"""Analyze memory-related optimizations"""
memory_remarks = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
message = remark.get_message().lower()
- if any(keyword in message for keyword in ['load', 'store', 'memory', 'cache', 'transfer']):
+ if any(
+ keyword in message
+ for keyword in ["load", "store", "memory", "cache", "transfer"]
+ ):
memory_remarks.append(remark)
-
+
return {
"total_memory_operations": len(memory_remarks),
- "optimized": len([r for r in memory_remarks if r.remark_type == RemarkType.PASSED]),
- "missed_optimizations": len([r for r in memory_remarks if r.remark_type == RemarkType.MISSED])
- }
-
+ "optimized": len(
+ [r for r in memory_remarks if r.remark_type == RemarkType.PASSED]
+ ),
+ "missed_optimizations": len(
+ [r for r in memory_remarks if r.remark_type == RemarkType.MISSED]
+ ),
+ }
+
def _analyze_kernels(self) -> Dict[str, Any]:
"""Analyze GPU kernel performance from both remarks and profile data"""
kernel_remarks = []
kernel_functions = set()
target_regions = set()
-
+
for unit in self.project.compilation_units:
for remark in unit.remarks:
# Check multiple indicators for kernel functions
is_kernel = (
- '__omp_offloading_' in remark.function or
- 'kernel' in remark.function.lower() or
- remark.is_offloading_related()
+ "__omp_offloading_" in remark.function
+ or "kernel" in remark.function.lower()
+ or remark.is_offloading_related()
)
-
+
if is_kernel:
kernel_remarks.append(remark)
-
+
# Extract unique kernel functions
- if '__omp_offloading_' in remark.function:
+ if "__omp_offloading_" in remark.function:
kernel_functions.add(remark.function)
elif remark.is_offloading_related():
# For OpenMP optimization remarks, create unique identifiers
if remark.debug_loc:
# Use file:line as unique kernel identifier for target regions
- kernel_id = f"{remark.debug_loc.file}:{remark.debug_loc.line}"
+ kernel_id = (
+ f"{remark.debug_loc.file}:{remark.debug_loc.line}"
+ )
target_regions.add(kernel_id)
else:
# Fallback to function name
kernel_functions.add(remark.function)
-
+
# Count from static analysis of source files for target directives
static_kernel_count = 0
for unit in self.project.compilation_units:
source_lines = unit.get_source_lines(".")
if source_lines:
for line in source_lines:
# Count OpenMP target directives directly from source
- if '#pragma omp target' in line and not line.strip().startswith('//'):
+ if "#pragma omp target" in line and not line.strip().startswith(
+ "//"
+ ):
static_kernel_count += 1
-
+
# Get accurate kernel count from profile data if available
profile_kernels = 0
- if self.raw_trace_data and 'traceEvents' in self.raw_trace_data:
+ if self.raw_trace_data and "traceEvents" in self.raw_trace_data:
# Primary method: Get count from "Total Runtime: target exe" summary event
- summary_events = [e for e in self.raw_trace_data['traceEvents']
- if (e.get('name', '') == 'Total Runtime: target exe' and
- e.get('args', {}).get('count') is not None)]
-
+ summary_events = [
+ e
+ for e in self.raw_trace_data["traceEvents"]
+ if (
+ e.get("name", "") == "Total Runtime: target exe"
+ and e.get("args", {}).get("count") is not None
+ )
+ ]
+
if summary_events:
# Use the count from summary which represents actual kernel executions
- profile_kernels = summary_events[0]['args']['count']
+ profile_kernels = summary_events[0]["args"]["count"]
else:
# Fallback: Count individual kernel execution events
- kernel_events = [e for e in self.raw_trace_data['traceEvents']
- if (e.get('name', '') == 'Runtime: target exe' and
- e.get('ph') == 'X' and e.get('dur', 0) > 0)]
+ kernel_events = [
+ e
+ for e in self.raw_trace_data["traceEvents"]
+ if (
+ e.get("name", "") == "Runtime: target exe"
+ and e.get("ph") == "X"
+ and e.get("dur", 0) > 0
+ )
+ ]
profile_kernels = len(kernel_events)
-
+
# Another fallback: Look for other kernel execution patterns
if profile_kernels == 0:
- target_events = [e for e in self.raw_trace_data['traceEvents']
- if (e.get('name', '').startswith('Kernel Target') and
- e.get('ph') == 'X')]
+ target_events = [
+ e
+ for e in self.raw_trace_data["traceEvents"]
+ if (
+ e.get("name", "").startswith("Kernel Target")
+ and e.get("ph") == "X"
+ )
+ ]
profile_kernels = len(target_events)
-
- elif self.profile_data and hasattr(self.profile_data, 'kernels'):
+
+ elif self.profile_data and hasattr(self.profile_data, "kernels"):
profile_kernels = len(self.profile_data.kernels)
-
+
# Prioritize profile data when available, otherwise use static analysis
- detected_kernels = profile_kernels if profile_kernels > 0 else max(
- len(kernel_functions),
- len(target_regions),
- static_kernel_count
+ detected_kernels = (
+ profile_kernels
+ if profile_kernels > 0
+ else max(len(kernel_functions), len(target_regions), static_kernel_count)
)
-
+
return {
"total_kernels": detected_kernels,
"total_kernel_remarks": len(kernel_remarks),
- "optimized_kernels": len([f for f in kernel_functions if any(
- r.remark_type == RemarkType.PASSED for r in kernel_remarks if r.function == f
- )]),
+ "optimized_kernels": len(
+ [
+ f
+ for f in kernel_functions
+ if any(
+ r.remark_type == RemarkType.PASSED
+ for r in kernel_remarks
+ if r.function == f
+ )
+ ]
+ ),
"profile_detected_kernels": profile_kernels,
"remarks_detected_kernels": len(kernel_functions),
"target_regions_detected": len(target_regions),
- "static_analysis_kernels": static_kernel_count
- }
-
+ "static_analysis_kernels": static_kernel_count,
+ }
+
def _analyze_file_hotspots(self) -> List[Dict[str, Any]]:
"""Identify files with most optimization activity"""
file_stats = []
for unit in self.project.compilation_units:
- passed = len([r for r in unit.remarks if r.remark_type == RemarkType.PASSED])
- missed = len([r for r in unit.remarks if r.remark_type == RemarkType.MISSED])
+ passed = len(
+ [r for r in unit.remarks if r.remark_type == RemarkType.PASSED]
+ )
+ missed = len(
+ [r for r in unit.remarks if r.remark_type == RemarkType.MISSED]
+ )
total = len(unit.remarks)
-
+
if total > 0:
- file_stats.append({
- "file": unit.source_file,
- "total_remarks": total,
- "passed": passed,
- "missed": missed,
- "optimization_ratio": passed / total,
- "activity_score": total
- })
-
+ file_stats.append(
+ {
+ "file": unit.source_file,
+ "total_remarks": total,
+ "passed": passed,
+ "missed": missed,
+ "optimization_ratio": passed / total,
+ "activity_score": total,
+ }
+ )
+
return sorted(file_stats, key=lambda x: x["activity_score"], reverse=True)[:10]
-
+
def _get_vectorization_opportunities(self) -> List[Dict[str, Any]]:
"""Identify missed vectorization opportunities"""
opportunities = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
- if ('vectorize' in remark.pass_name.lower() and
- remark.remark_type == RemarkType.MISSED):
- opportunities.append({
- "file": unit.source_file,
- "line": remark.debug_loc.line if remark.debug_loc else 0,
- "function": remark.function,
- "reason": remark.get_message(),
- "pass": remark.pass_name
- })
+ if (
+ "vectorize" in remark.pass_name.lower()
+ and remark.remark_type == RemarkType.MISSED
+ ):
+ opportunities.append(
+ {
+ "file": unit.source_file,
+ "line": remark.debug_loc.line if remark.debug_loc else 0,
+ "function": remark.function,
+ "reason": remark.get_message(),
+ "pass": remark.pass_name,
+ }
+ )
return opportunities[:20] # Top 20 opportunities
-
+
def _analyze_loop_optimizations(self) -> Dict[str, Any]:
"""Analyze loop optimization patterns"""
- loop_passes = ['loop-vectorize', 'loop-unroll', 'licm', 'loop-idiom']
+ loop_passes = ["loop-vectorize", "loop-unroll", "licm", "loop-idiom"]
loop_data = {}
-
+
for pass_name in loop_passes:
remarks = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
if remark.pass_name == pass_name:
remarks.append(remark)
-
+
loop_data[pass_name] = {
"total": len(remarks),
- "passed": len([r for r in remarks if r.remark_type == RemarkType.PASSED]),
- "missed": len([r for r in remarks if r.remark_type == RemarkType.MISSED])
+ "passed": len(
+ [r for r in remarks if r.remark_type == RemarkType.PASSED]
+ ),
+ "missed": len(
+ [r for r in remarks if r.remark_type == RemarkType.MISSED]
+ ),
}
-
+
return loop_data
-
+
def _analyze_function_optimizations(self) -> Dict[str, Any]:
"""Analyze function-level optimizations"""
- function_passes = ['inline', 'dce', 'dse', 'gvn']
+ function_passes = ["inline", "dce", "dse", "gvn"]
func_data = {}
-
+
for pass_name in function_passes:
remarks = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
if remark.pass_name == pass_name:
remarks.append(remark)
-
+
func_data[pass_name] = {
"total": len(remarks),
- "passed": len([r for r in remarks if r.remark_type == RemarkType.PASSED]),
- "missed": len([r for r in remarks if r.remark_type == RemarkType.MISSED])
+ "passed": len(
+ [r for r in remarks if r.remark_type == RemarkType.PASSED]
+ ),
+ "missed": len(
+ [r for r in remarks if r.remark_type == RemarkType.MISSED]
+ ),
}
-
+
return func_data
-
+
def _analyze_memory_optimizations(self) -> List[Dict[str, Any]]:
"""Analyze memory optimization opportunities"""
memory_issues = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
- if (remark.remark_type == RemarkType.MISSED and
- any(keyword in remark.get_message().lower() for keyword in
- ['load', 'store', 'memory', 'cache', 'clobbered'])):
- memory_issues.append({
- "file": unit.source_file,
- "line": remark.debug_loc.line if remark.debug_loc else 0,
- "function": remark.function,
- "issue": remark.get_message(),
- "pass": remark.pass_name
- })
+ if remark.remark_type == RemarkType.MISSED and any(
+ keyword in remark.get_message().lower()
+ for keyword in ["load", "store", "memory", "cache", "clobbered"]
+ ):
+ memory_issues.append(
+ {
+ "file": unit.source_file,
+ "line": remark.debug_loc.line if remark.debug_loc else 0,
+ "function": remark.function,
+ "issue": remark.get_message(),
+ "pass": remark.pass_name,
+ }
+ )
return memory_issues[:15]
-
+
def _get_parallelization_opportunities(self) -> List[Dict[str, Any]]:
"""Identify parallelization opportunities"""
opportunities = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
- if (any(keyword in remark.get_message().lower() for keyword in
- ['parallel', 'thread', 'openmp', 'offload']) and
- remark.remark_type == RemarkType.MISSED):
- opportunities.append({
- "file": unit.source_file,
- "line": remark.debug_loc.line if remark.debug_loc else 0,
- "function": remark.function,
- "opportunity": remark.get_message(),
- "pass": remark.pass_name
- })
+ if (
+ any(
+ keyword in remark.get_message().lower()
+ for keyword in ["parallel", "thread", "openmp", "offload"]
+ )
+ and remark.remark_type == RemarkType.MISSED
+ ):
+ opportunities.append(
+ {
+ "file": unit.source_file,
+ "line": remark.debug_loc.line if remark.debug_loc else 0,
+ "function": remark.function,
+ "opportunity": remark.get_message(),
+ "pass": remark.pass_name,
+ }
+ )
return opportunities[:10]
-
+
def _generate_compiler_recommendations(self) -> List[str]:
"""Generate compiler optimization recommendations"""
recommendations = []
-
+
# Analyze common missed optimizations
missed_passes = Counter()
for unit in self.project.compilation_units:
for remark in unit.remarks:
if remark.remark_type == RemarkType.MISSED:
missed_passes[remark.pass_name] += 1
-
- if missed_passes.get('loop-vectorize', 0) > 10:
- recommendations.append("Consider using -ffast-math for aggressive vectorization")
- if missed_passes.get('inline', 0) > 20:
+
+ if missed_passes.get("loop-vectorize", 0) > 10:
+ recommendations.append(
+ "Consider using -ffast-math for aggressive vectorization"
+ )
+ if missed_passes.get("inline", 0) > 20:
recommendations.append("Increase inlining thresholds with -finline-limit")
- if missed_passes.get('gvn', 0) > 5:
+ if missed_passes.get("gvn", 0) > 5:
recommendations.append("Enable aggressive optimization with -O3")
-
+
return recommendations
-
+
def _analyze_gpu_utilization(self) -> Dict[str, Any]:
"""Analyze GPU utilization patterns"""
gpu_remarks = []
for unit in self.project.compilation_units:
gpu_remarks.extend(unit.get_offloading_remarks())
-
- kernel_functions = set(r.function for r in gpu_remarks if '__omp_offloading_' in r.function)
+
+ kernel_functions = set(
+ r.function for r in gpu_remarks if "__omp_offloading_" in r.function
+ )
return {
"total_gpu_functions": len(kernel_functions),
- "optimization_coverage": len([r for r in gpu_remarks if r.remark_type == RemarkType.PASSED]) / len(gpu_remarks) if gpu_remarks else 0,
- "offloading_efficiency": self.analyze_offloading_efficiency()
- }
-
+ "optimization_coverage": len(
+ [r for r in gpu_remarks if r.remark_type == RemarkType.PASSED]
+ )
+ / len(gpu_remarks)
+ if gpu_remarks
+ else 0,
+ "offloading_efficiency": self.analyze_offloading_efficiency(),
+ }
+
def _analyze_memory_hierarchy(self) -> Dict[str, Any]:
"""Analyze memory access patterns for GPU"""
memory_remarks = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
- if (remark.is_offloading_related() and
- any(keyword in remark.get_message().lower() for keyword in
- ['memory', 'load', 'store', 'cache', 'shared', 'global'])):
+ if remark.is_offloading_related() and any(
+ keyword in remark.get_message().lower()
+ for keyword in [
+ "memory",
+ "load",
+ "store",
+ "cache",
+ "shared",
+ "global",
+ ]
+ ):
memory_remarks.append(remark)
-
+
return {
"memory_operations": len(memory_remarks),
- "optimized_memory": len([r for r in memory_remarks if r.remark_type == RemarkType.PASSED]),
- "memory_issues": len([r for r in memory_remarks if r.remark_type == RemarkType.MISSED])
- }
-
+ "optimized_memory": len(
+ [r for r in memory_remarks if r.remark_type == RemarkType.PASSED]
+ ),
+ "memory_issues": len(
+ [r for r in memory_remarks if r.remark_type == RemarkType.MISSED]
+ ),
+ }
+
def _analyze_compute_patterns(self) -> Dict[str, Any]:
"""Analyze compute utilization patterns"""
compute_remarks = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
- if (remark.is_offloading_related() and
- any(keyword in remark.get_message().lower() for keyword in
- ['compute', 'thread', 'warp', 'simd', 'vector'])):
+ if remark.is_offloading_related() and any(
+ keyword in remark.get_message().lower()
+ for keyword in ["compute", "thread", "warp", "simd", "vector"]
+ ):
compute_remarks.append(remark)
-
+
return {
"compute_operations": len(compute_remarks),
- "compute_efficiency": len([r for r in compute_remarks if r.remark_type == RemarkType.PASSED]) / len(compute_remarks) if compute_remarks else 0
- }
-
+ "compute_efficiency": len(
+ [r for r in compute_remarks if r.remark_type == RemarkType.PASSED]
+ )
+ / len(compute_remarks)
+ if compute_remarks
+ else 0,
+ }
+
def _analyze_offloading_patterns(self) -> Dict[str, Any]:
"""Analyze offloading patterns and effectiveness"""
offload_remarks = []
for unit in self.project.compilation_units:
offload_remarks.extend(unit.get_offloading_remarks())
-
- offload_functions = Counter(r.function for r in offload_remarks if '__omp_offloading_' in r.function)
+
+ offload_functions = Counter(
+ r.function for r in offload_remarks if "__omp_offloading_" in r.function
+ )
return {
"offloaded_functions": len(offload_functions),
"most_active_kernels": dict(offload_functions.most_common(5)),
- "offloading_success_rate": len([r for r in offload_remarks if r.remark_type == RemarkType.PASSED]) / len(offload_remarks) if offload_remarks else 0
- }
-
+ "offloading_success_rate": len(
+ [r for r in offload_remarks if r.remark_type == RemarkType.PASSED]
+ )
+ / len(offload_remarks)
+ if offload_remarks
+ else 0,
+ }
+
def _generate_architecture_recommendations(self) -> List[str]:
"""Generate architecture-specific recommendations"""
recommendations = []
-
+
offload_remarks = []
for unit in self.project.compilation_units:
offload_remarks.extend(unit.get_offloading_remarks())
-
+
if len(offload_remarks) > 0:
- efficiency = len([r for r in offload_remarks if r.remark_type == RemarkType.PASSED]) / len(offload_remarks)
+ efficiency = len(
+ [r for r in offload_remarks if r.remark_type == RemarkType.PASSED]
+ ) / len(offload_remarks)
if efficiency < 0.7:
recommendations.append("Consider optimizing GPU memory access patterns")
recommendations.append("Review data transfer between host and device")
-
+
vectorization_remarks = []
for unit in self.project.compilation_units:
for remark in unit.remarks:
- if 'vectorize' in remark.pass_name:
+ if "vectorize" in remark.pass_name:
vectorization_remarks.append(remark)
-
+
if len(vectorization_remarks) > 0:
- vec_efficiency = len([r for r in vectorization_remarks if r.remark_type == RemarkType.PASSED]) / len(vectorization_remarks)
+ vec_efficiency = len(
+ [r for r in vectorization_remarks if r.remark_type == RemarkType.PASSED]
+ ) / len(vectorization_remarks)
if vec_efficiency < 0.5:
recommendations.append("Consider SIMD-friendly data structures")
-
+
return recommendations
-
- def _get_common_failure_reasons(self, failed_remarks: List[OptimizationRemark]) -> List[str]:
+
+ def _get_common_failure_reasons(
+ self, failed_remarks: List[OptimizationRemark]
+ ) -> List[str]:
"""Extract common failure reasons from remarks"""
reasons = Counter()
for remark in failed_remarks:
message = remark.get_message().lower()
- if 'uncountable' in message:
- reasons['Uncountable loops'] += 1
- elif 'definition unavailable' in message:
- reasons['Function definition unavailable'] += 1
- elif 'clobbered' in message:
- reasons['Memory dependencies'] += 1
- elif 'impossible' in message:
- reasons['Vectorization impossible'] += 1
+ if "uncountable" in message:
+ reasons["Uncountable loops"] += 1
+ elif "definition unavailable" in message:
+ reasons["Function definition unavailable"] += 1
+ elif "clobbered" in message:
+ reasons["Memory dependencies"] += 1
+ elif "impossible" in message:
+ reasons["Vectorization impossible"] += 1
else:
- reasons['Other'] += 1
-
+ reasons["Other"] += 1
+
return [f"{reason}: {count}" for reason, count in reasons.most_common(5)]
-
+
def _analyze_runtime_profile(self) -> Dict[str, Any]:
"""Analyze runtime profiling data from LIBOMPTARGET_PROFILE"""
if not self.profile_data and not self.raw_trace_data:
return {}
-
+
# Prioritize raw Chrome Trace format data
if self.raw_trace_data:
return {
"trace_data": self.raw_trace_data,
- "performance_bottlenecks": self._identify_trace_bottlenecks(self.raw_trace_data),
- "optimization_recommendations": self._generate_trace_recommendations(self.raw_trace_data)
+ "performance_bottlenecks": self._identify_trace_bottlenecks(
+ self.raw_trace_data
+ ),
+ "optimization_recommendations": self._generate_trace_recommendations(
+ self.raw_trace_data
+ ),
}
-
+
# Handle Chrome Trace format from profile_data
- if hasattr(self.profile_data, 'trace_events') or isinstance(self.profile_data, dict):
- trace_data = self.profile_data if isinstance(self.profile_data, dict) else self.profile_data.__dict__
-
+ if hasattr(self.profile_data, "trace_events") or isinstance(
+ self.profile_data, dict
+ ):
+ trace_data = (
+ self.profile_data
+ if isinstance(self.profile_data, dict)
+ else self.profile_data.__dict__
+ )
+
return {
"trace_data": trace_data,
"performance_bottlenecks": self._identify_trace_bottlenecks(trace_data),
- "optimization_recommendations": self._generate_trace_recommendations(trace_data)
+ "optimization_recommendations": self._generate_trace_recommendations(
+ trace_data
+ ),
}
-
+
# Fallback to original ProfileData structure (if implemented)
- if self.profile_data and hasattr(self.profile_data, 'total_time'):
+ if self.profile_data and hasattr(self.profile_data, "total_time"):
return {
"runtime_performance": {
"total_execution_time_us": self.profile_data.total_time,
"device_time_us": self.profile_data.device_time,
"host_time_us": self.profile_data.host_time,
"memory_transfer_time_us": self.profile_data.memory_transfer_time,
- "device_utilization_percent": (self.profile_data.device_time / self.profile_data.total_time * 100) if self.profile_data.total_time > 0 else 0
+ "device_utilization_percent": (
+ self.profile_data.device_time
+ / self.profile_data.total_time
+ * 100
+ )
+ if self.profile_data.total_time > 0
+ else 0,
},
"kernel_performance": [
{
"name": kernel.name,
"execution_time_us": kernel.execution_time,
"launch_time_us": kernel.launch_time,
"device_id": kernel.device_id,
"grid_size": kernel.grid_size,
- "block_size": kernel.block_size
- } for kernel in self.profile_data.kernels
+ "block_size": kernel.block_size,
+ }
+ for kernel in self.profile_data.kernels
],
"performance_bottlenecks": self._identify_performance_bottlenecks(),
- "optimization_recommendations": self._generate_runtime_recommendations()
+ "optimization_recommendations": self._generate_runtime_recommendations(),
}
-
+
return {}
- def _identify_trace_bottlenecks(self, trace_data: Dict[str, Any]) -> List[Dict[str, Any]]:
+ def _identify_trace_bottlenecks(
+ self, trace_data: Dict[str, Any]
+ ) -> List[Dict[str, Any]]:
"""Identify performance bottlenecks from Chrome Trace format data"""
- if not trace_data or 'traceEvents' not in trace_data:
+ if not trace_data or "traceEvents" not in trace_data:
return []
-
+
bottlenecks = []
- events = [e for e in trace_data['traceEvents'] if e.get('ph') == 'X' and e.get('dur')]
-
+ events = [
+ e for e in trace_data["traceEvents"] if e.get("ph") == "X" and e.get("dur")
+ ]
+
if not events:
return bottlenecks
-
+
# Calculate time distributions
- kernel_events = [e for e in events if 'target exe' in e.get('name', '')]
- memory_events = [e for e in events if any(term in e.get('name', '') for term in ['HostToDev', 'DevToHost'])]
- init_events = [e for e in events if 'init' in e.get('name', '').lower()]
-
- total_time = max(e['ts'] + e['dur'] for e in events) - min(e['ts'] for e in events)
- kernel_time = sum(e['dur'] for e in kernel_events)
- memory_time = sum(e['dur'] for e in memory_events)
- init_time = sum(e['dur'] for e in init_events)
-
+ kernel_events = [e for e in events if "target exe" in e.get("name", "")]
+ memory_events = [
+ e
+ for e in events
+ if any(term in e.get("name", "") for term in ["HostToDev", "DevToHost"])
+ ]
+ init_events = [e for e in events if "init" in e.get("name", "").lower()]
+
+ total_time = max(e["ts"] + e["dur"] for e in events) - min(
+ e["ts"] for e in events
+ )
+ kernel_time = sum(e["dur"] for e in kernel_events)
+ memory_time = sum(e["dur"] for e in memory_events)
+ init_time = sum(e["dur"] for e in init_events)
+
# Memory transfer bottleneck
if memory_time > kernel_time * 0.5:
- bottlenecks.append({
- "type": "memory_transfer",
- "severity": "high",
- "description": "Memory transfers take significant time compared to kernel execution",
- "impact_percent": (memory_time / total_time) * 100
- })
-
+ bottlenecks.append(
+ {
+ "type": "memory_transfer",
+ "severity": "high",
+ "description": "Memory transfers take significant time compared to kernel execution",
+ "impact_percent": (memory_time / total_time) * 100,
+ }
+ )
+
# Initialization overhead
if init_time > total_time * 0.3:
- bottlenecks.append({
- "type": "initialization_overhead",
- "severity": "medium",
- "description": "Initialization takes significant portion of total execution time",
- "impact_percent": (init_time / total_time) * 100
- })
-
+ bottlenecks.append(
+ {
+ "type": "initialization_overhead",
+ "severity": "medium",
+ "description": "Initialization takes significant portion of total execution time",
+ "impact_percent": (init_time / total_time) * 100,
+ }
+ )
+
# Kernel utilization
kernel_util = (kernel_time / total_time) * 100 if total_time > 0 else 0
if kernel_util < 30:
- bottlenecks.append({
- "type": "low_kernel_utilization",
- "severity": "medium",
- "description": f"Kernel utilization is only {kernel_util:.1f}%",
- "impact_percent": 100 - kernel_util
- })
-
+ bottlenecks.append(
+ {
+ "type": "low_kernel_utilization",
+ "severity": "medium",
+ "description": f"Kernel utilization is only {kernel_util:.1f}%",
+ "impact_percent": 100 - kernel_util,
+ }
+ )
+
# Kernel execution imbalance
if len(kernel_events) > 1:
- durations = [e['dur'] for e in kernel_events]
+ durations = [e["dur"] for e in kernel_events]
max_dur = max(durations)
min_dur = min(durations)
if max_dur > min_dur * 3: # 3x difference indicates imbalance
- bottlenecks.append({
- "type": "kernel_imbalance",
- "severity": "low",
- "description": "Significant execution time variance between kernel launches",
- "impact_percent": ((max_dur - min_dur) / max_dur) * 100
- })
-
+ bottlenecks.append(
+ {
+ "type": "kernel_imbalance",
+ "severity": "low",
+ "description": "Significant execution time variance between kernel launches",
+ "impact_percent": ((max_dur - min_dur) / max_dur) * 100,
+ }
+ )
+
return bottlenecks
def _generate_trace_recommendations(self, trace_data: Dict[str, Any]) -> List[str]:
"""Generate optimization recommendations based on Chrome Trace format data"""
- if not trace_data or 'traceEvents' not in trace_data:
+ if not trace_data or "traceEvents" not in trace_data:
return []
-
+
recommendations = []
- events = [e for e in trace_data['traceEvents'] if e.get('ph') == 'X' and e.get('dur')]
-
+ events = [
+ e for e in trace_data["traceEvents"] if e.get("ph") == "X" and e.get("dur")
+ ]
+
if not events:
return recommendations
-
- kernel_events = [e for e in events if 'target exe' in e.get('name', '')]
- memory_events = [e for e in events if any(term in e.get('name', '') for term in ['HostToDev', 'DevToHost'])]
-
- total_time = max(e['ts'] + e['dur'] for e in events) - min(e['ts'] for e in events)
- kernel_time = sum(e['dur'] for e in kernel_events)
- memory_time = sum(e['dur'] for e in memory_events)
-
+
+ kernel_events = [e for e in events if "target exe" in e.get("name", "")]
+ memory_events = [
+ e
+ for e in events
+ if any(term in e.get("name", "") for term in ["HostToDev", "DevToHost"])
+ ]
+
+ total_time = max(e["ts"] + e["dur"] for e in events) - min(
+ e["ts"] for e in events
+ )
+ kernel_time = sum(e["dur"] for e in kernel_events)
+ memory_time = sum(e["dur"] for e in memory_events)
+
# Memory transfer recommendations
if memory_time > kernel_time * 0.3:
- recommendations.append("Consider optimizing data transfers - use unified memory or asynchronous transfers")
- recommendations.append("Reduce data movement by keeping data on device between kernel launches")
-
+ recommendations.append(
+ "Consider optimizing data transfers - use unified memory or asynchronous transfers"
+ )
+ recommendations.append(
+ "Reduce data movement by keeping data on device between kernel launches"
+ )
+
# Kernel execution recommendations
if len(kernel_events) > 5:
avg_kernel_time = kernel_time / len(kernel_events)
if avg_kernel_time < 100: # Very short kernels (< 100μs)
- recommendations.append("Consider kernel fusion to reduce launch overhead")
-
+ recommendations.append(
+ "Consider kernel fusion to reduce launch overhead"
+ )
+
# Device utilization recommendations
kernel_util = (kernel_time / total_time) * 100 if total_time > 0 else 0
if kernel_util < 50:
- recommendations.append("Increase workload size or use multiple devices to improve utilization")
- recommendations.append("Consider overlapping computation with data transfers")
-
+ recommendations.append(
+ "Increase workload size or use multiple devices to improve utilization"
+ )
+ recommendations.append(
+ "Consider overlapping computation with data transfers"
+ )
+
# Specific kernel analysis
if kernel_events:
for event in kernel_events:
- detail = event.get('args', {}).get('detail', '')
- if 'NumTeams=0' in detail:
- recommendations.append("Some kernels launch with NumTeams=0 - verify OpenMP target directives")
-
+ detail = event.get("args", {}).get("detail", "")
+ if "NumTeams=0" in detail:
+ recommendations.append(
+ "Some kernels launch with NumTeams=0 - verify OpenMP target directives"
+ )
+
return recommendations
-
+
def _identify_performance_bottlenecks(self) -> List[Dict[str, Any]]:
"""Identify performance bottlenecks from runtime data"""
if not self.profile_data:
return []
-
+
bottlenecks = []
-
+
# Memory transfer bottleneck
if self.profile_data.memory_transfer_time > self.profile_data.device_time * 0.5:
- bottlenecks.append({
- "type": "memory_transfer",
- "severity": "high",
- "description": "Memory transfers take significant time compared to computation",
- "impact_percent": (self.profile_data.memory_transfer_time / self.profile_data.total_time) * 100
- })
-
+ bottlenecks.append(
+ {
+ "type": "memory_transfer",
+ "severity": "high",
+ "description": "Memory transfers take significant time compared to computation",
+ "impact_percent": (
+ self.profile_data.memory_transfer_time
+ / self.profile_data.total_time
+ )
+ * 100,
+ }
+ )
+
# Low device utilization
- device_util = (self.profile_data.device_time / self.profile_data.total_time) * 100 if self.profile_data.total_time > 0 else 0
+ device_util = (
+ (self.profile_data.device_time / self.profile_data.total_time) * 100
+ if self.profile_data.total_time > 0
+ else 0
+ )
if device_util < 50:
- bottlenecks.append({
- "type": "low_device_utilization",
- "severity": "medium",
- "description": f"Device utilization is only {device_util:.1f}%",
- "impact_percent": 100 - device_util
- })
-
+ bottlenecks.append(
+ {
+ "type": "low_device_utilization",
+ "severity": "medium",
+ "description": f"Device utilization is only {device_util:.1f}%",
+ "impact_percent": 100 - device_util,
+ }
+ )
+
# Kernel execution imbalance
if len(self.profile_data.kernels) > 1:
execution_times = [k.execution_time for k in self.profile_data.kernels]
max_time = max(execution_times)
min_time = min(execution_times)
if max_time > min_time * 5: # 5x difference indicates imbalance
- bottlenecks.append({
- "type": "kernel_imbalance",
- "severity": "medium",
- "description": "Significant execution time variance between kernels",
- "impact_percent": ((max_time - min_time) / max_time) * 100
- })
-
+ bottlenecks.append(
+ {
+ "type": "kernel_imbalance",
+ "severity": "medium",
+ "description": "Significant execution time variance between kernels",
+ "impact_percent": ((max_time - min_time) / max_time) * 100,
+ }
+ )
+
return bottlenecks
-
+
def _generate_runtime_recommendations(self) -> List[str]:
"""Generate optimization recommendations based on runtime data"""
if not self.profile_data:
return []
-
+
recommendations = []
-
+
# Memory transfer optimization
if self.profile_data.memory_transfer_time > self.profile_data.device_time * 0.3:
- recommendations.append("Consider data layout optimizations to reduce memory transfers")
- recommendations.append("Use unified memory or async transfers where possible")
-
+ recommendations.append(
+ "Consider data layout optimizations to reduce memory transfers"
+ )
+ recommendations.append(
+ "Use unified memory or async transfers where possible"
+ )
+
# Device utilization
- device_util = (self.profile_data.device_time / self.profile_data.total_time) * 100 if self.profile_data.total_time > 0 else 0
+ device_util = (
+ (self.profile_data.device_time / self.profile_data.total_time) * 100
+ if self.profile_data.total_time > 0
+ else 0
+ )
if device_util < 70:
- recommendations.append("Increase workload size or use multiple kernels to improve device utilization")
-
+ recommendations.append(
+ "Increase workload size or use multiple kernels to improve device utilization"
+ )
+
# Kernel optimization
for kernel in self.profile_data.kernels:
if kernel.execution_time < 1000: # Less than 1ms
- recommendations.append(f"Kernel '{kernel.name}' has very short execution time - consider kernel fusion")
-
+ recommendations.append(
+ f"Kernel '{kernel.name}' has very short execution time - consider kernel fusion"
+ )
+
return recommendations
-
+
def get_comprehensive_analysis(self) -> Dict[str, Any]:
return {
"project_summary": self.project.get_project_summary(),
"optimization_opportunities": self.analyze_optimization_opportunities(),
"performance_hotspots": self.analyze_performance_hotspots(),
"offloading_efficiency": self.analyze_offloading_efficiency(),
"profiling_data": self.analyze_profiling_data(),
"optimization_insights": self.analyze_optimization_insights(),
- "hardware_insights": self.analyze_hardware_insights()
- }
+ "hardware_insights": self.analyze_hardware_insights(),
+ }
--- models.py 2025-07-08 02:57:37.000000 +0000
+++ models.py 2025-07-08 02:59:59.000147 +0000
@@ -1,197 +1,268 @@
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
#
# LLVM Advisor Data Models - Structured data models for optimization
# analysis, remarks, and profiling information.
#
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any, Union
from enum import Enum
from pathlib import Path
class RemarkType(Enum):
PASSED = "Passed"
- MISSED = "Missed"
+ MISSED = "Missed"
ANALYSIS = "Analysis"
@dataclass
class DebugLocation:
file: str
line: int
column: int
-
+
@classmethod
- def from_dict(cls, data: Dict[str, Any]) -> Optional['DebugLocation']:
- return cls(
- file=data.get('File', '<unknown>'),
- line=int(data.get('Line', 0)),
- column=int(data.get('Column', 0))
- ) if data else None
+ def from_dict(cls, data: Dict[str, Any]) -> Optional["DebugLocation"]:
+ return (
+ cls(
+ file=data.get("File", "<unknown>"),
+ line=int(data.get("Line", 0)),
+ column=int(data.get("Column", 0)),
+ )
+ if data
+ else None
+ )
@dataclass
class RemarkArgument:
type: str
value: Any
debug_loc: Optional[DebugLocation] = None
-
+
@classmethod
- def from_dict(cls, data: Union[str, Dict[str, Any]]) -> 'RemarkArgument':
+ def from_dict(cls, data: Union[str, Dict[str, Any]]) -> "RemarkArgument":
if isinstance(data, str):
return cls(type="String", value=data)
-
+
# Consolidated argument type mapping
type_mappings = {
# Basic types
"String": "String",
"DebugLoc": None, # Special handling
# Function/code references
- **{key: key for key in ["Callee", "Caller", "Function", "BasicBlock", "Inst",
- "Call", "OtherAccess", "ClobberedBy", "InfavorOfValue", "Reason"]},
+ **{
+ key: key
+ for key in [
+ "Callee",
+ "Caller",
+ "Function",
+ "BasicBlock",
+ "Inst",
+ "Call",
+ "OtherAccess",
+ "ClobberedBy",
+ "InfavorOfValue",
+ "Reason",
+ ]
+ },
# Metrics and counts
- **{key: key for key in ["Type", "Cost", "Threshold", "VectorizationFactor",
- "InterleaveCount", "NumVRCopies", "TotalCopiesCost",
- "NumStackBytes", "NumInstructions", "Line", "Column"]},
+ **{
+ key: key
+ for key in [
+ "Type",
+ "Cost",
+ "Threshold",
+ "VectorizationFactor",
+ "InterleaveCount",
+ "NumVRCopies",
+ "TotalCopiesCost",
+ "NumStackBytes",
+ "NumInstructions",
+ "Line",
+ "Column",
+ ]
+ },
# OpenMP/GPU specific
- **{key: key for key in ["ExternalNotKernel", "GlobalsSize", "LocalVarSize",
- "NumRegions", "RegisterPressurerValue", "AssumedAddressSpace",
- "SPMDCompatibilityTracker", "GlobalizationLevel", "AddressSpace"]}
+ **{
+ key: key
+ for key in [
+ "ExternalNotKernel",
+ "GlobalsSize",
+ "LocalVarSize",
+ "NumRegions",
+ "RegisterPressurerValue",
+ "AssumedAddressSpace",
+ "SPMDCompatibilityTracker",
+ "GlobalizationLevel",
+ "AddressSpace",
+ ]
+ },
}
-
+
arg_type = None
value = None
debug_loc = None
-
+
for key, val in data.items():
if key == "DebugLoc":
debug_loc = DebugLocation.from_dict(val)
elif key in type_mappings:
arg_type = type_mappings[key]
value = val
break
-
+
return cls(type=arg_type or "Unknown", value=value, debug_loc=debug_loc)
-@dataclass
+@dataclass
class OptimizationRemark:
remark_type: RemarkType
pass_name: str
remark_name: str
function: str
debug_loc: Optional[DebugLocation]
args: List[RemarkArgument]
-
+
@classmethod
- def from_dict(cls, data: Dict[str, Any]) -> 'OptimizationRemark':
+ def from_dict(cls, data: Dict[str, Any]) -> "OptimizationRemark":
remark_type = {
- 'Passed': RemarkType.PASSED,
- 'Missed': RemarkType.MISSED,
- 'Analysis': RemarkType.ANALYSIS
- }.get(data.get('_remark_type'), RemarkType.MISSED)
-
+ "Passed": RemarkType.PASSED,
+ "Missed": RemarkType.MISSED,
+ "Analysis": RemarkType.ANALYSIS,
+ }.get(data.get("_remark_type"), RemarkType.MISSED)
+
return cls(
remark_type=remark_type,
- pass_name=data.get('Pass', 'unknown'),
- remark_name=data.get('Name', 'unknown'),
- function=data.get('Function', 'unknown'),
- debug_loc=DebugLocation.from_dict(data.get('DebugLoc', {})),
- args=[RemarkArgument.from_dict(arg) for arg in data.get('Args', [])]
+ pass_name=data.get("Pass", "unknown"),
+ remark_name=data.get("Name", "unknown"),
+ function=data.get("Function", "unknown"),
+ debug_loc=DebugLocation.from_dict(data.get("DebugLoc", {})),
+ args=[RemarkArgument.from_dict(arg) for arg in data.get("Args", [])],
)
-
+
def get_message(self) -> str:
parts = []
for arg in self.args:
if arg.type == "String":
parts.append(str(arg.value))
else:
parts.append(f"[{arg.type}: {arg.value}]")
return "".join(parts)
-
+
def is_offloading_related(self) -> bool:
"""Enhanced detection of OpenMP offloading and GPU-related remarks"""
offloading_indicators = [
- "__omp_offloading", "omp_outlined", "target", "kernel-info", "offload",
- "gpu", "cuda", "opencl", "sycl", "hip", "device", "host",
- "ExternalNotKernel", "SPMDCompatibilityTracker", "GlobalizationLevel",
- "NumRegions", "AddressSpace", "AssumedAddressSpace"
+ "__omp_offloading",
+ "omp_outlined",
+ "target",
+ "kernel-info",
+ "offload",
+ "gpu",
+ "cuda",
+ "opencl",
+ "sycl",
+ "hip",
+ "device",
+ "host",
+ "ExternalNotKernel",
+ "SPMDCompatibilityTracker",
+ "GlobalizationLevel",
+ "NumRegions",
+ "AddressSpace",
+ "AssumedAddressSpace",
]
-
+
# OpenMP-specific pass names that indicate offloading
openmp_passes = [
- "openmp-opt", "kernel-info", "openmp", "target-region",
- "offload", "device-lower", "gpu-lower"
+ "openmp-opt",
+ "kernel-info",
+ "openmp",
+ "target-region",
+ "offload",
+ "device-lower",
+ "gpu-lower",
]
-
+
# Check function name, pass name, and remark name
text_to_check = f"{self.function} {self.pass_name} {self.remark_name}".lower()
if any(indicator in text_to_check for indicator in offloading_indicators):
return True
-
+
# Check if this is an OpenMP-related pass
if any(omp_pass in self.pass_name.lower() for omp_pass in openmp_passes):
return True
-
+
# Check argument values for GPU/offloading specific content
for arg in self.args:
- if hasattr(arg, 'value') and arg.value:
+ if hasattr(arg, "value") and arg.value:
arg_text = str(arg.value).lower()
if any(indicator in arg_text for indicator in offloading_indicators):
return True
-
+
# Check for specific OpenMP argument types
- if hasattr(arg, 'type') and arg.type in [
- "ExternalNotKernel", "SPMDCompatibilityTracker", "GlobalizationLevel",
- "NumRegions", "AddressSpace", "AssumedAddressSpace"
+ if hasattr(arg, "type") and arg.type in [
+ "ExternalNotKernel",
+ "SPMDCompatibilityTracker",
+ "GlobalizationLevel",
+ "NumRegions",
+ "AddressSpace",
+ "AssumedAddressSpace",
]:
return True
-
+
# Check message content for OpenMP target-related keywords
message = self.get_message().lower()
target_keywords = [
- "target region", "target directive", "offload", "device",
- "kernel", "gpu", "accelerator", "teams", "thread_limit"
+ "target region",
+ "target directive",
+ "offload",
+ "device",
+ "kernel",
+ "gpu",
+ "accelerator",
+ "teams",
+ "thread_limit",
]
if any(keyword in message for keyword in target_keywords):
return True
-
+
return False
@dataclass
class CompilationUnit:
source_file: str
remarks: List[OptimizationRemark] = field(default_factory=list)
build_time: Optional[float] = None
optimization_level: Optional[str] = None
target_arch: Optional[str] = None
-
+
def get_remarks_by_pass(self) -> Dict[str, List[OptimizationRemark]]:
result = {}
for remark in self.remarks:
result.setdefault(remark.pass_name, []).append(remark)
return result
-
+
def get_remarks_by_type(self) -> Dict[RemarkType, List[OptimizationRemark]]:
result = {t: [] for t in RemarkType}
for remark in self.remarks:
result[remark.remark_type].append(remark)
return result
-
+
def get_offloading_remarks(self) -> List[OptimizationRemark]:
return [r for r in self.remarks if r.is_offloading_related()]
-
+
def get_summary(self) -> Dict[str, Any]:
by_type = self.get_remarks_by_type()
return {
"source_file": self.source_file,
"total_remarks": len(self.remarks),
@@ -200,107 +271,115 @@
"analysis": len(by_type[RemarkType.ANALYSIS]),
"offloading_remarks": len(self.get_offloading_remarks()),
"passes_involved": list(self.get_remarks_by_pass().keys()),
"build_time": self.build_time,
"optimization_level": self.optimization_level,
- "target_arch": self.target_arch
+ "target_arch": self.target_arch,
}
-
+
def deduplicate_remarks(self):
"""Remove duplicate remarks based on key attributes."""
seen = set()
unique_remarks = []
-
+
for remark in self.remarks:
# Create a key based on remark attributes
key = (
remark.remark_type,
remark.pass_name,
remark.remark_name,
remark.function,
remark.debug_loc.line if remark.debug_loc else 0,
- remark.debug_loc.column if remark.debug_loc else 0
+ remark.debug_loc.column if remark.debug_loc else 0,
)
-
+
if key not in seen:
seen.add(key)
unique_remarks.append(remark)
-
+
self.remarks = unique_remarks
def load_source_code(self, source_directory: str = ".") -> Optional[str]:
"""Load the actual source code for this file."""
possible_paths = [
Path(source_directory) / self.source_file,
Path(source_directory) / "src" / self.source_file,
Path(source_directory) / "source" / self.source_file,
]
-
+
for path in possible_paths:
if path.exists() and path.is_file():
try:
- with open(path, 'r', encoding='utf-8') as f:
+ with open(path, "r", encoding="utf-8") as f:
return f.read()
except Exception:
continue
return None
-
+
def get_source_lines(self, source_directory: str = ".") -> List[str]:
"""Get source code as list of lines."""
source_code = self.load_source_code(source_directory)
- return source_code.split('\n') if source_code else []
+ return source_code.split("\n") if source_code else []
@dataclass
class Project:
name: str
compilation_units: List[CompilationUnit] = field(default_factory=list)
build_system: Optional[str] = None
-
+
def add_compilation_unit(self, unit: CompilationUnit):
self.compilation_units.append(unit)
-
+
def get_project_summary(self) -> Dict[str, Any]:
total_remarks = sum(len(unit.remarks) for unit in self.compilation_units)
- total_offloading = sum(len(unit.get_offloading_remarks()) for unit in self.compilation_units)
+ total_offloading = sum(
+ len(unit.get_offloading_remarks()) for unit in self.compilation_units
+ )
all_passes = set()
-
+
# Collect type counts across all units
type_counts = {remark_type: 0 for remark_type in RemarkType}
pass_counts = {}
-
+
for unit in self.compilation_units:
all_passes.update(unit.get_remarks_by_pass().keys())
by_type = unit.get_remarks_by_type()
for remark_type, remarks in by_type.items():
type_counts[remark_type] += len(remarks)
-
+
for remark in unit.remarks:
pass_counts[remark.pass_name] = pass_counts.get(remark.pass_name, 0) + 1
-
+
return {
"project_name": self.name,
"compilation_units": len(self.compilation_units),
- "total_files": len(self.compilation_units), # Added for frontend compatibility
+ "total_files": len(
+ self.compilation_units
+ ), # Added for frontend compatibility
"total_remarks": total_remarks,
"total_offloading_remarks": total_offloading,
"total_passed": type_counts[RemarkType.PASSED],
- "total_missed": type_counts[RemarkType.MISSED],
+ "total_missed": type_counts[RemarkType.MISSED],
"total_analysis": type_counts[RemarkType.ANALYSIS],
"unique_passes": list(all_passes),
"pass_counts": pass_counts,
- "most_active_passes": sorted(pass_counts.items(), key=lambda x: x[1], reverse=True)[:10],
- "build_system": self.build_system
+ "most_active_passes": sorted(
+ pass_counts.items(), key=lambda x: x[1], reverse=True
+ )[:10],
+ "build_system": self.build_system,
}
-
+
def get_file_list(self) -> List[Dict[str, Any]]:
files = []
for unit in self.compilation_units:
summary = unit.get_summary()
- files.append({
- "file": unit.source_file,
- "remarks": summary["total_remarks"],
- "passed": summary["passed"],
- "missed": summary["missed"],
- "offloading": summary["offloading_remarks"]
- })
+ files.append(
+ {
+ "file": unit.source_file,
+ "remarks": summary["total_remarks"],
+ "passed": summary["passed"],
+ "missed": summary["missed"],
+ "offloading": summary["offloading_remarks"],
+ }
+ )
return files
--- profile_parser.py 2025-07-08 02:57:37.000000 +0000
+++ profile_parser.py 2025-07-08 02:59:59.097049 +0000
@@ -1,23 +1,24 @@
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
#
# LLVM Advisor Profile Parser - Parses Chrome Trace format profiling
# data generated by LLVM OpenMP runtime and other profiling tools.
#
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
import json
import os
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from pathlib import Path
+
@dataclass
class ProfileEvent:
name: str
category: str
@@ -26,163 +27,184 @@
duration: float
process_id: int
thread_id: int
args: Dict[str, Any]
+
@dataclass
class KernelProfile:
name: str
launch_time: float
execution_time: float
memory_transfers: List[Dict[str, Any]]
device_id: int
grid_size: Optional[tuple] = None
block_size: Optional[tuple] = None
+
@dataclass
class ProfileData:
events: List[ProfileEvent]
kernels: List[KernelProfile]
total_time: float
device_time: float
host_time: float
memory_transfer_time: float
+
class ProfileParser:
def __init__(self, profile_path: str):
self.profile_path = Path(profile_path)
self.profile_data: Optional[ProfileData] = None
-
+
def parse(self) -> Optional[ProfileData]:
if not self.profile_path.exists():
return None
-
+
try:
- with open(self.profile_path, 'r') as f:
+ with open(self.profile_path, "r") as f:
raw_data = json.load(f)
-
+
# Store raw data for Chrome Trace format compatibility
self.raw_data = raw_data
-
- events = self._parse_events(raw_data.get('traceEvents', []))
+
+ events = self._parse_events(raw_data.get("traceEvents", []))
kernels = self._extract_kernels(events)
-
+
self.profile_data = ProfileData(
events=events,
kernels=kernels,
total_time=self._calculate_total_time(events),
device_time=self._calculate_device_time(events),
host_time=self._calculate_host_time(events),
- memory_transfer_time=self._calculate_memory_transfer_time(events)
+ memory_transfer_time=self._calculate_memory_transfer_time(events),
)
-
+
return self.profile_data
-
+
except (json.JSONDecodeError, KeyError, IOError) as e:
print(f"Error parsing profile file: {e}")
return None
-
+
def get_raw_data(self) -> Optional[Dict[str, Any]]:
"""Return raw Chrome Trace format data for frontend processing"""
- return getattr(self, 'raw_data', None)
-
+ return getattr(self, "raw_data", None)
+
def _parse_events(self, trace_events: List[Dict]) -> List[ProfileEvent]:
events = []
for event in trace_events:
try:
- events.append(ProfileEvent(
- name=event.get('name', ''),
- category=event.get('cat', ''),
- phase=event.get('ph', ''),
- timestamp=float(event.get('ts', 0)),
- duration=float(event.get('dur', 0)),
- process_id=int(event.get('pid', 0)),
- thread_id=int(event.get('tid', 0)),
- args=event.get('args', {})
- ))
+ events.append(
+ ProfileEvent(
+ name=event.get("name", ""),
+ category=event.get("cat", ""),
+ phase=event.get("ph", ""),
+ timestamp=float(event.get("ts", 0)),
+ duration=float(event.get("dur", 0)),
+ process_id=int(event.get("pid", 0)),
+ thread_id=int(event.get("tid", 0)),
+ args=event.get("args", {}),
+ )
+ )
except (ValueError, TypeError):
continue
return events
-
+
def _extract_kernels(self, events: List[ProfileEvent]) -> List[KernelProfile]:
kernels = []
# Look for OpenMP target execution events
- kernel_events = [e for e in events if 'target exe' in e.name or 'Runtime: target exe' in e.name]
-
+ kernel_events = [
+ e
+ for e in events
+ if "target exe" in e.name or "Runtime: target exe" in e.name
+ ]
+
for event in kernel_events:
- if event.phase == 'X': # Complete event
+ if event.phase == "X": # Complete event
# Parse kernel details from args.detail
- detail = event.args.get('detail', '')
- parts = detail.split(';') if detail else []
-
+ detail = event.args.get("detail", "")
+ parts = detail.split(";") if detail else []
+
kernel = KernelProfile(
name=event.name,
launch_time=event.timestamp,
execution_time=event.duration,
memory_transfers=[],
- device_id=event.args.get('device_id', 0),
+ device_id=event.args.get("device_id", 0),
grid_size=self._parse_grid_size_from_detail(parts),
- block_size=self._parse_block_size_from_detail(parts)
+ block_size=self._parse_block_size_from_detail(parts),
)
kernels.append(kernel)
-
+
return kernels
-
+
def _parse_grid_size_from_detail(self, parts: List[str]) -> Optional[tuple]:
# Extract grid size from OpenMP detail string if available
for part in parts:
- if 'NumTeams=' in part:
+ if "NumTeams=" in part:
try:
- teams = int(part.split('=')[1])
+ teams = int(part.split("=")[1])
return (teams,) if teams > 0 else None
except (ValueError, IndexError):
pass
return None
-
+
def _parse_block_size_from_detail(self, parts: List[str]) -> Optional[tuple]:
# Extract block size from OpenMP detail string if available
# OpenMP doesn't directly map to CUDA block sizes, but we can use thread info if available
return None # Not available in OpenMP trace format
-
+
def _parse_grid_size(self, args: Dict) -> Optional[tuple]:
- if 'grid_size' in args:
- return tuple(args['grid_size'])
+ if "grid_size" in args:
+ return tuple(args["grid_size"])
return None
-
+
def _parse_block_size(self, args: Dict) -> Optional[tuple]:
- if 'block_size' in args:
- return tuple(args['block_size'])
+ if "block_size" in args:
+ return tuple(args["block_size"])
return None
-
+
def _calculate_total_time(self, events: List[ProfileEvent]) -> float:
if not events:
return 0.0
start_time = min(e.timestamp for e in events)
end_time = max(e.timestamp + e.duration for e in events if e.duration > 0)
return end_time - start_time
-
+
def _calculate_device_time(self, events: List[ProfileEvent]) -> float:
# For OpenMP, consider target execution as device time
- return sum(e.duration for e in events if 'target exe' in e.name or 'kernel' in e.name.lower())
-
+ return sum(
+ e.duration
+ for e in events
+ if "target exe" in e.name or "kernel" in e.name.lower()
+ )
+
def _calculate_host_time(self, events: List[ProfileEvent]) -> float:
# Host time is everything that's not device execution
device_time = self._calculate_device_time(events)
total_time = self._calculate_total_time(events)
return total_time - device_time
-
+
def _calculate_memory_transfer_time(self, events: List[ProfileEvent]) -> float:
- return sum(e.duration for e in events if 'HostToDev' in e.name or 'DevToHost' in e.name)
-
+ return sum(
+ e.duration for e in events if "HostToDev" in e.name or "DevToHost" in e.name
+ )
+
def get_summary(self) -> Dict[str, Any]:
if not self.profile_data:
return {}
-
+
return {
- 'total_kernels': len(self.profile_data.kernels),
- 'total_time_us': self.profile_data.total_time,
- 'device_time_us': self.profile_data.device_time,
- 'host_time_us': self.profile_data.host_time,
- 'memory_transfer_time_us': self.profile_data.memory_transfer_time,
- 'device_utilization': (self.profile_data.device_time / self.profile_data.total_time * 100) if self.profile_data.total_time > 0 else 0,
- 'top_kernels': sorted(self.profile_data.kernels, key=lambda k: k.execution_time, reverse=True)[:10]
- }
+ "total_kernels": len(self.profile_data.kernels),
+ "total_time_us": self.profile_data.total_time,
+ "device_time_us": self.profile_data.device_time,
+ "host_time_us": self.profile_data.host_time,
+ "memory_transfer_time_us": self.profile_data.memory_transfer_time,
+ "device_utilization": (
+ self.profile_data.device_time / self.profile_data.total_time * 100
+ )
+ if self.profile_data.total_time > 0
+ else 0,
+ "top_kernels": sorted(
+ self.profile_data.kernels, key=lambda k: k.execution_time, reverse=True
+ )[:10],
+ }
--- yaml_parser.py 2025-07-08 02:57:37.000000 +0000
+++ yaml_parser.py 2025-07-08 02:59:59.151234 +0000
@@ -1,168 +1,178 @@
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
#
# Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
# See https://llvm.org/LICENSE.txt for license information.
# SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
#
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
#
# LLVM Advisor Parser - Parses YAML optimization data and profiling
# files generated by LLVM and creates structured data models.
#
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
import yaml
import glob
import json
from typing import List, Dict, Any, Optional
from pathlib import Path
-from .models import OptimizationRemark, CompilationUnit, Project, RemarkType, DebugLocation
+from .models import (
+ OptimizationRemark,
+ CompilationUnit,
+ Project,
+ RemarkType,
+ DebugLocation,
+)
class YAMLLoader(yaml.SafeLoader):
pass
def construct_remark(tag):
def constructor(loader, node):
value = loader.construct_mapping(node)
- value['_remark_type'] = tag
+ value["_remark_type"] = tag
return value
+
return constructor
-for tag in ['Passed', 'Missed', 'Analysis']:
- YAMLLoader.add_constructor(f'!{tag}', construct_remark(tag))
+for tag in ["Passed", "Missed", "Analysis"]:
+ YAMLLoader.add_constructor(f"!{tag}", construct_remark(tag))
class OptimizationRecordParser:
def __init__(self):
self.remark_type_map = {
- 'Passed': RemarkType.PASSED,
- 'Missed': RemarkType.MISSED,
- 'Analysis': RemarkType.ANALYSIS
+ "Passed": RemarkType.PASSED,
+ "Missed": RemarkType.MISSED,
+ "Analysis": RemarkType.ANALYSIS,
}
- self.source_extensions = {'c', 'cpp', 'cc', 'cxx', 'cu', 'f', 'f90', 'f95'}
-
+ self.source_extensions = {"c", "cpp", "cc", "cxx", "cu", "f", "f90", "f95"}
+
def parse_yaml_file(self, yaml_path: str) -> List[OptimizationRemark]:
"""Parse a single YAML file and return list of remarks"""
remarks = []
try:
- with open(yaml_path, 'r', encoding='utf-8') as f:
+ with open(yaml_path, "r", encoding="utf-8") as f:
documents = yaml.load_all(f, Loader=YAMLLoader)
-
+
for doc in documents:
if not doc:
continue
-
+
remark_type = self.remark_type_map.get(
- doc.get('_remark_type'),
- RemarkType.MISSED
+ doc.get("_remark_type"), RemarkType.MISSED
)
-
+
remark = OptimizationRemark.from_dict(doc)
remark.remark_type = remark_type
remarks.append(remark)
-
+
except Exception as e:
print(f"Error parsing {yaml_path}: {e}")
-
+
return remarks
-
+
def parse_directory(self, directory: str, pattern: str = "*.yaml") -> Project:
"""Parse all YAML files in directory and return project"""
project = Project(name=Path(directory).name)
-
+
# Collect all YAML files
- yaml_files = set(Path(directory).glob(pattern)) | set(Path(directory).rglob(pattern))
-
+ yaml_files = set(Path(directory).glob(pattern)) | set(
+ Path(directory).rglob(pattern)
+ )
+
# Group remarks by source file
units_by_source = {}
-
+
for yaml_path in yaml_files:
remarks = self.parse_yaml_file(str(yaml_path))
if not remarks:
continue
-
+
source_file = self._extract_source_file(str(yaml_path), remarks)
-
+
if source_file in units_by_source:
units_by_source[source_file].remarks.extend(remarks)
else:
units_by_source[source_file] = CompilationUnit(
- source_file=source_file,
- remarks=remarks
+ source_file=source_file, remarks=remarks
)
-
+
# Add deduplicated units to project
for unit in units_by_source.values():
unit.deduplicate_remarks()
project.add_compilation_unit(unit)
-
+
return project
-
- def _extract_source_file(self, yaml_path: str, remarks: List[OptimizationRemark]) -> str:
+
+ def _extract_source_file(
+ self, yaml_path: str, remarks: List[OptimizationRemark]
+ ) -> str:
"""Extract source file name from remarks or YAML path"""
# Try to extract from debug info first
for remark in remarks:
- if (remark.debug_loc and
- remark.debug_loc.file != '<unknown>' and
- not remark.debug_loc.file.startswith(('tmp', '.'))):
+ if (
+ remark.debug_loc
+ and remark.debug_loc.file != "<unknown>"
+ and not remark.debug_loc.file.startswith(("tmp", "."))
+ ):
return Path(remark.debug_loc.file).name
-
+
# Fallback: derive from YAML filename
yaml_stem = Path(yaml_path).stem
- if '.' in yaml_stem:
- parts = yaml_stem.split('.')
+ if "." in yaml_stem:
+ parts = yaml_stem.split(".")
for i, part in enumerate(parts):
if part in self.source_extensions:
- return '.'.join(parts[:i+1])
-
- return yaml_stem or 'unknown'
-
- def parse_single_file(self, yaml_path: str, source_file: Optional[str] = None) -> CompilationUnit:
+ return ".".join(parts[: i + 1])
+
+ return yaml_stem or "unknown"
+
+ def parse_single_file(
+ self, yaml_path: str, source_file: Optional[str] = None
+ ) -> CompilationUnit:
"""Parse single YAML file and return compilation unit"""
remarks = self.parse_yaml_file(yaml_path)
source_file = source_file or self._extract_source_file(yaml_path, remarks)
return CompilationUnit(source_file=source_file, remarks=remarks)
-
+
def get_statistics(self, project: Project) -> Dict[str, Any]:
"""Generate comprehensive statistics for project"""
stats = {
"total_files": len(project.compilation_units),
"total_remarks": 0,
"by_type": {t.value: 0 for t in RemarkType},
"by_pass": {},
- "most_active_passes": []
+ "most_active_passes": [],
}
-
+
for unit in project.compilation_units:
stats["total_remarks"] += len(unit.remarks)
-
+
for remark in unit.remarks:
stats["by_type"][remark.remark_type.value] += 1
- stats["by_pass"][remark.pass_name] = stats["by_pass"].get(remark.pass_name, 0) + 1
-
+ stats["by_pass"][remark.pass_name] = (
+ stats["by_pass"].get(remark.pass_name, 0) + 1
+ )
+
stats["most_active_passes"] = sorted(
- stats["by_pass"].items(),
- key=lambda x: x[1],
- reverse=True
+ stats["by_pass"].items(), key=lambda x: x[1], reverse=True
)[:10]
-
+
return stats
-
-
-
if __name__ == "__main__":
parser = OptimizationRecordParser()
project = parser.parse_directory(".")
-
+
if not project.compilation_units:
print("No YAML files found. Run compiler with optimization remarks enabled.")
exit(1)
-
+
print(f"Project: {project.name}, Units: {len(project.compilation_units)}")
stats = parser.get_statistics(project)
print(f"Statistics: {stats}")
``````````
</details>
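If the reported differences look right, darker can write them to the files directly instead of only printing a diff. A minimal sketch, assuming darker is installed and run from the repository root, and that it accepts the parser directory as a path argument:

``````````bash
# Same revision range as the check, but without --check/--diff,
# so darker rewrites only the lines changed since the previous commit.
darker -r HEAD~1...HEAD llvm/tools/llvm-advisor/view/parser/
``````````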
https://github.com/llvm/llvm-project/pull/147451
More information about the llvm-commits mailing list