[llvm] [tools] LLVM Advisor - optimization analysis and performance guidance tool (PR #147451)

via llvm-commits llvm-commits at lists.llvm.org
Mon Jul 7 20:00:29 PDT 2025


Miguel Cárdenas <miguelecsx at gmail.com>
Message-ID:
In-Reply-To: <llvm.org/llvm/llvm-project/pull/147451 at github.com>


github-actions[bot] wrote:


:warning: The Python code formatter, darker, found issues in your code. :warning:

<details>
<summary>
You can test this locally with the following command:
</summary>

``````````bash
darker --check --diff -r HEAD~1...HEAD llvm/tools/llvm-advisor/view/parser/__init__.py llvm/tools/llvm-advisor/view/parser/analyzer.py llvm/tools/llvm-advisor/view/parser/models.py llvm/tools/llvm-advisor/view/parser/profile_parser.py llvm/tools/llvm-advisor/view/parser/yaml_parser.py
``````````

</details>
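If you want to apply the suggested formatting rather than only inspect it, darker rewrites the listed files in place when invoked without `--check --diff`. A minimal sketch, assuming darker is available in your environment (e.g. installed with `pip install darker`):

``````````bash
# Install darker if it is not already available (assumption: a pip-based environment).
pip install darker

# Re-run over the same revision range and file list without --check/--diff,
# so darker edits the files in place instead of only printing the differences.
darker -r HEAD~1...HEAD llvm/tools/llvm-advisor/view/parser/__init__.py llvm/tools/llvm-advisor/view/parser/analyzer.py llvm/tools/llvm-advisor/view/parser/models.py llvm/tools/llvm-advisor/view/parser/profile_parser.py llvm/tools/llvm-advisor/view/parser/yaml_parser.py
``````````

After the files are rewritten, commit the result and the formatting check should pass on the next push.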

<details>
<summary>
View the diff from darker here.
</summary>

``````````diff
--- __init__.py	2025-07-08 02:57:37.000000 +0000
+++ __init__.py	2025-07-08 02:59:58.577994 +0000
@@ -1,14 +1,21 @@
 from .yaml_parser import OptimizationRecordParser
-from .models import OptimizationRemark, CompilationUnit, Project, RemarkType, DebugLocation, RemarkArgument
+from .models import (
+    OptimizationRemark,
+    CompilationUnit,
+    Project,
+    RemarkType,
+    DebugLocation,
+    RemarkArgument,
+)
 from .analyzer import RemarkAnalyzer
 
 __all__ = [
-    'OptimizationRecordParser',
-    'OptimizationRemark',
-    'CompilationUnit',
-    'Project',
-    'RemarkType',
-    'DebugLocation',
-    'RemarkArgument',
-    'RemarkAnalyzer'
+    "OptimizationRecordParser",
+    "OptimizationRemark",
+    "CompilationUnit",
+    "Project",
+    "RemarkType",
+    "DebugLocation",
+    "RemarkArgument",
+    "RemarkAnalyzer",
 ]
--- analyzer.py	2025-07-08 02:57:37.000000 +0000
+++ analyzer.py	2025-07-08 02:59:58.876561 +0000
@@ -1,17 +1,17 @@
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 #
 # Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
 # See https://llvm.org/LICENSE.txt for license information.
 # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
 #
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 #
 # LLVM Advisor Analysis Engine - Comprehensive analysis of optimization
 # remarks, profiling data, and performance insights.
 #
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 
 from typing import Dict, List, Any, Optional
 from collections import defaultdict, Counter
 from .models import Project, RemarkType, OptimizationRemark
 from .profile_parser import ProfileParser, ProfileData
@@ -24,742 +24,964 @@
         self.raw_trace_data = None
         if profile_path:
             parser = ProfileParser(profile_path)
             self.profile_data = parser.parse()
             self.raw_trace_data = parser.get_raw_data()
-    
+
     def analyze_optimization_opportunities(self) -> Dict[str, Any]:
         missed_by_pass = defaultdict(list)
-        
+
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
                 if remark.remark_type == RemarkType.MISSED:
                     missed_by_pass[remark.pass_name].append(remark)
-        
+
         opportunities = []
         for pass_name, remarks in missed_by_pass.items():
             # Count unique files affected by this pass
             unique_files = set()
             for remark in remarks:
                 for unit in self.project.compilation_units:
                     if remark in unit.remarks:
                         unique_files.add(unit.source_file)
                         break
-            
-            opportunities.append({
-                "pass": pass_name,
-                "missed_count": len(remarks),
-                "files_affected": len(unique_files),
-                "impact": "high" if len(remarks) > 10 else "medium" if len(remarks) > 5 else "low"
-            })
-        
-        return {
-            "optimization_opportunities": sorted(opportunities, key=lambda x: x["missed_count"], reverse=True),
-            "total_missed": sum(len(remarks) for remarks in missed_by_pass.values())
-        }
-    
+
+            opportunities.append(
+                {
+                    "pass": pass_name,
+                    "missed_count": len(remarks),
+                    "files_affected": len(unique_files),
+                    "impact": "high"
+                    if len(remarks) > 10
+                    else "medium"
+                    if len(remarks) > 5
+                    else "low",
+                }
+            )
+
+        return {
+            "optimization_opportunities": sorted(
+                opportunities, key=lambda x: x["missed_count"], reverse=True
+            ),
+            "total_missed": sum(len(remarks) for remarks in missed_by_pass.values()),
+        }
+
     def analyze_performance_hotspots(self) -> Dict[str, Any]:
         function_remarks = defaultdict(list)
-        
+
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
                 function_remarks[remark.function].append(remark)
-        
+
         hotspots = []
         for function, remarks in function_remarks.items():
             if len(remarks) > 3:  # Functions with many remarks are potential hotspots
                 passed = sum(1 for r in remarks if r.remark_type == RemarkType.PASSED)
                 missed = sum(1 for r in remarks if r.remark_type == RemarkType.MISSED)
-                
-                hotspots.append({
-                    "function": function,
-                    "total_remarks": len(remarks),
-                    "passed": passed,
-                    "missed": missed,
-                    "optimization_ratio": passed / len(remarks) if remarks else 0
-                })
-        
-        return {
-            "hotspots": sorted(hotspots, key=lambda x: x["total_remarks"], reverse=True)[:10],
-            "total_functions_analyzed": len(function_remarks)
-        }
-    
+
+                hotspots.append(
+                    {
+                        "function": function,
+                        "total_remarks": len(remarks),
+                        "passed": passed,
+                        "missed": missed,
+                        "optimization_ratio": passed / len(remarks) if remarks else 0,
+                    }
+                )
+
+        return {
+            "hotspots": sorted(
+                hotspots, key=lambda x: x["total_remarks"], reverse=True
+            )[:10],
+            "total_functions_analyzed": len(function_remarks),
+        }
+
     def analyze_offloading_efficiency(self) -> Dict[str, Any]:
         offloading_remarks = []
-        
+
         for unit in self.project.compilation_units:
             offloading_remarks.extend(unit.get_offloading_remarks())
-        
+
         if not offloading_remarks:
             return {"offloading_remarks": 0, "efficiency": "N/A"}
-        
-        passed = sum(1 for r in offloading_remarks if r.remark_type == RemarkType.PASSED)
-        missed = sum(1 for r in offloading_remarks if r.remark_type == RemarkType.MISSED)
-        
+
+        passed = sum(
+            1 for r in offloading_remarks if r.remark_type == RemarkType.PASSED
+        )
+        missed = sum(
+            1 for r in offloading_remarks if r.remark_type == RemarkType.MISSED
+        )
+
         efficiency = passed / len(offloading_remarks) if offloading_remarks else 0
-        
+
         return {
             "offloading_remarks": len(offloading_remarks),
             "passed": passed,
             "missed": missed,
             "efficiency": efficiency,
-            "efficiency_rating": "excellent" if efficiency > 0.8 else "good" if efficiency > 0.6 else "needs_improvement"
+            "efficiency_rating": "excellent"
+            if efficiency > 0.8
+            else "good"
+            if efficiency > 0.6
+            else "needs_improvement",
         }
 
     def analyze_profiling_data(self) -> Dict[str, Any]:
         """Analyze performance-related remarks for profiling insights"""
         static_analysis = {
             "loop_analysis": self._analyze_loops(),
             "vectorization_analysis": self._analyze_vectorization(),
             "inlining_analysis": self._analyze_inlining(),
             "memory_analysis": self._analyze_memory_operations(),
             "kernel_analysis": self._analyze_kernels(),
-            "hotspot_files": self._analyze_file_hotspots()
-        }
-        
+            "hotspot_files": self._analyze_file_hotspots(),
+        }
+
         if self.profile_data:
             runtime_analysis = self._analyze_runtime_profile()
             static_analysis.update(runtime_analysis)
-        
+
         return static_analysis
-    
+
     def analyze_optimization_insights(self) -> Dict[str, Any]:
         """Detailed optimization analysis for optimization tab"""
         return {
             "vectorization_opportunities": self._get_vectorization_opportunities(),
             "loop_optimization": self._analyze_loop_optimizations(),
             "function_optimization": self._analyze_function_optimizations(),
             "memory_optimization": self._analyze_memory_optimizations(),
             "parallelization_opportunities": self._get_parallelization_opportunities(),
-            "compiler_recommendations": self._generate_compiler_recommendations()
-        }
-    
+            "compiler_recommendations": self._generate_compiler_recommendations(),
+        }
+
     def analyze_hardware_insights(self) -> Dict[str, Any]:
         """Hardware-specific analysis for hardware tab"""
         return {
             "gpu_utilization": self._analyze_gpu_utilization(),
             "memory_hierarchy": self._analyze_memory_hierarchy(),
             "compute_patterns": self._analyze_compute_patterns(),
             "offloading_patterns": self._analyze_offloading_patterns(),
-            "architecture_recommendations": self._generate_architecture_recommendations()
-        }
-    
+            "architecture_recommendations": self._generate_architecture_recommendations(),
+        }
+
     def _analyze_loops(self) -> Dict[str, Any]:
         """Analyze loop-related remarks"""
         loop_remarks = []
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
-                if any(keyword in remark.get_message().lower() for keyword in ['loop', 'unroll', 'vectorize']):
+                if any(
+                    keyword in remark.get_message().lower()
+                    for keyword in ["loop", "unroll", "vectorize"]
+                ):
                     loop_remarks.append(remark)
-        
+
         loop_stats = Counter([r.pass_name for r in loop_remarks])
         return {
             "total_loops": len(loop_remarks),
             "by_pass": dict(loop_stats),
-            "vectorized": len([r for r in loop_remarks if 'vectorize' in r.pass_name and r.remark_type == RemarkType.PASSED]),
-            "failed_vectorization": len([r for r in loop_remarks if 'vectorize' in r.pass_name and r.remark_type == RemarkType.MISSED])
-        }
-    
+            "vectorized": len(
+                [
+                    r
+                    for r in loop_remarks
+                    if "vectorize" in r.pass_name and r.remark_type == RemarkType.PASSED
+                ]
+            ),
+            "failed_vectorization": len(
+                [
+                    r
+                    for r in loop_remarks
+                    if "vectorize" in r.pass_name and r.remark_type == RemarkType.MISSED
+                ]
+            ),
+        }
+
     def _analyze_vectorization(self) -> Dict[str, Any]:
         """Analyze vectorization performance"""
         vec_remarks = []
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
-                if 'vectorize' in remark.pass_name.lower():
+                if "vectorize" in remark.pass_name.lower():
                     vec_remarks.append(remark)
-        
+
         successful = [r for r in vec_remarks if r.remark_type == RemarkType.PASSED]
         failed = [r for r in vec_remarks if r.remark_type == RemarkType.MISSED]
-        
+
         return {
             "total_vectorization_attempts": len(vec_remarks),
             "successful": len(successful),
             "failed": len(failed),
             "success_rate": len(successful) / len(vec_remarks) if vec_remarks else 0,
-            "common_failures": self._get_common_failure_reasons(failed)
-        }
-    
+            "common_failures": self._get_common_failure_reasons(failed),
+        }
+
     def _analyze_inlining(self) -> Dict[str, Any]:
         """Analyze function inlining performance"""
         inline_remarks = []
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
-                if remark.pass_name == 'inline':
+                if remark.pass_name == "inline":
                     inline_remarks.append(remark)
-        
+
         successful = [r for r in inline_remarks if r.remark_type == RemarkType.PASSED]
         failed = [r for r in inline_remarks if r.remark_type == RemarkType.MISSED]
-        
+
         return {
             "total_inline_attempts": len(inline_remarks),
             "successful": len(successful),
             "failed": len(failed),
-            "success_rate": len(successful) / len(inline_remarks) if inline_remarks else 0
-        }
-    
+            "success_rate": len(successful) / len(inline_remarks)
+            if inline_remarks
+            else 0,
+        }
+
     def _analyze_memory_operations(self) -> Dict[str, Any]:
         """Analyze memory-related optimizations"""
         memory_remarks = []
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
                 message = remark.get_message().lower()
-                if any(keyword in message for keyword in ['load', 'store', 'memory', 'cache', 'transfer']):
+                if any(
+                    keyword in message
+                    for keyword in ["load", "store", "memory", "cache", "transfer"]
+                ):
                     memory_remarks.append(remark)
-        
+
         return {
             "total_memory_operations": len(memory_remarks),
-            "optimized": len([r for r in memory_remarks if r.remark_type == RemarkType.PASSED]),
-            "missed_optimizations": len([r for r in memory_remarks if r.remark_type == RemarkType.MISSED])
-        }
-    
+            "optimized": len(
+                [r for r in memory_remarks if r.remark_type == RemarkType.PASSED]
+            ),
+            "missed_optimizations": len(
+                [r for r in memory_remarks if r.remark_type == RemarkType.MISSED]
+            ),
+        }
+
     def _analyze_kernels(self) -> Dict[str, Any]:
         """Analyze GPU kernel performance from both remarks and profile data"""
         kernel_remarks = []
         kernel_functions = set()
         target_regions = set()
-        
+
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
                 # Check multiple indicators for kernel functions
                 is_kernel = (
-                    '__omp_offloading_' in remark.function or 
-                    'kernel' in remark.function.lower() or
-                    remark.is_offloading_related()
+                    "__omp_offloading_" in remark.function
+                    or "kernel" in remark.function.lower()
+                    or remark.is_offloading_related()
                 )
-                
+
                 if is_kernel:
                     kernel_remarks.append(remark)
-                    
+
                     # Extract unique kernel functions
-                    if '__omp_offloading_' in remark.function:
+                    if "__omp_offloading_" in remark.function:
                         kernel_functions.add(remark.function)
                     elif remark.is_offloading_related():
                         # For OpenMP optimization remarks, create unique identifiers
                         if remark.debug_loc:
                             # Use file:line as unique kernel identifier for target regions
-                            kernel_id = f"{remark.debug_loc.file}:{remark.debug_loc.line}"
+                            kernel_id = (
+                                f"{remark.debug_loc.file}:{remark.debug_loc.line}"
+                            )
                             target_regions.add(kernel_id)
                         else:
                             # Fallback to function name
                             kernel_functions.add(remark.function)
-        
+
         # Count from static analysis of source files for target directives
         static_kernel_count = 0
         for unit in self.project.compilation_units:
             source_lines = unit.get_source_lines(".")
             if source_lines:
                 for line in source_lines:
                     # Count OpenMP target directives directly from source
-                    if '#pragma omp target' in line and not line.strip().startswith('//'):
+                    if "#pragma omp target" in line and not line.strip().startswith(
+                        "//"
+                    ):
                         static_kernel_count += 1
-        
+
         # Get accurate kernel count from profile data if available
         profile_kernels = 0
-        if self.raw_trace_data and 'traceEvents' in self.raw_trace_data:
+        if self.raw_trace_data and "traceEvents" in self.raw_trace_data:
             # Primary method: Get count from "Total Runtime: target exe" summary event
-            summary_events = [e for e in self.raw_trace_data['traceEvents'] 
-                            if (e.get('name', '') == 'Total Runtime: target exe' and 
-                                e.get('args', {}).get('count') is not None)]
-            
+            summary_events = [
+                e
+                for e in self.raw_trace_data["traceEvents"]
+                if (
+                    e.get("name", "") == "Total Runtime: target exe"
+                    and e.get("args", {}).get("count") is not None
+                )
+            ]
+
             if summary_events:
                 # Use the count from summary which represents actual kernel executions
-                profile_kernels = summary_events[0]['args']['count']
+                profile_kernels = summary_events[0]["args"]["count"]
             else:
                 # Fallback: Count individual kernel execution events
-                kernel_events = [e for e in self.raw_trace_data['traceEvents'] 
-                               if (e.get('name', '') == 'Runtime: target exe' and 
-                                   e.get('ph') == 'X' and e.get('dur', 0) > 0)]
+                kernel_events = [
+                    e
+                    for e in self.raw_trace_data["traceEvents"]
+                    if (
+                        e.get("name", "") == "Runtime: target exe"
+                        and e.get("ph") == "X"
+                        and e.get("dur", 0) > 0
+                    )
+                ]
                 profile_kernels = len(kernel_events)
-                
+
                 # Another fallback: Look for other kernel execution patterns
                 if profile_kernels == 0:
-                    target_events = [e for e in self.raw_trace_data['traceEvents'] 
-                                   if (e.get('name', '').startswith('Kernel Target') and 
-                                       e.get('ph') == 'X')]
+                    target_events = [
+                        e
+                        for e in self.raw_trace_data["traceEvents"]
+                        if (
+                            e.get("name", "").startswith("Kernel Target")
+                            and e.get("ph") == "X"
+                        )
+                    ]
                     profile_kernels = len(target_events)
-        
-        elif self.profile_data and hasattr(self.profile_data, 'kernels'):
+
+        elif self.profile_data and hasattr(self.profile_data, "kernels"):
             profile_kernels = len(self.profile_data.kernels)
-        
+
         # Prioritize profile data when available, otherwise use static analysis
-        detected_kernels = profile_kernels if profile_kernels > 0 else max(
-            len(kernel_functions),
-            len(target_regions), 
-            static_kernel_count
+        detected_kernels = (
+            profile_kernels
+            if profile_kernels > 0
+            else max(len(kernel_functions), len(target_regions), static_kernel_count)
         )
-        
+
         return {
             "total_kernels": detected_kernels,
             "total_kernel_remarks": len(kernel_remarks),
-            "optimized_kernels": len([f for f in kernel_functions if any(
-                r.remark_type == RemarkType.PASSED for r in kernel_remarks if r.function == f
-            )]),
+            "optimized_kernels": len(
+                [
+                    f
+                    for f in kernel_functions
+                    if any(
+                        r.remark_type == RemarkType.PASSED
+                        for r in kernel_remarks
+                        if r.function == f
+                    )
+                ]
+            ),
             "profile_detected_kernels": profile_kernels,
             "remarks_detected_kernels": len(kernel_functions),
             "target_regions_detected": len(target_regions),
-            "static_analysis_kernels": static_kernel_count
-        }
-    
+            "static_analysis_kernels": static_kernel_count,
+        }
+
     def _analyze_file_hotspots(self) -> List[Dict[str, Any]]:
         """Identify files with most optimization activity"""
         file_stats = []
         for unit in self.project.compilation_units:
-            passed = len([r for r in unit.remarks if r.remark_type == RemarkType.PASSED])
-            missed = len([r for r in unit.remarks if r.remark_type == RemarkType.MISSED])
+            passed = len(
+                [r for r in unit.remarks if r.remark_type == RemarkType.PASSED]
+            )
+            missed = len(
+                [r for r in unit.remarks if r.remark_type == RemarkType.MISSED]
+            )
             total = len(unit.remarks)
-            
+
             if total > 0:
-                file_stats.append({
-                    "file": unit.source_file,
-                    "total_remarks": total,
-                    "passed": passed,
-                    "missed": missed,
-                    "optimization_ratio": passed / total,
-                    "activity_score": total
-                })
-        
+                file_stats.append(
+                    {
+                        "file": unit.source_file,
+                        "total_remarks": total,
+                        "passed": passed,
+                        "missed": missed,
+                        "optimization_ratio": passed / total,
+                        "activity_score": total,
+                    }
+                )
+
         return sorted(file_stats, key=lambda x: x["activity_score"], reverse=True)[:10]
-    
+
     def _get_vectorization_opportunities(self) -> List[Dict[str, Any]]:
         """Identify missed vectorization opportunities"""
         opportunities = []
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
-                if ('vectorize' in remark.pass_name.lower() and 
-                    remark.remark_type == RemarkType.MISSED):
-                    opportunities.append({
-                        "file": unit.source_file,
-                        "line": remark.debug_loc.line if remark.debug_loc else 0,
-                        "function": remark.function,
-                        "reason": remark.get_message(),
-                        "pass": remark.pass_name
-                    })
+                if (
+                    "vectorize" in remark.pass_name.lower()
+                    and remark.remark_type == RemarkType.MISSED
+                ):
+                    opportunities.append(
+                        {
+                            "file": unit.source_file,
+                            "line": remark.debug_loc.line if remark.debug_loc else 0,
+                            "function": remark.function,
+                            "reason": remark.get_message(),
+                            "pass": remark.pass_name,
+                        }
+                    )
         return opportunities[:20]  # Top 20 opportunities
-    
+
     def _analyze_loop_optimizations(self) -> Dict[str, Any]:
         """Analyze loop optimization patterns"""
-        loop_passes = ['loop-vectorize', 'loop-unroll', 'licm', 'loop-idiom']
+        loop_passes = ["loop-vectorize", "loop-unroll", "licm", "loop-idiom"]
         loop_data = {}
-        
+
         for pass_name in loop_passes:
             remarks = []
             for unit in self.project.compilation_units:
                 for remark in unit.remarks:
                     if remark.pass_name == pass_name:
                         remarks.append(remark)
-            
+
             loop_data[pass_name] = {
                 "total": len(remarks),
-                "passed": len([r for r in remarks if r.remark_type == RemarkType.PASSED]),
-                "missed": len([r for r in remarks if r.remark_type == RemarkType.MISSED])
+                "passed": len(
+                    [r for r in remarks if r.remark_type == RemarkType.PASSED]
+                ),
+                "missed": len(
+                    [r for r in remarks if r.remark_type == RemarkType.MISSED]
+                ),
             }
-        
+
         return loop_data
-    
+
     def _analyze_function_optimizations(self) -> Dict[str, Any]:
         """Analyze function-level optimizations"""
-        function_passes = ['inline', 'dce', 'dse', 'gvn']
+        function_passes = ["inline", "dce", "dse", "gvn"]
         func_data = {}
-        
+
         for pass_name in function_passes:
             remarks = []
             for unit in self.project.compilation_units:
                 for remark in unit.remarks:
                     if remark.pass_name == pass_name:
                         remarks.append(remark)
-            
+
             func_data[pass_name] = {
                 "total": len(remarks),
-                "passed": len([r for r in remarks if r.remark_type == RemarkType.PASSED]),
-                "missed": len([r for r in remarks if r.remark_type == RemarkType.MISSED])
+                "passed": len(
+                    [r for r in remarks if r.remark_type == RemarkType.PASSED]
+                ),
+                "missed": len(
+                    [r for r in remarks if r.remark_type == RemarkType.MISSED]
+                ),
             }
-        
+
         return func_data
-    
+
     def _analyze_memory_optimizations(self) -> List[Dict[str, Any]]:
         """Analyze memory optimization opportunities"""
         memory_issues = []
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
-                if (remark.remark_type == RemarkType.MISSED and
-                    any(keyword in remark.get_message().lower() for keyword in 
-                        ['load', 'store', 'memory', 'cache', 'clobbered'])):
-                    memory_issues.append({
-                        "file": unit.source_file,
-                        "line": remark.debug_loc.line if remark.debug_loc else 0,
-                        "function": remark.function,
-                        "issue": remark.get_message(),
-                        "pass": remark.pass_name
-                    })
+                if remark.remark_type == RemarkType.MISSED and any(
+                    keyword in remark.get_message().lower()
+                    for keyword in ["load", "store", "memory", "cache", "clobbered"]
+                ):
+                    memory_issues.append(
+                        {
+                            "file": unit.source_file,
+                            "line": remark.debug_loc.line if remark.debug_loc else 0,
+                            "function": remark.function,
+                            "issue": remark.get_message(),
+                            "pass": remark.pass_name,
+                        }
+                    )
         return memory_issues[:15]
-    
+
     def _get_parallelization_opportunities(self) -> List[Dict[str, Any]]:
         """Identify parallelization opportunities"""
         opportunities = []
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
-                if (any(keyword in remark.get_message().lower() for keyword in 
-                       ['parallel', 'thread', 'openmp', 'offload']) and
-                    remark.remark_type == RemarkType.MISSED):
-                    opportunities.append({
-                        "file": unit.source_file,
-                        "line": remark.debug_loc.line if remark.debug_loc else 0,
-                        "function": remark.function,
-                        "opportunity": remark.get_message(),
-                        "pass": remark.pass_name
-                    })
+                if (
+                    any(
+                        keyword in remark.get_message().lower()
+                        for keyword in ["parallel", "thread", "openmp", "offload"]
+                    )
+                    and remark.remark_type == RemarkType.MISSED
+                ):
+                    opportunities.append(
+                        {
+                            "file": unit.source_file,
+                            "line": remark.debug_loc.line if remark.debug_loc else 0,
+                            "function": remark.function,
+                            "opportunity": remark.get_message(),
+                            "pass": remark.pass_name,
+                        }
+                    )
         return opportunities[:10]
-    
+
     def _generate_compiler_recommendations(self) -> List[str]:
         """Generate compiler optimization recommendations"""
         recommendations = []
-        
+
         # Analyze common missed optimizations
         missed_passes = Counter()
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
                 if remark.remark_type == RemarkType.MISSED:
                     missed_passes[remark.pass_name] += 1
-        
-        if missed_passes.get('loop-vectorize', 0) > 10:
-            recommendations.append("Consider using -ffast-math for aggressive vectorization")
-        if missed_passes.get('inline', 0) > 20:
+
+        if missed_passes.get("loop-vectorize", 0) > 10:
+            recommendations.append(
+                "Consider using -ffast-math for aggressive vectorization"
+            )
+        if missed_passes.get("inline", 0) > 20:
             recommendations.append("Increase inlining thresholds with -finline-limit")
-        if missed_passes.get('gvn', 0) > 5:
+        if missed_passes.get("gvn", 0) > 5:
             recommendations.append("Enable aggressive optimization with -O3")
-        
+
         return recommendations
-    
+
     def _analyze_gpu_utilization(self) -> Dict[str, Any]:
         """Analyze GPU utilization patterns"""
         gpu_remarks = []
         for unit in self.project.compilation_units:
             gpu_remarks.extend(unit.get_offloading_remarks())
-        
-        kernel_functions = set(r.function for r in gpu_remarks if '__omp_offloading_' in r.function)
+
+        kernel_functions = set(
+            r.function for r in gpu_remarks if "__omp_offloading_" in r.function
+        )
         return {
             "total_gpu_functions": len(kernel_functions),
-            "optimization_coverage": len([r for r in gpu_remarks if r.remark_type == RemarkType.PASSED]) / len(gpu_remarks) if gpu_remarks else 0,
-            "offloading_efficiency": self.analyze_offloading_efficiency()
-        }
-    
+            "optimization_coverage": len(
+                [r for r in gpu_remarks if r.remark_type == RemarkType.PASSED]
+            )
+            / len(gpu_remarks)
+            if gpu_remarks
+            else 0,
+            "offloading_efficiency": self.analyze_offloading_efficiency(),
+        }
+
     def _analyze_memory_hierarchy(self) -> Dict[str, Any]:
         """Analyze memory access patterns for GPU"""
         memory_remarks = []
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
-                if (remark.is_offloading_related() and
-                    any(keyword in remark.get_message().lower() for keyword in 
-                        ['memory', 'load', 'store', 'cache', 'shared', 'global'])):
+                if remark.is_offloading_related() and any(
+                    keyword in remark.get_message().lower()
+                    for keyword in [
+                        "memory",
+                        "load",
+                        "store",
+                        "cache",
+                        "shared",
+                        "global",
+                    ]
+                ):
                     memory_remarks.append(remark)
-        
+
         return {
             "memory_operations": len(memory_remarks),
-            "optimized_memory": len([r for r in memory_remarks if r.remark_type == RemarkType.PASSED]),
-            "memory_issues": len([r for r in memory_remarks if r.remark_type == RemarkType.MISSED])
-        }
-    
+            "optimized_memory": len(
+                [r for r in memory_remarks if r.remark_type == RemarkType.PASSED]
+            ),
+            "memory_issues": len(
+                [r for r in memory_remarks if r.remark_type == RemarkType.MISSED]
+            ),
+        }
+
     def _analyze_compute_patterns(self) -> Dict[str, Any]:
         """Analyze compute utilization patterns"""
         compute_remarks = []
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
-                if (remark.is_offloading_related() and
-                    any(keyword in remark.get_message().lower() for keyword in 
-                        ['compute', 'thread', 'warp', 'simd', 'vector'])):
+                if remark.is_offloading_related() and any(
+                    keyword in remark.get_message().lower()
+                    for keyword in ["compute", "thread", "warp", "simd", "vector"]
+                ):
                     compute_remarks.append(remark)
-        
+
         return {
             "compute_operations": len(compute_remarks),
-            "compute_efficiency": len([r for r in compute_remarks if r.remark_type == RemarkType.PASSED]) / len(compute_remarks) if compute_remarks else 0
-        }
-    
+            "compute_efficiency": len(
+                [r for r in compute_remarks if r.remark_type == RemarkType.PASSED]
+            )
+            / len(compute_remarks)
+            if compute_remarks
+            else 0,
+        }
+
     def _analyze_offloading_patterns(self) -> Dict[str, Any]:
         """Analyze offloading patterns and effectiveness"""
         offload_remarks = []
         for unit in self.project.compilation_units:
             offload_remarks.extend(unit.get_offloading_remarks())
-        
-        offload_functions = Counter(r.function for r in offload_remarks if '__omp_offloading_' in r.function)
+
+        offload_functions = Counter(
+            r.function for r in offload_remarks if "__omp_offloading_" in r.function
+        )
         return {
             "offloaded_functions": len(offload_functions),
             "most_active_kernels": dict(offload_functions.most_common(5)),
-            "offloading_success_rate": len([r for r in offload_remarks if r.remark_type == RemarkType.PASSED]) / len(offload_remarks) if offload_remarks else 0
-        }
-    
+            "offloading_success_rate": len(
+                [r for r in offload_remarks if r.remark_type == RemarkType.PASSED]
+            )
+            / len(offload_remarks)
+            if offload_remarks
+            else 0,
+        }
+
     def _generate_architecture_recommendations(self) -> List[str]:
         """Generate architecture-specific recommendations"""
         recommendations = []
-        
+
         offload_remarks = []
         for unit in self.project.compilation_units:
             offload_remarks.extend(unit.get_offloading_remarks())
-        
+
         if len(offload_remarks) > 0:
-            efficiency = len([r for r in offload_remarks if r.remark_type == RemarkType.PASSED]) / len(offload_remarks)
+            efficiency = len(
+                [r for r in offload_remarks if r.remark_type == RemarkType.PASSED]
+            ) / len(offload_remarks)
             if efficiency < 0.7:
                 recommendations.append("Consider optimizing GPU memory access patterns")
                 recommendations.append("Review data transfer between host and device")
-        
+
         vectorization_remarks = []
         for unit in self.project.compilation_units:
             for remark in unit.remarks:
-                if 'vectorize' in remark.pass_name:
+                if "vectorize" in remark.pass_name:
                     vectorization_remarks.append(remark)
-        
+
         if len(vectorization_remarks) > 0:
-            vec_efficiency = len([r for r in vectorization_remarks if r.remark_type == RemarkType.PASSED]) / len(vectorization_remarks)
+            vec_efficiency = len(
+                [r for r in vectorization_remarks if r.remark_type == RemarkType.PASSED]
+            ) / len(vectorization_remarks)
             if vec_efficiency < 0.5:
                 recommendations.append("Consider SIMD-friendly data structures")
-        
+
         return recommendations
-    
-    def _get_common_failure_reasons(self, failed_remarks: List[OptimizationRemark]) -> List[str]:
+
+    def _get_common_failure_reasons(
+        self, failed_remarks: List[OptimizationRemark]
+    ) -> List[str]:
         """Extract common failure reasons from remarks"""
         reasons = Counter()
         for remark in failed_remarks:
             message = remark.get_message().lower()
-            if 'uncountable' in message:
-                reasons['Uncountable loops'] += 1
-            elif 'definition unavailable' in message:
-                reasons['Function definition unavailable'] += 1
-            elif 'clobbered' in message:
-                reasons['Memory dependencies'] += 1
-            elif 'impossible' in message:
-                reasons['Vectorization impossible'] += 1
+            if "uncountable" in message:
+                reasons["Uncountable loops"] += 1
+            elif "definition unavailable" in message:
+                reasons["Function definition unavailable"] += 1
+            elif "clobbered" in message:
+                reasons["Memory dependencies"] += 1
+            elif "impossible" in message:
+                reasons["Vectorization impossible"] += 1
             else:
-                reasons['Other'] += 1
-        
+                reasons["Other"] += 1
+
         return [f"{reason}: {count}" for reason, count in reasons.most_common(5)]
-    
+
     def _analyze_runtime_profile(self) -> Dict[str, Any]:
         """Analyze runtime profiling data from LIBOMPTARGET_PROFILE"""
         if not self.profile_data and not self.raw_trace_data:
             return {}
-        
+
         # Prioritize raw Chrome Trace format data
         if self.raw_trace_data:
             return {
                 "trace_data": self.raw_trace_data,
-                "performance_bottlenecks": self._identify_trace_bottlenecks(self.raw_trace_data),
-                "optimization_recommendations": self._generate_trace_recommendations(self.raw_trace_data)
+                "performance_bottlenecks": self._identify_trace_bottlenecks(
+                    self.raw_trace_data
+                ),
+                "optimization_recommendations": self._generate_trace_recommendations(
+                    self.raw_trace_data
+                ),
             }
-        
+
         # Handle Chrome Trace format from profile_data
-        if hasattr(self.profile_data, 'trace_events') or isinstance(self.profile_data, dict):
-            trace_data = self.profile_data if isinstance(self.profile_data, dict) else self.profile_data.__dict__
-            
+        if hasattr(self.profile_data, "trace_events") or isinstance(
+            self.profile_data, dict
+        ):
+            trace_data = (
+                self.profile_data
+                if isinstance(self.profile_data, dict)
+                else self.profile_data.__dict__
+            )
+
             return {
                 "trace_data": trace_data,
                 "performance_bottlenecks": self._identify_trace_bottlenecks(trace_data),
-                "optimization_recommendations": self._generate_trace_recommendations(trace_data)
+                "optimization_recommendations": self._generate_trace_recommendations(
+                    trace_data
+                ),
             }
-        
+
         # Fallback to original ProfileData structure (if implemented)
-        if self.profile_data and hasattr(self.profile_data, 'total_time'):
+        if self.profile_data and hasattr(self.profile_data, "total_time"):
             return {
                 "runtime_performance": {
                     "total_execution_time_us": self.profile_data.total_time,
                     "device_time_us": self.profile_data.device_time,
                     "host_time_us": self.profile_data.host_time,
                     "memory_transfer_time_us": self.profile_data.memory_transfer_time,
-                    "device_utilization_percent": (self.profile_data.device_time / self.profile_data.total_time * 100) if self.profile_data.total_time > 0 else 0
+                    "device_utilization_percent": (
+                        self.profile_data.device_time
+                        / self.profile_data.total_time
+                        * 100
+                    )
+                    if self.profile_data.total_time > 0
+                    else 0,
                 },
                 "kernel_performance": [
                     {
                         "name": kernel.name,
                         "execution_time_us": kernel.execution_time,
                         "launch_time_us": kernel.launch_time,
                         "device_id": kernel.device_id,
                         "grid_size": kernel.grid_size,
-                        "block_size": kernel.block_size
-                    } for kernel in self.profile_data.kernels
+                        "block_size": kernel.block_size,
+                    }
+                    for kernel in self.profile_data.kernels
                 ],
                 "performance_bottlenecks": self._identify_performance_bottlenecks(),
-                "optimization_recommendations": self._generate_runtime_recommendations()
+                "optimization_recommendations": self._generate_runtime_recommendations(),
             }
-        
+
         return {}
 
-    def _identify_trace_bottlenecks(self, trace_data: Dict[str, Any]) -> List[Dict[str, Any]]:
+    def _identify_trace_bottlenecks(
+        self, trace_data: Dict[str, Any]
+    ) -> List[Dict[str, Any]]:
         """Identify performance bottlenecks from Chrome Trace format data"""
-        if not trace_data or 'traceEvents' not in trace_data:
+        if not trace_data or "traceEvents" not in trace_data:
             return []
-        
+
         bottlenecks = []
-        events = [e for e in trace_data['traceEvents'] if e.get('ph') == 'X' and e.get('dur')]
-        
+        events = [
+            e for e in trace_data["traceEvents"] if e.get("ph") == "X" and e.get("dur")
+        ]
+
         if not events:
             return bottlenecks
-        
+
         # Calculate time distributions
-        kernel_events = [e for e in events if 'target exe' in e.get('name', '')]
-        memory_events = [e for e in events if any(term in e.get('name', '') for term in ['HostToDev', 'DevToHost'])]
-        init_events = [e for e in events if 'init' in e.get('name', '').lower()]
-        
-        total_time = max(e['ts'] + e['dur'] for e in events) - min(e['ts'] for e in events)
-        kernel_time = sum(e['dur'] for e in kernel_events)
-        memory_time = sum(e['dur'] for e in memory_events)
-        init_time = sum(e['dur'] for e in init_events)
-        
+        kernel_events = [e for e in events if "target exe" in e.get("name", "")]
+        memory_events = [
+            e
+            for e in events
+            if any(term in e.get("name", "") for term in ["HostToDev", "DevToHost"])
+        ]
+        init_events = [e for e in events if "init" in e.get("name", "").lower()]
+
+        total_time = max(e["ts"] + e["dur"] for e in events) - min(
+            e["ts"] for e in events
+        )
+        kernel_time = sum(e["dur"] for e in kernel_events)
+        memory_time = sum(e["dur"] for e in memory_events)
+        init_time = sum(e["dur"] for e in init_events)
+
         # Memory transfer bottleneck
         if memory_time > kernel_time * 0.5:
-            bottlenecks.append({
-                "type": "memory_transfer",
-                "severity": "high",
-                "description": "Memory transfers take significant time compared to kernel execution",
-                "impact_percent": (memory_time / total_time) * 100
-            })
-        
+            bottlenecks.append(
+                {
+                    "type": "memory_transfer",
+                    "severity": "high",
+                    "description": "Memory transfers take significant time compared to kernel execution",
+                    "impact_percent": (memory_time / total_time) * 100,
+                }
+            )
+
         # Initialization overhead
         if init_time > total_time * 0.3:
-            bottlenecks.append({
-                "type": "initialization_overhead",
-                "severity": "medium", 
-                "description": "Initialization takes significant portion of total execution time",
-                "impact_percent": (init_time / total_time) * 100
-            })
-        
+            bottlenecks.append(
+                {
+                    "type": "initialization_overhead",
+                    "severity": "medium",
+                    "description": "Initialization takes significant portion of total execution time",
+                    "impact_percent": (init_time / total_time) * 100,
+                }
+            )
+
         # Kernel utilization
         kernel_util = (kernel_time / total_time) * 100 if total_time > 0 else 0
         if kernel_util < 30:
-            bottlenecks.append({
-                "type": "low_kernel_utilization",
-                "severity": "medium",
-                "description": f"Kernel utilization is only {kernel_util:.1f}%",
-                "impact_percent": 100 - kernel_util
-            })
-        
+            bottlenecks.append(
+                {
+                    "type": "low_kernel_utilization",
+                    "severity": "medium",
+                    "description": f"Kernel utilization is only {kernel_util:.1f}%",
+                    "impact_percent": 100 - kernel_util,
+                }
+            )
+
         # Kernel execution imbalance
         if len(kernel_events) > 1:
-            durations = [e['dur'] for e in kernel_events]
+            durations = [e["dur"] for e in kernel_events]
             max_dur = max(durations)
             min_dur = min(durations)
             if max_dur > min_dur * 3:  # 3x difference indicates imbalance
-                bottlenecks.append({
-                    "type": "kernel_imbalance",
-                    "severity": "low",
-                    "description": "Significant execution time variance between kernel launches",
-                    "impact_percent": ((max_dur - min_dur) / max_dur) * 100
-                })
-        
+                bottlenecks.append(
+                    {
+                        "type": "kernel_imbalance",
+                        "severity": "low",
+                        "description": "Significant execution time variance between kernel launches",
+                        "impact_percent": ((max_dur - min_dur) / max_dur) * 100,
+                    }
+                )
+
         return bottlenecks
 
     def _generate_trace_recommendations(self, trace_data: Dict[str, Any]) -> List[str]:
         """Generate optimization recommendations based on Chrome Trace format data"""
-        if not trace_data or 'traceEvents' not in trace_data:
+        if not trace_data or "traceEvents" not in trace_data:
             return []
-        
+
         recommendations = []
-        events = [e for e in trace_data['traceEvents'] if e.get('ph') == 'X' and e.get('dur')]
-        
+        events = [
+            e for e in trace_data["traceEvents"] if e.get("ph") == "X" and e.get("dur")
+        ]
+
         if not events:
             return recommendations
-        
-        kernel_events = [e for e in events if 'target exe' in e.get('name', '')]
-        memory_events = [e for e in events if any(term in e.get('name', '') for term in ['HostToDev', 'DevToHost'])]
-        
-        total_time = max(e['ts'] + e['dur'] for e in events) - min(e['ts'] for e in events)
-        kernel_time = sum(e['dur'] for e in kernel_events)
-        memory_time = sum(e['dur'] for e in memory_events)
-        
+
+        kernel_events = [e for e in events if "target exe" in e.get("name", "")]
+        memory_events = [
+            e
+            for e in events
+            if any(term in e.get("name", "") for term in ["HostToDev", "DevToHost"])
+        ]
+
+        total_time = max(e["ts"] + e["dur"] for e in events) - min(
+            e["ts"] for e in events
+        )
+        kernel_time = sum(e["dur"] for e in kernel_events)
+        memory_time = sum(e["dur"] for e in memory_events)
+
         # Memory transfer recommendations
         if memory_time > kernel_time * 0.3:
-            recommendations.append("Consider optimizing data transfers - use unified memory or asynchronous transfers")
-            recommendations.append("Reduce data movement by keeping data on device between kernel launches")
-        
+            recommendations.append(
+                "Consider optimizing data transfers - use unified memory or asynchronous transfers"
+            )
+            recommendations.append(
+                "Reduce data movement by keeping data on device between kernel launches"
+            )
+
         # Kernel execution recommendations
         if len(kernel_events) > 5:
             avg_kernel_time = kernel_time / len(kernel_events)
             if avg_kernel_time < 100:  # Very short kernels (< 100μs)
-                recommendations.append("Consider kernel fusion to reduce launch overhead")
-        
+                recommendations.append(
+                    "Consider kernel fusion to reduce launch overhead"
+                )
+
         # Device utilization recommendations
         kernel_util = (kernel_time / total_time) * 100 if total_time > 0 else 0
         if kernel_util < 50:
-            recommendations.append("Increase workload size or use multiple devices to improve utilization")
-            recommendations.append("Consider overlapping computation with data transfers")
-        
+            recommendations.append(
+                "Increase workload size or use multiple devices to improve utilization"
+            )
+            recommendations.append(
+                "Consider overlapping computation with data transfers"
+            )
+
         # Specific kernel analysis
         if kernel_events:
             for event in kernel_events:
-                detail = event.get('args', {}).get('detail', '')
-                if 'NumTeams=0' in detail:
-                    recommendations.append("Some kernels launch with NumTeams=0 - verify OpenMP target directives")
-        
+                detail = event.get("args", {}).get("detail", "")
+                if "NumTeams=0" in detail:
+                    recommendations.append(
+                        "Some kernels launch with NumTeams=0 - verify OpenMP target directives"
+                    )
+
         return recommendations
-    
+
     def _identify_performance_bottlenecks(self) -> List[Dict[str, Any]]:
         """Identify performance bottlenecks from runtime data"""
         if not self.profile_data:
             return []
-        
+
         bottlenecks = []
-        
+
         # Memory transfer bottleneck
         if self.profile_data.memory_transfer_time > self.profile_data.device_time * 0.5:
-            bottlenecks.append({
-                "type": "memory_transfer",
-                "severity": "high",
-                "description": "Memory transfers take significant time compared to computation",
-                "impact_percent": (self.profile_data.memory_transfer_time / self.profile_data.total_time) * 100
-            })
-        
+            bottlenecks.append(
+                {
+                    "type": "memory_transfer",
+                    "severity": "high",
+                    "description": "Memory transfers take significant time compared to computation",
+                    "impact_percent": (
+                        self.profile_data.memory_transfer_time
+                        / self.profile_data.total_time
+                    )
+                    * 100,
+                }
+            )
+
         # Low device utilization
-        device_util = (self.profile_data.device_time / self.profile_data.total_time) * 100 if self.profile_data.total_time > 0 else 0
+        device_util = (
+            (self.profile_data.device_time / self.profile_data.total_time) * 100
+            if self.profile_data.total_time > 0
+            else 0
+        )
         if device_util < 50:
-            bottlenecks.append({
-                "type": "low_device_utilization",
-                "severity": "medium",
-                "description": f"Device utilization is only {device_util:.1f}%",
-                "impact_percent": 100 - device_util
-            })
-        
+            bottlenecks.append(
+                {
+                    "type": "low_device_utilization",
+                    "severity": "medium",
+                    "description": f"Device utilization is only {device_util:.1f}%",
+                    "impact_percent": 100 - device_util,
+                }
+            )
+
         # Kernel execution imbalance
         if len(self.profile_data.kernels) > 1:
             execution_times = [k.execution_time for k in self.profile_data.kernels]
             max_time = max(execution_times)
             min_time = min(execution_times)
             if max_time > min_time * 5:  # 5x difference indicates imbalance
-                bottlenecks.append({
-                    "type": "kernel_imbalance",
-                    "severity": "medium",
-                    "description": "Significant execution time variance between kernels",
-                    "impact_percent": ((max_time - min_time) / max_time) * 100
-                })
-        
+                bottlenecks.append(
+                    {
+                        "type": "kernel_imbalance",
+                        "severity": "medium",
+                        "description": "Significant execution time variance between kernels",
+                        "impact_percent": ((max_time - min_time) / max_time) * 100,
+                    }
+                )
+
         return bottlenecks
-    
+
     def _generate_runtime_recommendations(self) -> List[str]:
         """Generate optimization recommendations based on runtime data"""
         if not self.profile_data:
             return []
-        
+
         recommendations = []
-        
+
         # Memory transfer optimization
         if self.profile_data.memory_transfer_time > self.profile_data.device_time * 0.3:
-            recommendations.append("Consider data layout optimizations to reduce memory transfers")
-            recommendations.append("Use unified memory or async transfers where possible")
-        
+            recommendations.append(
+                "Consider data layout optimizations to reduce memory transfers"
+            )
+            recommendations.append(
+                "Use unified memory or async transfers where possible"
+            )
+
         # Device utilization
-        device_util = (self.profile_data.device_time / self.profile_data.total_time) * 100 if self.profile_data.total_time > 0 else 0
+        device_util = (
+            (self.profile_data.device_time / self.profile_data.total_time) * 100
+            if self.profile_data.total_time > 0
+            else 0
+        )
         if device_util < 70:
-            recommendations.append("Increase workload size or use multiple kernels to improve device utilization")
-        
+            recommendations.append(
+                "Increase workload size or use multiple kernels to improve device utilization"
+            )
+
         # Kernel optimization
         for kernel in self.profile_data.kernels:
             if kernel.execution_time < 1000:  # Less than 1ms
-                recommendations.append(f"Kernel '{kernel.name}' has very short execution time - consider kernel fusion")
-        
+                recommendations.append(
+                    f"Kernel '{kernel.name}' has very short execution time - consider kernel fusion"
+                )
+
         return recommendations
-    
+
     def get_comprehensive_analysis(self) -> Dict[str, Any]:
         return {
             "project_summary": self.project.get_project_summary(),
             "optimization_opportunities": self.analyze_optimization_opportunities(),
             "performance_hotspots": self.analyze_performance_hotspots(),
             "offloading_efficiency": self.analyze_offloading_efficiency(),
             "profiling_data": self.analyze_profiling_data(),
             "optimization_insights": self.analyze_optimization_insights(),
-            "hardware_insights": self.analyze_hardware_insights()
-        }
+            "hardware_insights": self.analyze_hardware_insights(),
+        }
--- models.py	2025-07-08 02:57:37.000000 +0000
+++ models.py	2025-07-08 02:59:59.000147 +0000
@@ -1,197 +1,268 @@
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 #
 # Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
 # See https://llvm.org/LICENSE.txt for license information.
 # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
 #
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 #
 # LLVM Advisor Data Models - Structured data models for optimization
 # analysis, remarks, and profiling information.
 #
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 
 from dataclasses import dataclass, field
 from typing import Dict, List, Optional, Any, Union
 from enum import Enum
 from pathlib import Path
 
 
 class RemarkType(Enum):
     PASSED = "Passed"
-    MISSED = "Missed" 
+    MISSED = "Missed"
     ANALYSIS = "Analysis"
 
 
 @dataclass
 class DebugLocation:
     file: str
     line: int
     column: int
-    
+
     @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> Optional['DebugLocation']:
-        return cls(
-            file=data.get('File', '<unknown>'),
-            line=int(data.get('Line', 0)),
-            column=int(data.get('Column', 0))
-        ) if data else None
+    def from_dict(cls, data: Dict[str, Any]) -> Optional["DebugLocation"]:
+        return (
+            cls(
+                file=data.get("File", "<unknown>"),
+                line=int(data.get("Line", 0)),
+                column=int(data.get("Column", 0)),
+            )
+            if data
+            else None
+        )
 
 
 @dataclass
 class RemarkArgument:
     type: str
     value: Any
     debug_loc: Optional[DebugLocation] = None
-    
+
     @classmethod
-    def from_dict(cls, data: Union[str, Dict[str, Any]]) -> 'RemarkArgument':
+    def from_dict(cls, data: Union[str, Dict[str, Any]]) -> "RemarkArgument":
         if isinstance(data, str):
             return cls(type="String", value=data)
-        
+
         # Consolidated argument type mapping
         type_mappings = {
             # Basic types
             "String": "String",
             "DebugLoc": None,  # Special handling
             # Function/code references
-            **{key: key for key in ["Callee", "Caller", "Function", "BasicBlock", "Inst", 
-                                   "Call", "OtherAccess", "ClobberedBy", "InfavorOfValue", "Reason"]},
+            **{
+                key: key
+                for key in [
+                    "Callee",
+                    "Caller",
+                    "Function",
+                    "BasicBlock",
+                    "Inst",
+                    "Call",
+                    "OtherAccess",
+                    "ClobberedBy",
+                    "InfavorOfValue",
+                    "Reason",
+                ]
+            },
             # Metrics and counts
-            **{key: key for key in ["Type", "Cost", "Threshold", "VectorizationFactor", 
-                                   "InterleaveCount", "NumVRCopies", "TotalCopiesCost",
-                                   "NumStackBytes", "NumInstructions", "Line", "Column"]},
+            **{
+                key: key
+                for key in [
+                    "Type",
+                    "Cost",
+                    "Threshold",
+                    "VectorizationFactor",
+                    "InterleaveCount",
+                    "NumVRCopies",
+                    "TotalCopiesCost",
+                    "NumStackBytes",
+                    "NumInstructions",
+                    "Line",
+                    "Column",
+                ]
+            },
             # OpenMP/GPU specific
-            **{key: key for key in ["ExternalNotKernel", "GlobalsSize", "LocalVarSize", 
-                                   "NumRegions", "RegisterPressurerValue", "AssumedAddressSpace",
-                                   "SPMDCompatibilityTracker", "GlobalizationLevel", "AddressSpace"]}
+            **{
+                key: key
+                for key in [
+                    "ExternalNotKernel",
+                    "GlobalsSize",
+                    "LocalVarSize",
+                    "NumRegions",
+                    "RegisterPressurerValue",
+                    "AssumedAddressSpace",
+                    "SPMDCompatibilityTracker",
+                    "GlobalizationLevel",
+                    "AddressSpace",
+                ]
+            },
         }
-        
+
         arg_type = None
         value = None
         debug_loc = None
-        
+
         for key, val in data.items():
             if key == "DebugLoc":
                 debug_loc = DebugLocation.from_dict(val)
             elif key in type_mappings:
                 arg_type = type_mappings[key]
                 value = val
                 break
-        
+
         return cls(type=arg_type or "Unknown", value=value, debug_loc=debug_loc)
 
 
-@dataclass 
+@dataclass
 class OptimizationRemark:
     remark_type: RemarkType
     pass_name: str
     remark_name: str
     function: str
     debug_loc: Optional[DebugLocation]
     args: List[RemarkArgument]
-    
+
     @classmethod
-    def from_dict(cls, data: Dict[str, Any]) -> 'OptimizationRemark':
+    def from_dict(cls, data: Dict[str, Any]) -> "OptimizationRemark":
         remark_type = {
-            'Passed': RemarkType.PASSED,
-            'Missed': RemarkType.MISSED,
-            'Analysis': RemarkType.ANALYSIS
-        }.get(data.get('_remark_type'), RemarkType.MISSED)
-        
+            "Passed": RemarkType.PASSED,
+            "Missed": RemarkType.MISSED,
+            "Analysis": RemarkType.ANALYSIS,
+        }.get(data.get("_remark_type"), RemarkType.MISSED)
+
         return cls(
             remark_type=remark_type,
-            pass_name=data.get('Pass', 'unknown'),
-            remark_name=data.get('Name', 'unknown'),
-            function=data.get('Function', 'unknown'),
-            debug_loc=DebugLocation.from_dict(data.get('DebugLoc', {})),
-            args=[RemarkArgument.from_dict(arg) for arg in data.get('Args', [])]
+            pass_name=data.get("Pass", "unknown"),
+            remark_name=data.get("Name", "unknown"),
+            function=data.get("Function", "unknown"),
+            debug_loc=DebugLocation.from_dict(data.get("DebugLoc", {})),
+            args=[RemarkArgument.from_dict(arg) for arg in data.get("Args", [])],
         )
-    
+
     def get_message(self) -> str:
         parts = []
         for arg in self.args:
             if arg.type == "String":
                 parts.append(str(arg.value))
             else:
                 parts.append(f"[{arg.type}: {arg.value}]")
         return "".join(parts)
-    
+
     def is_offloading_related(self) -> bool:
         """Enhanced detection of OpenMP offloading and GPU-related remarks"""
         offloading_indicators = [
-            "__omp_offloading", "omp_outlined", "target", "kernel-info", "offload",
-            "gpu", "cuda", "opencl", "sycl", "hip", "device", "host",
-            "ExternalNotKernel", "SPMDCompatibilityTracker", "GlobalizationLevel",
-            "NumRegions", "AddressSpace", "AssumedAddressSpace"
+            "__omp_offloading",
+            "omp_outlined",
+            "target",
+            "kernel-info",
+            "offload",
+            "gpu",
+            "cuda",
+            "opencl",
+            "sycl",
+            "hip",
+            "device",
+            "host",
+            "ExternalNotKernel",
+            "SPMDCompatibilityTracker",
+            "GlobalizationLevel",
+            "NumRegions",
+            "AddressSpace",
+            "AssumedAddressSpace",
         ]
-        
+
         # OpenMP-specific pass names that indicate offloading
         openmp_passes = [
-            "openmp-opt", "kernel-info", "openmp", "target-region", 
-            "offload", "device-lower", "gpu-lower"
+            "openmp-opt",
+            "kernel-info",
+            "openmp",
+            "target-region",
+            "offload",
+            "device-lower",
+            "gpu-lower",
         ]
-        
+
         # Check function name, pass name, and remark name
         text_to_check = f"{self.function} {self.pass_name} {self.remark_name}".lower()
         if any(indicator in text_to_check for indicator in offloading_indicators):
             return True
-            
+
         # Check if this is an OpenMP-related pass
         if any(omp_pass in self.pass_name.lower() for omp_pass in openmp_passes):
             return True
-        
+
         # Check argument values for GPU/offloading specific content
         for arg in self.args:
-            if hasattr(arg, 'value') and arg.value:
+            if hasattr(arg, "value") and arg.value:
                 arg_text = str(arg.value).lower()
                 if any(indicator in arg_text for indicator in offloading_indicators):
                     return True
-            
+
             # Check for specific OpenMP argument types
-            if hasattr(arg, 'type') and arg.type in [
-                "ExternalNotKernel", "SPMDCompatibilityTracker", "GlobalizationLevel",
-                "NumRegions", "AddressSpace", "AssumedAddressSpace"
+            if hasattr(arg, "type") and arg.type in [
+                "ExternalNotKernel",
+                "SPMDCompatibilityTracker",
+                "GlobalizationLevel",
+                "NumRegions",
+                "AddressSpace",
+                "AssumedAddressSpace",
             ]:
                 return True
-        
+
         # Check message content for OpenMP target-related keywords
         message = self.get_message().lower()
         target_keywords = [
-            "target region", "target directive", "offload", "device", 
-            "kernel", "gpu", "accelerator", "teams", "thread_limit"
+            "target region",
+            "target directive",
+            "offload",
+            "device",
+            "kernel",
+            "gpu",
+            "accelerator",
+            "teams",
+            "thread_limit",
         ]
         if any(keyword in message for keyword in target_keywords):
             return True
-        
+
         return False
 
 
 @dataclass
 class CompilationUnit:
     source_file: str
     remarks: List[OptimizationRemark] = field(default_factory=list)
     build_time: Optional[float] = None
     optimization_level: Optional[str] = None
     target_arch: Optional[str] = None
-    
+
     def get_remarks_by_pass(self) -> Dict[str, List[OptimizationRemark]]:
         result = {}
         for remark in self.remarks:
             result.setdefault(remark.pass_name, []).append(remark)
         return result
-    
+
     def get_remarks_by_type(self) -> Dict[RemarkType, List[OptimizationRemark]]:
         result = {t: [] for t in RemarkType}
         for remark in self.remarks:
             result[remark.remark_type].append(remark)
         return result
-    
+
     def get_offloading_remarks(self) -> List[OptimizationRemark]:
         return [r for r in self.remarks if r.is_offloading_related()]
-    
+
     def get_summary(self) -> Dict[str, Any]:
         by_type = self.get_remarks_by_type()
         return {
             "source_file": self.source_file,
             "total_remarks": len(self.remarks),
@@ -200,107 +271,115 @@
             "analysis": len(by_type[RemarkType.ANALYSIS]),
             "offloading_remarks": len(self.get_offloading_remarks()),
             "passes_involved": list(self.get_remarks_by_pass().keys()),
             "build_time": self.build_time,
             "optimization_level": self.optimization_level,
-            "target_arch": self.target_arch
+            "target_arch": self.target_arch,
         }
-    
+
     def deduplicate_remarks(self):
         """Remove duplicate remarks based on key attributes."""
         seen = set()
         unique_remarks = []
-        
+
         for remark in self.remarks:
             # Create a key based on remark attributes
             key = (
                 remark.remark_type,
                 remark.pass_name,
                 remark.remark_name,
                 remark.function,
                 remark.debug_loc.line if remark.debug_loc else 0,
-                remark.debug_loc.column if remark.debug_loc else 0
+                remark.debug_loc.column if remark.debug_loc else 0,
             )
-            
+
             if key not in seen:
                 seen.add(key)
                 unique_remarks.append(remark)
-        
+
         self.remarks = unique_remarks
 
     def load_source_code(self, source_directory: str = ".") -> Optional[str]:
         """Load the actual source code for this file."""
         possible_paths = [
             Path(source_directory) / self.source_file,
             Path(source_directory) / "src" / self.source_file,
             Path(source_directory) / "source" / self.source_file,
         ]
-        
+
         for path in possible_paths:
             if path.exists() and path.is_file():
                 try:
-                    with open(path, 'r', encoding='utf-8') as f:
+                    with open(path, "r", encoding="utf-8") as f:
                         return f.read()
                 except Exception:
                     continue
         return None
-    
+
     def get_source_lines(self, source_directory: str = ".") -> List[str]:
         """Get source code as list of lines."""
         source_code = self.load_source_code(source_directory)
-        return source_code.split('\n') if source_code else []
+        return source_code.split("\n") if source_code else []
 
 
 @dataclass
 class Project:
     name: str
     compilation_units: List[CompilationUnit] = field(default_factory=list)
     build_system: Optional[str] = None
-    
+
     def add_compilation_unit(self, unit: CompilationUnit):
         self.compilation_units.append(unit)
-    
+
     def get_project_summary(self) -> Dict[str, Any]:
         total_remarks = sum(len(unit.remarks) for unit in self.compilation_units)
-        total_offloading = sum(len(unit.get_offloading_remarks()) for unit in self.compilation_units)
+        total_offloading = sum(
+            len(unit.get_offloading_remarks()) for unit in self.compilation_units
+        )
         all_passes = set()
-        
+
         # Collect type counts across all units
         type_counts = {remark_type: 0 for remark_type in RemarkType}
         pass_counts = {}
-        
+
         for unit in self.compilation_units:
             all_passes.update(unit.get_remarks_by_pass().keys())
             by_type = unit.get_remarks_by_type()
             for remark_type, remarks in by_type.items():
                 type_counts[remark_type] += len(remarks)
-            
+
             for remark in unit.remarks:
                 pass_counts[remark.pass_name] = pass_counts.get(remark.pass_name, 0) + 1
-        
+
         return {
             "project_name": self.name,
             "compilation_units": len(self.compilation_units),
-            "total_files": len(self.compilation_units),  # Added for frontend compatibility
+            "total_files": len(
+                self.compilation_units
+            ),  # Added for frontend compatibility
             "total_remarks": total_remarks,
             "total_offloading_remarks": total_offloading,
             "total_passed": type_counts[RemarkType.PASSED],
-            "total_missed": type_counts[RemarkType.MISSED], 
+            "total_missed": type_counts[RemarkType.MISSED],
             "total_analysis": type_counts[RemarkType.ANALYSIS],
             "unique_passes": list(all_passes),
             "pass_counts": pass_counts,
-            "most_active_passes": sorted(pass_counts.items(), key=lambda x: x[1], reverse=True)[:10],
-            "build_system": self.build_system
+            "most_active_passes": sorted(
+                pass_counts.items(), key=lambda x: x[1], reverse=True
+            )[:10],
+            "build_system": self.build_system,
         }
-    
+
     def get_file_list(self) -> List[Dict[str, Any]]:
         files = []
         for unit in self.compilation_units:
             summary = unit.get_summary()
-            files.append({
-                "file": unit.source_file,
-                "remarks": summary["total_remarks"],
-                "passed": summary["passed"],
-                "missed": summary["missed"],
-                "offloading": summary["offloading_remarks"]
-            })
+            files.append(
+                {
+                    "file": unit.source_file,
+                    "remarks": summary["total_remarks"],
+                    "passed": summary["passed"],
+                    "missed": summary["missed"],
+                    "offloading": summary["offloading_remarks"],
+                }
+            )
         return files
--- profile_parser.py	2025-07-08 02:57:37.000000 +0000
+++ profile_parser.py	2025-07-08 02:59:59.097049 +0000
@@ -1,23 +1,24 @@
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 #
 # Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
 # See https://llvm.org/LICENSE.txt for license information.
 # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
 #
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 #
 # LLVM Advisor Profile Parser - Parses Chrome Trace format profiling
 # data generated by LLVM OpenMP runtime and other profiling tools.
 #
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 
 import json
 import os
 from typing import Dict, List, Optional, Any
 from dataclasses import dataclass
 from pathlib import Path
+
 
 @dataclass
 class ProfileEvent:
     name: str
     category: str
@@ -26,163 +27,184 @@
     duration: float
     process_id: int
     thread_id: int
     args: Dict[str, Any]
 
+
 @dataclass
 class KernelProfile:
     name: str
     launch_time: float
     execution_time: float
     memory_transfers: List[Dict[str, Any]]
     device_id: int
     grid_size: Optional[tuple] = None
     block_size: Optional[tuple] = None
 
+
 @dataclass
 class ProfileData:
     events: List[ProfileEvent]
     kernels: List[KernelProfile]
     total_time: float
     device_time: float
     host_time: float
     memory_transfer_time: float
 
+
 class ProfileParser:
     def __init__(self, profile_path: str):
         self.profile_path = Path(profile_path)
         self.profile_data: Optional[ProfileData] = None
-    
+
     def parse(self) -> Optional[ProfileData]:
         if not self.profile_path.exists():
             return None
-        
+
         try:
-            with open(self.profile_path, 'r') as f:
+            with open(self.profile_path, "r") as f:
                 raw_data = json.load(f)
-            
+
             # Store raw data for Chrome Trace format compatibility
             self.raw_data = raw_data
-            
-            events = self._parse_events(raw_data.get('traceEvents', []))
+
+            events = self._parse_events(raw_data.get("traceEvents", []))
             kernels = self._extract_kernels(events)
-            
+
             self.profile_data = ProfileData(
                 events=events,
                 kernels=kernels,
                 total_time=self._calculate_total_time(events),
                 device_time=self._calculate_device_time(events),
                 host_time=self._calculate_host_time(events),
-                memory_transfer_time=self._calculate_memory_transfer_time(events)
+                memory_transfer_time=self._calculate_memory_transfer_time(events),
             )
-            
+
             return self.profile_data
-            
+
         except (json.JSONDecodeError, KeyError, IOError) as e:
             print(f"Error parsing profile file: {e}")
             return None
-    
+
     def get_raw_data(self) -> Optional[Dict[str, Any]]:
         """Return raw Chrome Trace format data for frontend processing"""
-        return getattr(self, 'raw_data', None)
-    
+        return getattr(self, "raw_data", None)
+
     def _parse_events(self, trace_events: List[Dict]) -> List[ProfileEvent]:
         events = []
         for event in trace_events:
             try:
-                events.append(ProfileEvent(
-                    name=event.get('name', ''),
-                    category=event.get('cat', ''),
-                    phase=event.get('ph', ''),
-                    timestamp=float(event.get('ts', 0)),
-                    duration=float(event.get('dur', 0)),
-                    process_id=int(event.get('pid', 0)),
-                    thread_id=int(event.get('tid', 0)),
-                    args=event.get('args', {})
-                ))
+                events.append(
+                    ProfileEvent(
+                        name=event.get("name", ""),
+                        category=event.get("cat", ""),
+                        phase=event.get("ph", ""),
+                        timestamp=float(event.get("ts", 0)),
+                        duration=float(event.get("dur", 0)),
+                        process_id=int(event.get("pid", 0)),
+                        thread_id=int(event.get("tid", 0)),
+                        args=event.get("args", {}),
+                    )
+                )
             except (ValueError, TypeError):
                 continue
         return events
-    
+
     def _extract_kernels(self, events: List[ProfileEvent]) -> List[KernelProfile]:
         kernels = []
         # Look for OpenMP target execution events
-        kernel_events = [e for e in events if 'target exe' in e.name or 'Runtime: target exe' in e.name]
-        
+        kernel_events = [
+            e
+            for e in events
+            if "target exe" in e.name or "Runtime: target exe" in e.name
+        ]
+
         for event in kernel_events:
-            if event.phase == 'X':  # Complete event
+            if event.phase == "X":  # Complete event
                 # Parse kernel details from args.detail
-                detail = event.args.get('detail', '')
-                parts = detail.split(';') if detail else []
-                
+                detail = event.args.get("detail", "")
+                parts = detail.split(";") if detail else []
+
                 kernel = KernelProfile(
                     name=event.name,
                     launch_time=event.timestamp,
                     execution_time=event.duration,
                     memory_transfers=[],
-                    device_id=event.args.get('device_id', 0),
+                    device_id=event.args.get("device_id", 0),
                     grid_size=self._parse_grid_size_from_detail(parts),
-                    block_size=self._parse_block_size_from_detail(parts)
+                    block_size=self._parse_block_size_from_detail(parts),
                 )
                 kernels.append(kernel)
-        
+
         return kernels
-    
+
     def _parse_grid_size_from_detail(self, parts: List[str]) -> Optional[tuple]:
         # Extract grid size from OpenMP detail string if available
         for part in parts:
-            if 'NumTeams=' in part:
+            if "NumTeams=" in part:
                 try:
-                    teams = int(part.split('=')[1])
+                    teams = int(part.split("=")[1])
                     return (teams,) if teams > 0 else None
                 except (ValueError, IndexError):
                     pass
         return None
-    
+
     def _parse_block_size_from_detail(self, parts: List[str]) -> Optional[tuple]:
         # Extract block size from OpenMP detail string if available
         # OpenMP doesn't directly map to CUDA block sizes, but we can use thread info if available
         return None  # Not available in OpenMP trace format
-    
+
     def _parse_grid_size(self, args: Dict) -> Optional[tuple]:
-        if 'grid_size' in args:
-            return tuple(args['grid_size'])
+        if "grid_size" in args:
+            return tuple(args["grid_size"])
         return None
-    
+
     def _parse_block_size(self, args: Dict) -> Optional[tuple]:
-        if 'block_size' in args:
-            return tuple(args['block_size'])
+        if "block_size" in args:
+            return tuple(args["block_size"])
         return None
-    
+
     def _calculate_total_time(self, events: List[ProfileEvent]) -> float:
         if not events:
             return 0.0
         start_time = min(e.timestamp for e in events)
         end_time = max(e.timestamp + e.duration for e in events if e.duration > 0)
         return end_time - start_time
-    
+
     def _calculate_device_time(self, events: List[ProfileEvent]) -> float:
         # For OpenMP, consider target execution as device time
-        return sum(e.duration for e in events if 'target exe' in e.name or 'kernel' in e.name.lower())
-    
+        return sum(
+            e.duration
+            for e in events
+            if "target exe" in e.name or "kernel" in e.name.lower()
+        )
+
     def _calculate_host_time(self, events: List[ProfileEvent]) -> float:
         # Host time is everything that's not device execution
         device_time = self._calculate_device_time(events)
         total_time = self._calculate_total_time(events)
         return total_time - device_time
-    
+
     def _calculate_memory_transfer_time(self, events: List[ProfileEvent]) -> float:
-        return sum(e.duration for e in events if 'HostToDev' in e.name or 'DevToHost' in e.name)
-    
+        return sum(
+            e.duration for e in events if "HostToDev" in e.name or "DevToHost" in e.name
+        )
+
     def get_summary(self) -> Dict[str, Any]:
         if not self.profile_data:
             return {}
-        
+
         return {
-            'total_kernels': len(self.profile_data.kernels),
-            'total_time_us': self.profile_data.total_time,
-            'device_time_us': self.profile_data.device_time,
-            'host_time_us': self.profile_data.host_time,
-            'memory_transfer_time_us': self.profile_data.memory_transfer_time,
-            'device_utilization': (self.profile_data.device_time / self.profile_data.total_time * 100) if self.profile_data.total_time > 0 else 0,
-            'top_kernels': sorted(self.profile_data.kernels, key=lambda k: k.execution_time, reverse=True)[:10]
-        } 
+            "total_kernels": len(self.profile_data.kernels),
+            "total_time_us": self.profile_data.total_time,
+            "device_time_us": self.profile_data.device_time,
+            "host_time_us": self.profile_data.host_time,
+            "memory_transfer_time_us": self.profile_data.memory_transfer_time,
+            "device_utilization": (
+                self.profile_data.device_time / self.profile_data.total_time * 100
+            )
+            if self.profile_data.total_time > 0
+            else 0,
+            "top_kernels": sorted(
+                self.profile_data.kernels, key=lambda k: k.execution_time, reverse=True
+            )[:10],
+        }
--- yaml_parser.py	2025-07-08 02:57:37.000000 +0000
+++ yaml_parser.py	2025-07-08 02:59:59.151234 +0000
@@ -1,168 +1,178 @@
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 #
 # Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
 # See https://llvm.org/LICENSE.txt for license information.
 # SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
 #
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 #
 # LLVM Advisor Parser - Parses YAML optimization data and profiling
 # files generated by LLVM and creates structured data models.
 #
-#===----------------------------------------------------------------------===#
+# ===----------------------------------------------------------------------===#
 
 import yaml
 import glob
 import json
 from typing import List, Dict, Any, Optional
 from pathlib import Path
-from .models import OptimizationRemark, CompilationUnit, Project, RemarkType, DebugLocation
+from .models import (
+    OptimizationRemark,
+    CompilationUnit,
+    Project,
+    RemarkType,
+    DebugLocation,
+)
 
 
 class YAMLLoader(yaml.SafeLoader):
     pass
 
 
 def construct_remark(tag):
     def constructor(loader, node):
         value = loader.construct_mapping(node)
-        value['_remark_type'] = tag
+        value["_remark_type"] = tag
         return value
+
     return constructor
 
 
-for tag in ['Passed', 'Missed', 'Analysis']:
-    YAMLLoader.add_constructor(f'!{tag}', construct_remark(tag))
+for tag in ["Passed", "Missed", "Analysis"]:
+    YAMLLoader.add_constructor(f"!{tag}", construct_remark(tag))
 
 
 class OptimizationRecordParser:
     def __init__(self):
         self.remark_type_map = {
-            'Passed': RemarkType.PASSED,
-            'Missed': RemarkType.MISSED,
-            'Analysis': RemarkType.ANALYSIS
+            "Passed": RemarkType.PASSED,
+            "Missed": RemarkType.MISSED,
+            "Analysis": RemarkType.ANALYSIS,
         }
-        self.source_extensions = {'c', 'cpp', 'cc', 'cxx', 'cu', 'f', 'f90', 'f95'}
-    
+        self.source_extensions = {"c", "cpp", "cc", "cxx", "cu", "f", "f90", "f95"}
+
     def parse_yaml_file(self, yaml_path: str) -> List[OptimizationRemark]:
         """Parse a single YAML file and return list of remarks"""
         remarks = []
         try:
-            with open(yaml_path, 'r', encoding='utf-8') as f:
+            with open(yaml_path, "r", encoding="utf-8") as f:
                 documents = yaml.load_all(f, Loader=YAMLLoader)
-                
+
                 for doc in documents:
                     if not doc:
                         continue
-                    
+
                     remark_type = self.remark_type_map.get(
-                        doc.get('_remark_type'), 
-                        RemarkType.MISSED
+                        doc.get("_remark_type"), RemarkType.MISSED
                     )
-                    
+
                     remark = OptimizationRemark.from_dict(doc)
                     remark.remark_type = remark_type
                     remarks.append(remark)
-                        
+
         except Exception as e:
             print(f"Error parsing {yaml_path}: {e}")
-        
+
         return remarks
-    
+
     def parse_directory(self, directory: str, pattern: str = "*.yaml") -> Project:
         """Parse all YAML files in directory and return project"""
         project = Project(name=Path(directory).name)
-        
+
         # Collect all YAML files
-        yaml_files = set(Path(directory).glob(pattern)) | set(Path(directory).rglob(pattern))
-        
+        yaml_files = set(Path(directory).glob(pattern)) | set(
+            Path(directory).rglob(pattern)
+        )
+
         # Group remarks by source file
         units_by_source = {}
-        
+
         for yaml_path in yaml_files:
             remarks = self.parse_yaml_file(str(yaml_path))
             if not remarks:
                 continue
-                
+
             source_file = self._extract_source_file(str(yaml_path), remarks)
-            
+
             if source_file in units_by_source:
                 units_by_source[source_file].remarks.extend(remarks)
             else:
                 units_by_source[source_file] = CompilationUnit(
-                    source_file=source_file, 
-                    remarks=remarks
+                    source_file=source_file, remarks=remarks
                 )
-        
+
         # Add deduplicated units to project
         for unit in units_by_source.values():
             unit.deduplicate_remarks()
             project.add_compilation_unit(unit)
-        
+
         return project
-    
-    def _extract_source_file(self, yaml_path: str, remarks: List[OptimizationRemark]) -> str:
+
+    def _extract_source_file(
+        self, yaml_path: str, remarks: List[OptimizationRemark]
+    ) -> str:
         """Extract source file name from remarks or YAML path"""
         # Try to extract from debug info first
         for remark in remarks:
-            if (remark.debug_loc and 
-                remark.debug_loc.file != '<unknown>' and
-                not remark.debug_loc.file.startswith(('tmp', '.'))):
+            if (
+                remark.debug_loc
+                and remark.debug_loc.file != "<unknown>"
+                and not remark.debug_loc.file.startswith(("tmp", "."))
+            ):
                 return Path(remark.debug_loc.file).name
-        
+
         # Fallback: derive from YAML filename
         yaml_stem = Path(yaml_path).stem
-        if '.' in yaml_stem:
-            parts = yaml_stem.split('.')
+        if "." in yaml_stem:
+            parts = yaml_stem.split(".")
             for i, part in enumerate(parts):
                 if part in self.source_extensions:
-                    return '.'.join(parts[:i+1])
-        
-        return yaml_stem or 'unknown'
-    
-    def parse_single_file(self, yaml_path: str, source_file: Optional[str] = None) -> CompilationUnit:
+                    return ".".join(parts[: i + 1])
+
+        return yaml_stem or "unknown"
+
+    def parse_single_file(
+        self, yaml_path: str, source_file: Optional[str] = None
+    ) -> CompilationUnit:
         """Parse single YAML file and return compilation unit"""
         remarks = self.parse_yaml_file(yaml_path)
         source_file = source_file or self._extract_source_file(yaml_path, remarks)
         return CompilationUnit(source_file=source_file, remarks=remarks)
-    
+
     def get_statistics(self, project: Project) -> Dict[str, Any]:
         """Generate comprehensive statistics for project"""
         stats = {
             "total_files": len(project.compilation_units),
             "total_remarks": 0,
             "by_type": {t.value: 0 for t in RemarkType},
             "by_pass": {},
-            "most_active_passes": []
+            "most_active_passes": [],
         }
-        
+
         for unit in project.compilation_units:
             stats["total_remarks"] += len(unit.remarks)
-            
+
             for remark in unit.remarks:
                 stats["by_type"][remark.remark_type.value] += 1
-                stats["by_pass"][remark.pass_name] = stats["by_pass"].get(remark.pass_name, 0) + 1
-        
+                stats["by_pass"][remark.pass_name] = (
+                    stats["by_pass"].get(remark.pass_name, 0) + 1
+                )
+
         stats["most_active_passes"] = sorted(
-            stats["by_pass"].items(), 
-            key=lambda x: x[1], 
-            reverse=True
+            stats["by_pass"].items(), key=lambda x: x[1], reverse=True
         )[:10]
-        
+
         return stats
-
-
-
 
 
 if __name__ == "__main__":
     parser = OptimizationRecordParser()
     project = parser.parse_directory(".")
-    
+
     if not project.compilation_units:
         print("No YAML files found. Run compiler with optimization remarks enabled.")
         exit(1)
-    
+
     print(f"Project: {project.name}, Units: {len(project.compilation_units)}")
     stats = parser.get_statistics(project)
     print(f"Statistics: {stats}")

``````````

</details>
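If it helps while addressing this: darker can also write the reformatting in place rather than just report it. A rough sketch of that invocation is below, assuming darker (and black) are installed in your environment and that darker accepts the parser package directory; if not, list the flagged files explicitly. Dropping `--check --diff` is what switches darker from reporting to modifying files.

``````````bash
# Sketch: apply darker's formatting to the changes in the last commit,
# scoped to the llvm-advisor view parser package shown in the diffs above.
# Assumes darker is installed (pip install darker) and run from the repo root.
darker -r HEAD~1...HEAD llvm/tools/llvm-advisor/view/parser/
``````````

Re-running the check command afterwards should then come back clean.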


https://github.com/llvm/llvm-project/pull/147451

