Skip to content

Commit c0c7bf4

Browse files
committed
Arrow experiment
1 parent 57ffd1b commit c0c7bf4

11 files changed

Lines changed: 2669 additions & 8 deletions

ARROW.md

Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
# Arrow Columnar Data Processing
2+
3+
This document explains the Apache Arrow implementation introduced for efficient batch aggregation of oracle observations in the LLO (Low-Latency Oracle) system.
4+
5+
## Overview
6+
7+
### Why Arrow?
8+
9+
The Arrow implementation addresses key performance challenges:
10+
11+
1. **Memory Efficiency** - Replaces the 1GB static memory ballast with controlled, bounded allocation
12+
2. **Batch Processing** - Enables efficient columnar operations on thousands of observations simultaneously
13+
3. **Reduced Allocations** - Builder pooling minimizes GC pressure during repeated aggregation cycles
14+
15+
### Dependencies
16+
17+
```go
18+
github.com/apache/arrow-go/v18 v18.3.1
19+
```
20+
21+
## Architecture
22+
23+
```
24+
┌─────────────────────────────────────────────────────────────────────────────┐
25+
│ Arrow Data Pipeline │
26+
├─────────────────────────────────────────────────────────────────────────────┤
27+
│ │
28+
│ Node Observations │
29+
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
30+
│ │ Observer │ │ Observer │ │ Observer │ ... │
31+
│ │ 0 │ │ 1 │ │ N │ │
32+
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
33+
│ │ │ │ │
34+
│ └────────────┼────────────┘ │
35+
│ ▼ │
36+
│ ┌────────────────────────┐ │
37+
│ │ ArrowObservationMerger │ │
38+
│ │ MergeObservations() │ │
39+
│ └───────────┬────────────┘ │
40+
│ ▼ │
41+
│ ┌────────────────────────┐ │
42+
│ │ Arrow Record │ (ObservationSchema) │
43+
│ │ [observer_id, stream_id, value_type, values...] │
44+
│ └───────────┬────────────┘ │
45+
│ ▼ │
46+
│ ┌────────────────────────┐ │
47+
│ │ ArrowAggregator │ │
48+
│ │ AggregateObservations()│ │
49+
│ └───────────┬────────────┘ │
50+
│ ▼ │
51+
│ ┌────────────────────────┐ │
52+
│ │ StreamAggregates │ (per-stream aggregated values) │
53+
│ └────────────────────────┘ │
54+
│ │
55+
└─────────────────────────────────────────────────────────────────────────────┘
56+
```
57+
58+
## Schemas
59+
60+
Four Arrow schemas are defined in `llo/arrow_schemas.go`:
61+
62+
### 1. ObservationSchema
63+
64+
Stores merged observations from all nodes for batch aggregation.
65+
66+
| Column | Type | Description |
67+
|--------|------|-------------|
68+
| `observer_id` | uint8 | Node that produced the observation (0-255) |
69+
| `stream_id` | uint32 | Stream identifier |
70+
| `value_type` | uint8 | Type discriminator (see Value Types) |
71+
| `decimal_value` | binary | Encoded decimal value |
72+
| `quote_bid` | binary | Quote bid component |
73+
| `quote_benchmark` | binary | Quote benchmark component |
74+
| `quote_ask` | binary | Quote ask component |
75+
| `observed_at_ns` | uint64 | Provider timestamp (nanoseconds) |
76+
| `timestamp_ns` | uint64 | Node observation timestamp |
77+
78+
### 2. StreamAggregatesSchema
79+
80+
Output from aggregation, input to report generation.
81+
82+
| Column | Type | Description |
83+
|--------|------|-------------|
84+
| `stream_id` | uint32 | Stream identifier |
85+
| `aggregator` | uint32 | Aggregator type used |
86+
| `value_type` | uint8 | Type discriminator |
87+
| `decimal_value` | binary | Aggregated decimal |
88+
| `quote_*` | binary | Aggregated quote components |
89+
| `observed_at_ns` | uint64 | Observation timestamp |
90+
91+
### 3. CacheSchema
92+
93+
Observation cache with TTL-based expiration.
94+
95+
| Column | Type | Description |
96+
|--------|------|-------------|
97+
| `stream_id` | uint32 | Stream identifier |
98+
| `value_type` | uint8 | Type discriminator |
99+
| `decimal_value` | binary | Cached decimal |
100+
| `quote_*` | binary | Cached quote components |
101+
| `observed_at_ns` | uint64 | Observation timestamp |
102+
| `expires_at_ns` | int64 | TTL expiration timestamp |
103+
104+
### 4. TransmissionSchema
105+
106+
Batched report transmissions with Arrow IPC compression.
107+
108+
| Column | Type | Description |
109+
|--------|------|-------------|
110+
| `server_url` | string | Destination server |
111+
| `config_digest` | fixed[32] | Configuration hash |
112+
| `seq_nr` | uint64 | Sequence number |
113+
| `report_data` | large_binary | Encoded report |
114+
| `lifecycle_stage` | string | Report lifecycle stage |
115+
| `report_format` | uint32 | Format identifier |
116+
| `signatures` | list<binary> | Report signatures |
117+
| `signers` | list<uint8> | Signer indices |
118+
| `transmission_hash` | fixed[32] | Transmission hash |
119+
| `created_at_ns` | timestamp[ns] | Creation time |
120+
121+
## Value Types
122+
123+
Three value types are supported, identified by `value_type` column:
124+
125+
```go
126+
const (
127+
StreamValueTypeDecimal uint8 = 0 // Single decimal value
128+
StreamValueTypeQuote uint8 = 1 // Quote with bid/benchmark/ask
129+
StreamValueTypeTimestampd uint8 = 2 // Decimal with observation timestamp
130+
)
131+
```
132+
133+
### Decimal Encoding
134+
135+
Values use `shopspring/decimal` binary encoding for precise representation:
136+
137+
```go
138+
// Encode
139+
bytes, _ := decimal.MarshalBinary()
140+
141+
// Decode
142+
var d decimal.Decimal
143+
d.UnmarshalBinary(bytes)
144+
```
145+
146+
## Core Components
147+
148+
### arrow_schemas.go
149+
150+
Defines all four Arrow schemas and column index constants for type-safe access:
151+
152+
```go
153+
// Column indices for ObservationSchema
154+
const (
155+
ObsColObserverID = iota
156+
ObsColStreamID
157+
ObsColValueType
158+
// ...
159+
)
160+
```
161+
162+
### arrow_pool.go
163+
164+
Memory management with two pool types:
165+
166+
**LLOMemoryPool** - Wraps Arrow's allocator with metrics and optional bounds:
167+
168+
```go
169+
pool := NewLLOMemoryPool(maxBytes) // 0 for unlimited
170+
allocated, allocs, releases := pool.Metrics()
171+
```
172+
173+
**ArrowBuilderPool** - Unified pool for all schema builders:
174+
175+
```go
176+
builderPool := NewArrowBuilderPool(maxMemoryBytes)
177+
178+
// Get a builder for observations
179+
builder := builderPool.GetObservationBuilder()
180+
// ... use builder ...
181+
builderPool.PutObservationBuilder(builder)
182+
```
183+
184+
### arrow_converters.go
185+
186+
Type conversion between Go types and Arrow columns:
187+
188+
```go
189+
// Write StreamValue to Arrow builders
190+
StreamValueToArrow(sv, valueTypeBuilder, decimalBuilder, bidBuilder, ...)
191+
192+
// Read StreamValue from Arrow arrays
193+
sv, _ := ArrowToStreamValue(idx, valueTypeArr, decimalArr, bidArr, ...)
194+
195+
// Batch conversion for cache operations
196+
record, _ := StreamValuesToArrowRecord(values, pool)
197+
values, _ := ArrowRecordToStreamValues(record)
198+
```
199+
200+
### arrow_observation_merger.go
201+
202+
Merges observations from multiple nodes into a single Arrow record:
203+
204+
```go
205+
merger := NewArrowObservationMerger(pool, codec)
206+
207+
// Merge attributed observations from consensus
208+
record, _ := merger.MergeObservations(attributedObservations)
209+
defer record.Release()
210+
211+
// Utility functions
212+
counts := CountByStreamID(record) // {streamID: count}
213+
counts := CountByObserver(record) // {observerID: count}
214+
```
215+
216+
### arrow_aggregators.go
217+
218+
Performs vectorized aggregation on Arrow records:
219+
220+
```go
221+
aggregator := NewArrowAggregator(pool)
222+
223+
// Aggregate with channel definitions providing aggregator type per stream
224+
results, _ := aggregator.AggregateObservations(record, channelDefs, f)
225+
// f = fault tolerance threshold (observations must exceed f)
226+
```
227+
228+
**Supported Aggregators:**
229+
230+
| Aggregator | Description |
231+
|------------|-------------|
232+
| `Median` | Sorts values, returns middle element |
233+
| `Mode` | Most common value (requires f+1 agreement) |
234+
| `Quote` | Median of each quote component separately |
235+
236+
## Data Flow Example
237+
238+
```go
239+
// 1. Create pools
240+
builderPool := NewArrowBuilderPool(0)
241+
codec := &StandardObservationCodec{}
242+
243+
// 2. Merge observations from all nodes
244+
merger := NewArrowObservationMerger(builderPool, codec)
245+
record, _ := merger.MergeObservations(attributedObservations)
246+
defer record.Release()
247+
248+
// 3. Aggregate using channel definitions
249+
aggregator := NewArrowAggregator(builderPool)
250+
streamAggregates, _ := aggregator.AggregateObservations(record, channelDefs, f)
251+
252+
// 4. Use aggregated values for report generation
253+
for streamID, aggregatorValues := range streamAggregates {
254+
for aggregatorType, value := range aggregatorValues {
255+
// Build reports...
256+
}
257+
}
258+
```
259+
260+
## Memory Management Best Practices
261+
262+
1. **Always release records** when done:
263+
```go
264+
record, _ := merger.MergeObservations(...)
265+
defer record.Release()
266+
```
267+
268+
2. **Return builders to pool** after use:
269+
```go
270+
builder := pool.GetObservationBuilder()
271+
// ... use builder ...
272+
pool.PutObservationBuilder(builder)
273+
```
274+
275+
3. **Set memory limits** in production:
276+
```go
277+
pool := NewArrowBuilderPool(500 * 1024 * 1024) // 500MB limit
278+
```
279+
280+
4. **Monitor allocation metrics**:
281+
```go
282+
allocated, allocs, releases := pool.MemoryStats()
283+
```
284+
285+
## Testing
286+
287+
### Unit Tests
288+
289+
`llo/arrow_aggregators_test.go` - Validates aggregation logic for all value types and aggregator combinations.
290+
291+
### Benchmarks
292+
293+
`llo/arrow_bench_test.go` - Performance benchmarks for:
294+
- Median aggregation
295+
- Quote aggregation
296+
- Type conversion operations
297+
- Builder pool efficiency
298+
299+
Run benchmarks:
300+
```bash
301+
cd llo
302+
go test -bench=. -benchmem ./...
303+
```
304+
305+
### Comparison Tests
306+
307+
`llo/arrow_comparison_test.go` - Compares Arrow implementation against the original implementation at various scales:
308+
- 10, 100, 1000, 5000, 10000 observations
309+
310+
Run comparison:
311+
```bash
312+
go test -run=Comparison -v ./llo/
313+
```
314+
315+
## File Reference
316+
317+
| File | Purpose |
318+
|------|---------|
319+
| `llo/arrow_schemas.go` | Schema definitions and column constants |
320+
| `llo/arrow_pool.go` | Memory pool and builder pool management |
321+
| `llo/arrow_converters.go` | Go type <-> Arrow conversion utilities |
322+
| `llo/arrow_observation_merger.go` | Multi-node observation merging |
323+
| `llo/arrow_aggregators.go` | Vectorized aggregation algorithms |
324+
| `llo/arrow_aggregators_test.go` | Unit tests |
325+
| `llo/arrow_bench_test.go` | Performance benchmarks |
326+
| `llo/arrow_comparison_test.go` | Before/after comparison tests |

