Commit cb02d1e

mini benchmark for fix filtering oom
Signed-off-by: Jialiang Liang <jiallian@amazon.com>
1 parent: 4804ada

3 files changed

Lines changed: 428 additions & 0 deletions

analyze_performance.py

Lines changed: 279 additions & 0 deletions
@@ -0,0 +1,279 @@
#!/usr/bin/env python3
"""
Performance Analysis Script for Filter Merge Optimization

This script analyzes the performance log and generates detailed reports.
"""

import re
import sys
import statistics
from pathlib import Path


def parse_perf_log(log_file, test_log_file=None):
    """Extract performance metrics from log file."""
    perf_pattern = re.compile(
        r'PERF \[Planning\] analyze=(\d+)μs, filterMerge=(\d+)μs, optimize=(\d+)μs, convert=(\d+)μs, total=(\d+)μs'
    )

    # Build ordered list of query names from the test log
    query_names = []
    if test_log_file and test_log_file.exists():
        running_query_pattern = re.compile(r'Running (Query\d+)')
        with open(test_log_file, 'r') as f:
            for line in f:
                query_match = running_query_pattern.search(line)
                if query_match:
                    query_names.append(query_match.group(1))

    # Parse performance data from the server log.
    # Each query runs twice (explain + execute), so we track pairs.
    results = []
    query_index = 0
    perf_count_for_query = 0

    with open(log_file, 'r') as f:
        for line in f:
            # Extract performance data
            perf_match = perf_pattern.search(line)
            if perf_match:
                analyze, merge, optimize, convert, total = map(int, perf_match.groups())

                # Determine query name (each query has 2 PERF lines: explain + execute)
                if query_index < len(query_names):
                    current_test = query_names[query_index]
                else:
                    current_test = f"Query{query_index + 1}"

                results.append({
                    'test': current_test,
                    'analyze': analyze,
                    'merge': merge,
                    'optimize': optimize,
                    'convert': convert,
                    'total': total,
                    'merge_pct': (merge / total * 100) if total > 0 else 0
                })

                # Each query generates 2 PERF logs (explain yaml + actual query)
                perf_count_for_query += 1
                if perf_count_for_query == 2:
                    query_index += 1
                    perf_count_for_query = 0

    return results


def generate_report(results):
    """Generate detailed performance report."""
    if not results:
        print("❌ No performance data found!")
        print("Make sure:")
        print("  1. The log level is set to INFO")
        print("  2. The instrumentation code is active")
        print("  3. Tests actually ran")
        return

    print("=" * 80)
    print("FILTER MERGE PERFORMANCE ANALYSIS")
    print("=" * 80)
    print()

    # Overall statistics
    merge_times = [r['merge'] for r in results]
    total_times = [r['total'] for r in results]
    merge_pcts = [r['merge_pct'] for r in results]

    print(f"📊 OVERALL STATISTICS ({len(results)} queries)")
    print("-" * 80)
    print("Filter Merge Time:")
    print(f"  Mean:    {statistics.mean(merge_times):>8.0f} μs ({statistics.mean(merge_times)/1000:>6.2f} ms)")
    print(f"  Median:  {statistics.median(merge_times):>8.0f} μs ({statistics.median(merge_times)/1000:>6.2f} ms)")
    print(f"  Std Dev: {statistics.stdev(merge_times) if len(merge_times) > 1 else 0:>8.0f} μs")
    print(f"  Min:     {min(merge_times):>8.0f} μs ({min(merge_times)/1000:>6.2f} ms)")
    print(f"  Max:     {max(merge_times):>8.0f} μs ({max(merge_times)/1000:>6.2f} ms)")
    print()

    print("Total Planning Time:")
    print(f"  Mean:   {statistics.mean(total_times):>8.0f} μs ({statistics.mean(total_times)/1000:>6.2f} ms)")
    print(f"  Median: {statistics.median(total_times):>8.0f} μs ({statistics.median(total_times)/1000:>6.2f} ms)")
    print()

    print("Filter Merge as % of Planning:")
    print(f"  Mean:   {statistics.mean(merge_pcts):>6.2f}%")
    print(f"  Median: {statistics.median(merge_pcts):>6.2f}%")
    print(f"  Max:    {max(merge_pcts):>6.2f}%")
    print()

    # Performance assessment
    print("=" * 80)
    print("📈 PERFORMANCE ASSESSMENT")
    print("-" * 80)

    avg_merge_ms = statistics.mean(merge_times) / 1000
    avg_merge_pct = statistics.mean(merge_pcts)

    if avg_merge_ms < 1:
        rating = "✅ EXCELLENT"
        recommendation = "No optimization needed. Merge immediately."
    elif avg_merge_ms < 5:
        rating = "✅ GOOD"
        recommendation = "Acceptable overhead. Safe to merge."
    elif avg_merge_ms < 10:
        rating = "⚠️ MODERATE"
        recommendation = "Consider adding conditional execution (pre-check)."
    else:
        rating = "❌ HIGH OVERHEAD"
        recommendation = "Optimization required before merge."

    print(f"Rating: {rating}")
    print(f"Average overhead: {avg_merge_ms:.2f}ms ({avg_merge_pct:.1f}% of planning)")
    print(f"Recommendation: {recommendation}")
    print()

    # Percentile analysis (nearest-rank on the sorted sample)
    merge_times_sorted = sorted(merge_times)
    n = len(merge_times_sorted)
    p50 = merge_times_sorted[n // 2]
    p95 = merge_times_sorted[int(n * 0.95)]
    p99 = merge_times_sorted[int(n * 0.99)] if n > 100 else merge_times_sorted[-1]

    print("=" * 80)
    print("📊 PERCENTILE ANALYSIS")
    print("-" * 80)
    print("Filter Merge Time Percentiles:")
    print(f"  p50: {p50:>6.0f} μs ({p50/1000:>6.2f} ms)")
    print(f"  p95: {p95:>6.0f} μs ({p95/1000:>6.2f} ms)")
    print(f"  p99: {p99:>6.0f} μs ({p99/1000:>6.2f} ms)")
    print()

    # Breakdown by phase
    print("=" * 80)
    print("⏱️ PLANNING PHASE BREAKDOWN")
    print("-" * 80)

    avg_analyze = statistics.mean([r['analyze'] for r in results])
    avg_merge = statistics.mean([r['merge'] for r in results])
    avg_optimize = statistics.mean([r['optimize'] for r in results])
    avg_convert = statistics.mean([r['convert'] for r in results])
    avg_total = statistics.mean([r['total'] for r in results])

    print("Phase Averages:")
    print(f"  Analyze:      {avg_analyze:>8.0f} μs ({avg_analyze/avg_total*100:>5.1f}%)")
    print(f"  Filter Merge: {avg_merge:>8.0f} μs ({avg_merge/avg_total*100:>5.1f}%) ← THIS IS WHAT WE ADDED")
    print(f"  Optimize:     {avg_optimize:>8.0f} μs ({avg_optimize/avg_total*100:>5.1f}%)")
    print(f"  Convert:      {avg_convert:>8.0f} μs ({avg_convert/avg_total*100:>5.1f}%)")
    print(f"  TOTAL:        {avg_total:>8.0f} μs (100.0%)")
    print()

    # Top slowest queries (aggregate by query name to remove duplicates)
    print("=" * 80)
    print("🐢 TOP 10 SLOWEST FILTER MERGE TIMES")
    print("-" * 80)

    # Group by test name and average the metrics
    query_aggregates = {}
    for r in results:
        if r['test'] not in query_aggregates:
            query_aggregates[r['test']] = {
                'merge_times': [],
                'total_times': [],
                'merge_pcts': []
            }
        query_aggregates[r['test']]['merge_times'].append(r['merge'])
        query_aggregates[r['test']]['total_times'].append(r['total'])
        query_aggregates[r['test']]['merge_pcts'].append(r['merge_pct'])

    # Calculate averages and sort
    query_stats = []
    for test_name, data in query_aggregates.items():
        query_stats.append({
            'test': test_name,
            'avg_merge': statistics.mean(data['merge_times']),
            'max_merge': max(data['merge_times']),
            'avg_merge_pct': statistics.mean(data['merge_pcts']),
            'count': len(data['merge_times'])
        })

    sorted_queries = sorted(query_stats, key=lambda q: q['avg_merge'], reverse=True)[:10]
    print(f"{'Rank':<6} {'Query':<20} {'Avg Merge Time':<20} {'Max Merge Time':<20} {'% of Planning':<15}")
    print("-" * 80)
    for i, q in enumerate(sorted_queries, 1):
        print(f"{i:<6} {q['test']:<20} {q['avg_merge']:>8.0f} μs ({q['avg_merge']/1000:>5.2f}ms)  {q['max_merge']:>8.0f} μs ({q['max_merge']/1000:>5.2f}ms)  {q['avg_merge_pct']:>5.1f}%")
    print()

    # Distribution analysis
    print("=" * 80)
    print("📈 DISTRIBUTION ANALYSIS")
    print("-" * 80)

    buckets = {
        '<100μs': 0,
        '100-500μs': 0,
        '500-1000μs (1ms)': 0,
        '1-5ms': 0,
        '5-10ms': 0,
        '>10ms': 0
    }

    for time in merge_times:
        if time < 100:
            buckets['<100μs'] += 1
        elif time < 500:
            buckets['100-500μs'] += 1
        elif time < 1000:
            buckets['500-1000μs (1ms)'] += 1
        elif time < 5000:
            buckets['1-5ms'] += 1
        elif time < 10000:
            buckets['5-10ms'] += 1
        else:
            buckets['>10ms'] += 1

    print("Filter Merge Time Distribution:")
    for bucket, count in buckets.items():
        pct = count / len(merge_times) * 100
        bar = '█' * int(pct / 2)
        print(f"  {bucket:<20} {count:>4} ({pct:>5.1f}%) {bar}")
    print()

    # Export CSV for further analysis
    csv_file = Path(__file__).parent / 'performance_analysis.csv'
    with open(csv_file, 'w') as f:
        f.write('Test,Analyze(μs),FilterMerge(μs),Optimize(μs),Convert(μs),Total(μs),FilterMerge%\n')
        for r in results:
            f.write(f"{r['test']},{r['analyze']},{r['merge']},{r['optimize']},{r['convert']},{r['total']},{r['merge_pct']:.2f}\n")

    print("=" * 80)
    print(f"📄 Detailed CSV exported to: {csv_file}")
    print("=" * 80)
    print()


def main():
    # Try the server log first (where the actual perf data is), fall back to the test log
    server_log = Path(__file__).parent / 'integ-test/build/testclusters/integTest-0/logs/integTest.log'
    test_log = Path(__file__).parent / 'performance_results.log'

    log_file = server_log if server_log.exists() else test_log

    if not log_file.exists():
        print(f"❌ Log file not found: {log_file}")
        print()
        print("Please run the performance test first:")
        print("  ./run_performance_test.sh")
        print()
        print("Looked for logs at:")
        print(f"  - {server_log}")
        print(f"  - {test_log}")
        sys.exit(1)

    print(f"Analyzing log file: {log_file}")
    if test_log.exists():
        print(f"Using test log for query names: {test_log}")
    print()

    results = parse_perf_log(log_file, test_log if test_log.exists() else None)
    generate_report(results)


if __name__ == '__main__':
    main()
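
The entry point wires in fixed log locations, but the two functions are importable on their own. A minimal sketch of driving them directly (not part of the commit), assuming the file is importable as analyze_performance; 'my_server.log' is a placeholder path:

    # Minimal sketch: analyze an arbitrary log file instead of the
    # default locations hard-coded in main().
    from pathlib import Path

    from analyze_performance import parse_perf_log, generate_report

    # Without a test log, queries fall back to the generated names QueryN.
    results = parse_perf_log(Path('my_server.log'))
    generate_report(results)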

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 22 additions & 0 deletions
@@ -106,10 +106,32 @@ public void executeWithCalcite(
           CalcitePlanContext context =
               CalcitePlanContext.create(
                   buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
+
+          long analyzeStart = System.nanoTime();
           RelNode relNode = analyze(plan, context);
+          long analyzeTime = System.nanoTime() - analyzeStart;
+
+          long mergeStart = System.nanoTime();
           relNode = mergeAdjacentFilters(relNode);
+          long mergeTime = System.nanoTime() - mergeStart;
+
+          long optimizeStart = System.nanoTime();
           RelNode optimized = optimize(relNode, context);
+          long optimizeTime = System.nanoTime() - optimizeStart;
+
+          long convertStart = System.nanoTime();
           RelNode calcitePlan = convertToCalcitePlan(optimized);
+          long convertTime = System.nanoTime() - convertStart;
+
+          long totalPlanningTime = analyzeTime + mergeTime + optimizeTime + convertTime;
+
+          // Use System.out to ensure capture in test logs
+          String perfLog = String.format("PERF [Planning] analyze=%dμs, filterMerge=%dμs, optimize=%dμs, convert=%dμs, total=%dμs",
+              analyzeTime / 1000, mergeTime / 1000, optimizeTime / 1000,
+              convertTime / 1000, totalPlanningTime / 1000);
+          System.out.println(perfLog);
+          log.info(perfLog);
+
           executionEngine.execute(calcitePlan, context, listener);
           return null;
         });
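
The format string above is exactly what the regex in analyze_performance.py keys on. A quick sanity check that a line of this shape matches the parser's pattern (the timing values below are invented for illustration):

    import re

    # Same pattern as parse_perf_log in analyze_performance.py.
    perf_pattern = re.compile(
        r'PERF \[Planning\] analyze=(\d+)μs, filterMerge=(\d+)μs, optimize=(\d+)μs, convert=(\d+)μs, total=(\d+)μs'
    )

    # Invented numbers, purely illustrative of the emitted format.
    sample = 'PERF [Planning] analyze=1234μs, filterMerge=356μs, optimize=8412μs, convert=151μs, total=10153μs'
    print(perf_pattern.search(sample).groups())
    # -> ('1234', '356', '8412', '151', '10153')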