go.mod

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/smartcontractkit/chainlink-data-streams
33
go 1.25.3
44

55
require (
6+
github.com/apache/arrow-go/v18 v18.3.1
67
github.com/ethereum/go-ethereum v1.15.3
78
github.com/expr-lang/expr v1.17.5
89
github.com/goccy/go-json v0.10.5
@@ -52,6 +53,7 @@ require (
5253
github.com/go-playground/validator/v10 v10.26.0 // indirect
5354
github.com/go-viper/mapstructure/v2 v2.4.0 // indirect
5455
github.com/golang/protobuf v1.5.4 // indirect
56+
github.com/google/flatbuffers v25.2.10+incompatible // indirect
5557
github.com/google/go-cmp v0.7.0 // indirect
5658
github.com/google/uuid v1.6.0 // indirect
5759
github.com/gorilla/websocket v1.5.3 // indirect
@@ -73,12 +75,12 @@ require (
7375
github.com/jmoiron/sqlx v1.4.0 // indirect
7476
github.com/jpillora/backoff v1.0.0 // indirect
7577
github.com/json-iterator/go v1.1.12 // indirect
78+
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
7679
github.com/leodido/go-urn v1.4.0 // indirect
7780
github.com/lib/pq v1.10.9 // indirect
7881
github.com/mailru/easyjson v0.9.0 // indirect
7982
github.com/mattn/go-colorable v0.1.14 // indirect
8083
github.com/mattn/go-isatty v0.0.20 // indirect
81-
github.com/mattn/go-runewidth v0.0.14 // indirect
8284
github.com/mmcloughlin/addchain v0.4.0 // indirect
8385
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
8486
github.com/modern-go/reflect2 v1.0.2 // indirect
@@ -93,7 +95,6 @@ require (
9395
github.com/prometheus/client_model v0.6.2 // indirect
9496
github.com/prometheus/common v0.65.0 // indirect
9597
github.com/prometheus/procfs v0.16.1 // indirect
96-
github.com/rivo/uniseg v0.4.4 // indirect
9798
github.com/rs/cors v1.9.0 // indirect
9899
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect
99100
github.com/scylladb/go-reflectx v1.0.1 // indirect
@@ -111,6 +112,7 @@ require (
111112
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
112113
github.com/x448/float16 v0.8.4 // indirect
113114
github.com/yusufpapurcu/wmi v1.2.4 // indirect
115+
github.com/zeebo/xxh3 v1.0.2 // indirect
114116
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
115117
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 // indirect
116118
go.opentelemetry.io/otel v1.38.0 // indirect
@@ -134,11 +136,15 @@ require (
134136
go.uber.org/multierr v1.11.0 // indirect
135137
go.uber.org/zap v1.27.0 // indirect
136138
golang.org/x/crypto v0.45.0 // indirect
139+
golang.org/x/mod v0.29.0 // indirect
137140
golang.org/x/net v0.47.0 // indirect
138141
golang.org/x/sync v0.18.0 // indirect
139142
golang.org/x/sys v0.38.0 // indirect
143+
golang.org/x/telemetry v0.0.0-20251008203120-078029d740a8 // indirect
140144
golang.org/x/text v0.31.0 // indirect
141145
golang.org/x/time v0.12.0 // indirect
146+
golang.org/x/tools v0.38.0 // indirect
147+
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
142148
google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 // indirect
143149
google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect
144150
gopkg.in/yaml.v3 v3.0.1 // indirect

0 commit comments

Comments
 (0)