Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/consumer-heap-gc.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Bound Shape.Consumer heap growth: make the consumer family's process spawn options (incl. `fullsweep_after`) configurable per process via `ELECTRIC_PROCESS_SPAWN_OPTS`, and add an opt-in adaptive GC that runs after a transaction fragment when the consumer's heap exceeds the runtime-tunable `ELECTRIC_CONSUMER_GC_HEAP_THRESHOLD` (off by default).
1 change: 1 addition & 0 deletions packages/sync-service/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ config :electric,
process_registry_partitions: env!("ELECTRIC_TWEAKS_PROCESS_REGISTRY_PARTITIONS", :integer, nil),
process_spawn_opts:
env!("ELECTRIC_PROCESS_SPAWN_OPTS", &Electric.Config.parse_spawn_opts!/1, %{}),
consumer_gc_heap_threshold: env!("ELECTRIC_CONSUMER_GC_HEAP_THRESHOLD", :integer, nil),
http_api_num_acceptors: env!("ELECTRIC_TWEAKS_HTTP_API_NUM_ACCEPTORS", :integer, 100),
conn_max_requests: env!("ELECTRIC_TWEAKS_CONN_MAX_REQUESTS", :integer, nil),
handler_fullsweep_after: env!("ELECTRIC_TWEAKS_HANDLER_FULLSWEEP_AFTER", :integer, nil),
Expand Down
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ defmodule Electric.Application do
shape_enable_suspend?: get_env(opts, :shape_enable_suspend?),
conn_max_requests: get_env(opts, :conn_max_requests),
handler_fullsweep_after: get_env(opts, :handler_fullsweep_after),
process_spawn_opts: get_env(opts, :process_spawn_opts)
process_spawn_opts: get_env(opts, :process_spawn_opts),
consumer_gc_heap_threshold: get_env(opts, :consumer_gc_heap_threshold)
],
manual_table_publishing?: get_env(opts, :manual_table_publishing?),
shape_db_opts: [
Expand Down
4 changes: 4 additions & 0 deletions packages/sync-service/lib/electric/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ defmodule Electric.Config do
#
# e.g. %{shape_log_collector: [min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024]}
process_spawn_opts: %{},
# Heap-size threshold (in BYTES) above which a consumer runs :erlang.garbage_collect()
# after processing a transaction fragment. nil disables adaptive GC. Looked up at
# runtime via StackConfig so it can be changed from a live IEx shell.
consumer_gc_heap_threshold: nil,
## Misc
process_registry_partitions: &Electric.Config.Defaults.process_registry_partitions/0,
feature_flags: if(Mix.env() == :test, do: @known_feature_flags, else: []),
Expand Down
168 changes: 165 additions & 3 deletions packages/sync-service/lib/electric/shapes/consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ defmodule Electric.Shapes.Consumer do
@default_snapshot_timeout 45_000
@stop_and_clean_timeout 30_000
@stop_and_clean_reason ShapeCleaner.consumer_cleanup_reason()
@word_size :erlang.system_info(:wordsize)

# Minimum wall-clock interval (ms) between consumer-forced full GC sweeps.
# Prevents GC-thrashing on the replication critical path: the ShapeLogCollector
# blocks until every consumer replies, so a forced GC on every fragment (e.g.
# during a buffered-fragment drain) would add publish latency proportional to
# the number of fragments. Hysteresis caps the worst-case frequency to at most
# one forced sweep per @gc_min_interval_ms regardless of fragment rate.
@gc_min_interval_ms 1_000

@type initialize_shape_opts() :: %{
:action => :create | :restore,
Expand Down Expand Up @@ -110,9 +119,85 @@ defmodule Electric.Shapes.Consumer do
ConsumerRegistry.whereis(stack_id, shape_handle)
end

@doc """
Updates the adaptive-GC heap threshold for the consumers of one stack.

`threshold_bytes` is either a non-negative number of bytes or `nil`, where
`nil` switches adaptive GC off. The value is stored under the
`:consumer_gc_heap_threshold` key of the stack's `Electric.StackConfig`, which
consumers consult after every transaction fragment, so the change applies
without a restart and the function is safe to call from IEx.

**Critical-path note**: the GC triggered by this threshold runs synchronously on
the replication path — the ShapeLogCollector blocks until every consumer
replies. Choose a conservative (high) threshold to keep the added publish
latency small. For steady-state heap bounding the per-process `fullsweep_after`
spawn-opt (configured via `ELECTRIC_PROCESS_SPAWN_OPTS`) carries less risk;
this adaptive GC is meant as a targeted, runtime-tunable backstop. Forced
sweeps happen at most once per `@gc_min_interval_ms` (see `should_force_gc?/5`).

Always returns `:ok`. Any other `threshold_bytes` raises `FunctionClauseError`.
"""
@spec set_gc_heap_threshold(Electric.stack_id(), non_neg_integer() | nil) :: :ok
def set_gc_heap_threshold(stack_id, threshold_bytes)

def set_gc_heap_threshold(stack_id, nil) do
  store_gc_heap_threshold(stack_id, nil)
end

def set_gc_heap_threshold(stack_id, threshold_bytes)
    when is_integer(threshold_bytes) and threshold_bytes >= 0 do
  store_gc_heap_threshold(stack_id, threshold_bytes)
end

# Writes the already-validated threshold into the stack's config and
# normalises the return value to :ok.
defp store_gc_heap_threshold(stack_id, validated_threshold) do
  Electric.StackConfig.put(stack_id, :consumer_gc_heap_threshold, validated_threshold)
  :ok
end

@doc """
Set the adaptive-GC heap threshold for every live stack on this node.

`threshold_bytes` must be a non-negative integer (bytes) or `nil` (disable
everywhere) — the same contract as `set_gc_heap_threshold/2`. The argument is
validated before any stack is touched, so an invalid value raises
`FunctionClauseError` immediately instead of being reported as `{:ok, 0}` when
no stacks happen to be live.

Returns `{:ok, number_of_stacks_updated}`. Only stacks whose config was actually
written are counted. Intended for live experimentation from an IEx shell.
"""
@spec set_gc_heap_threshold_all_stacks(non_neg_integer() | nil) :: {:ok, non_neg_integer()}
def set_gc_heap_threshold_all_stacks(threshold_bytes)
    when is_nil(threshold_bytes) or (is_integer(threshold_bytes) and threshold_bytes >= 0) do
  # A stack can die between enumeration and the write: if its StackConfig ETS
  # table has vanished, the put raises ArgumentError. Such stale entries are
  # skipped rather than crashing the operator call, and are not counted.
  updated =
    Enum.count(list_stack_ids(), fn stack_id ->
      try do
        set_gc_heap_threshold(stack_id, threshold_bytes) == :ok
      rescue
        ArgumentError -> false
      end
    end)

  {:ok, updated}
end

# Returns the ids of all live stacks as strings.
#
# There is one named ETS table per stack whose name has the form
# "Electric.StackConfig:<stack_id>", so the stack ids are recovered by walking
# every ETS table on the node, keeping the names that start with that prefix
# and stripping the prefix off.
#
# Tables that disappear while we iterate report their name as :undefined and
# are ignored. A stack that dies after being listed is the caller's concern:
# a later StackConfig.put/3 against the vanished table raises ArgumentError,
# which the caller rescues.
defp list_stack_ids do
  table_prefix = "#{inspect(Electric.StackConfig)}:"
  skip = byte_size(table_prefix)

  for table <- :ets.all(),
      table_name = :ets.info(table, :name),
      table_name != :undefined,
      label = Atom.to_string(table_name),
      String.starts_with?(label, table_prefix) do
    binary_part(label, skip, byte_size(label) - skip)
  end
end

# Starts a consumer GenServer for the given stack and shape, registered under
# name(stack_id, shape_handle). Only :stack_id and :shape_handle from the
# incoming config are passed on as the init argument.
# NOTE(review): this span comes from a diff view whose +/- markers were lost, so
# the pre-change `name:` line (no trailing comma) and the post-change `name:` +
# `spawn_opt:` lines both appear below. The post-change call takes its spawn
# options from Electric.StackConfig.spawn_opts(stack_id, :consumer).
def start_link(%{stack_id: stack_id, shape_handle: shape_handle} = _config) do
GenServer.start_link(__MODULE__, %{stack_id: stack_id, shape_handle: shape_handle},
name: name(stack_id, shape_handle)
name: name(stack_id, shape_handle),
spawn_opt: Electric.StackConfig.spawn_opts(stack_id, :consumer)
)
end

Expand Down Expand Up @@ -478,7 +563,81 @@ defmodule Electric.Shapes.Consumer do

# Handles an incoming transaction fragment: logs it at debug level, applies it
# with handle_txn_fragment/2 and then gives maybe_garbage_collect/1 a chance to
# run on the resulting state.
# NOTE(review): this span comes from a diff view whose +/- markers were lost —
# the bare `handle_txn_fragment(txn_fragment, state)` line is the pre-change
# body and the pipeline below it is the post-change body.
defp handle_event(%TransactionFragment{} = txn_fragment, state) do
Logger.debug(fn -> "Txn fragment received in Shapes.Consumer: #{inspect(txn_fragment)}" end)
handle_txn_fragment(txn_fragment, state)

txn_fragment
|> handle_txn_fragment(state)
|> maybe_garbage_collect()
end

# Runs a forced garbage collection of the consumer process when adaptive GC is
# enabled for the stack and should_force_gc?/4 says a sweep is due; returns the
# (possibly updated) state.
#
# This executes synchronously on the replication critical path: the
# ShapeLogCollector blocks until every consumer replies, so a forced
# :erlang.garbage_collect() can add measurable publish latency. That matters
# most while draining buffered fragments, where this function is invoked once
# per queued fragment in a tight loop.
#
# The cost is kept in check in two ways:
#   1. With no threshold configured (nil) we return straight away, without
#      touching process_info or the clock.
#   2. @gc_min_interval_ms hysteresis stops back-to-back sweeps when the heap
#      hovers just above the threshold across many fragments.
#
# Operators should pick a CONSERVATIVE (high) threshold. For steady-state heap
# bounding the per-process `fullsweep_after` spawn-opt (set via
# ELECTRIC_PROCESS_SPAWN_OPTS) is the lower-risk option; this adaptive GC is a
# targeted, runtime-tunable backstop.
defp maybe_garbage_collect(%State{stack_id: stack_id} = state) do
  stack_id
  |> Electric.StackConfig.lookup(:consumer_gc_heap_threshold, nil)
  |> force_gc_if_due(state)
end

# Adaptive GC disabled: nothing to measure, nothing to do.
defp force_gc_if_due(nil, state), do: state

# Adaptive GC enabled: measure the heap, and sweep only if over the threshold
# and outside the hysteresis window. The time of the sweep is remembered in
# last_forced_gc_at for the next decision.
defp force_gc_if_due(threshold_bytes, state) do
  {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size)
  checked_at = System.monotonic_time(:millisecond)

  sweep_due? =
    should_force_gc?(heap_words, threshold_bytes, state.last_forced_gc_at, checked_at)

  if sweep_due? do
    :erlang.garbage_collect()
    %{state | last_forced_gc_at: checked_at}
  else
    state
  end
end

@doc false
# Tells whether a heap of `heap_words` machine words is strictly larger than
# `threshold_bytes`. The word count is converted to bytes with @word_size before
# comparing. A nil threshold means "no limit" and is never exceeded.
@spec over_heap_threshold?(non_neg_integer(), non_neg_integer() | nil) :: boolean()
def over_heap_threshold?(heap_words, threshold_bytes)

def over_heap_threshold?(_heap_words, nil), do: false

def over_heap_threshold?(heap_words, threshold_bytes) when is_integer(threshold_bytes) do
  heap_bytes = heap_words * @word_size
  threshold_bytes < heap_bytes
end

@doc false
# Pure decision function behind the adaptive GC. A forced sweep is due when
# BOTH hold:
#   * the heap (heap_words, in words) is over threshold_bytes, and
#   * no sweep has been forced yet (last_gc_at is nil), or at least
#     min_interval_ms have passed between last_gc_at and now_ms.
# last_gc_at and now_ms are monotonic milliseconds. A nil threshold always
# yields false. min_interval_ms defaults to @gc_min_interval_ms; taking it as
# an argument keeps unit tests deterministic.
@spec should_force_gc?(
        non_neg_integer(),
        non_neg_integer() | nil,
        integer() | nil,
        integer(),
        non_neg_integer()
      ) :: boolean()
def should_force_gc?(
      heap_words,
      threshold_bytes,
      last_gc_at,
      now_ms,
      min_interval_ms \\ @gc_min_interval_ms
    )

def should_force_gc?(_heap_words, nil, _last_gc_at, _now_ms, _min_interval_ms), do: false

def should_force_gc?(heap_words, threshold_bytes, last_gc_at, now_ms, min_interval_ms) do
  cond do
    not over_heap_threshold?(heap_words, threshold_bytes) -> false
    is_nil(last_gc_at) -> true
    true -> now_ms - last_gc_at >= min_interval_ms
  end
end

# A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list).
Expand Down Expand Up @@ -768,7 +927,10 @@ defmodule Electric.Shapes.Consumer do
{txn_fragments, state} = State.pop_buffered(state)

Enum.reduce_while(txn_fragments, state, fn txn_fragment, state ->
state = handle_txn_fragment(txn_fragment, state)
state =
txn_fragment
|> handle_txn_fragment(state)
|> maybe_garbage_collect()

if state.terminating? do
{:halt, state}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ defmodule Electric.Shapes.Consumer.Materializer do
do: subscribe(%{stack_id: stack_id, shape_handle: shape_handle})

# Starts the Materializer GenServer with `opts` as its init argument, registered
# under name(opts).
# NOTE(review): this span comes from a diff view whose +/- markers were lost —
# the single-line GenServer.start_link call is the pre-change body; the
# multi-line call is the post-change body, which additionally passes
# spawn_opt from Electric.StackConfig.spawn_opts(opts.stack_id, :consumer_materializer).
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: name(opts))
GenServer.start_link(__MODULE__, opts,
name: name(opts),
spawn_opt: Electric.StackConfig.spawn_opts(opts.stack_id, :consumer_materializer)
)
end

def init(opts) do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ defmodule Electric.Shapes.Consumer.Snapshotter do
end

# Starts the Snapshotter GenServer with the `config` map as its init argument,
# registered under name(config).
# NOTE(review): this span comes from a diff view whose +/- markers were lost —
# the single-line GenServer.start_link call is the pre-change body; the
# multi-line call is the post-change body, which additionally passes
# spawn_opt from Electric.StackConfig.spawn_opts(config.stack_id, :consumer_snapshotter).
def start_link(config) when is_map(config) do
GenServer.start_link(__MODULE__, config, name: name(config))
GenServer.start_link(__MODULE__, config,
name: name(config),
spawn_opt: Electric.StackConfig.spawn_opts(config.stack_id, :consumer_snapshotter)
)
end

def init(config) do
Expand Down
5 changes: 4 additions & 1 deletion packages/sync-service/lib/electric/shapes/consumer/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ defmodule Electric.Shapes.Consumer.State do
# When a {Storage, :flushed, offset} message arrives during a pending
# transaction, we defer the notification and store the max flushed offset
# here. Multiple deferred notifications are collapsed into a single most recent offset.
pending_flush_offset: nil
pending_flush_offset: nil,
# Monotonic millisecond timestamp of the last consumer-forced GC (nil if never).
# Used by hysteresis logic in maybe_garbage_collect/1 to cap forced-GC frequency.
last_forced_gc_at: nil
]

@type pg_snapshot() :: SnapshotQuery.pg_snapshot()
Expand Down
3 changes: 2 additions & 1 deletion packages/sync-service/lib/electric/stack_config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ defmodule Electric.StackConfig do
shape_enable_suspend?: Electric.Config.default(:shape_enable_suspend?),
chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(),
feature_flags: [],
process_spawn_opts: %{}
process_spawn_opts: %{},
consumer_gc_heap_threshold: Electric.Config.default(:consumer_gc_heap_threshold)
]
end

Expand Down
6 changes: 6 additions & 0 deletions packages/sync-service/lib/electric/stack_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ defmodule Electric.StackSupervisor do
default: nil
],
process_spawn_opts: [type: :map, default: %{}],
consumer_gc_heap_threshold: [
type: {:or, [:non_neg_integer, nil]},
default: Electric.Config.default(:consumer_gc_heap_threshold)
],
consumer_partitions: [type: {:or, [:pos_integer, nil]}, default: nil]
]
],
Expand Down Expand Up @@ -352,6 +356,7 @@ defmodule Electric.StackSupervisor do
shape_hibernate_after = Keyword.fetch!(config.tweaks, :shape_hibernate_after)
shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?)
process_spawn_opts = Keyword.fetch!(config.tweaks, :process_spawn_opts)
consumer_gc_heap_threshold = Keyword.fetch!(config.tweaks, :consumer_gc_heap_threshold)

shape_cache_opts = [
stack_id: stack_id
Expand Down Expand Up @@ -401,6 +406,7 @@ defmodule Electric.StackSupervisor do
shape_hibernate_after: shape_hibernate_after,
shape_enable_suspend?: shape_enable_suspend?,
process_spawn_opts: process_spawn_opts,
consumer_gc_heap_threshold: consumer_gc_heap_threshold,
feature_flags: Map.get(config, :feature_flags, [])
]},
{Electric.AsyncDeleter,
Expand Down
39 changes: 39 additions & 0 deletions packages/sync-service/test/electric/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,43 @@ defmodule Electric.ConfigTest do
end
end
end

describe "tweaks propagation" do
  # Every test in this group runs against an emptied :electric application
  # env; the keys that were present beforehand are put back when the test exits
  # and are exposed to the test as ctx.initial_config.
  setup do
    saved_env = Application.get_all_env(:electric)

    Enum.each(saved_env, fn {key, _value} ->
      Application.delete_env(:electric, key)
    end)

    on_exit(fn -> Application.put_all_env(electric: saved_env) end)

    [initial_config: saved_env]
  end

  # An explicitly supplied threshold must come out unchanged under :tweaks.
  test "consumer_gc_heap_threshold opt is threaded into tweaks", ctx do
    threshold = 209_715_200

    opts =
      ctx.initial_config
      |> Keyword.take([:replication_connection_opts])
      |> Keyword.merge(consumer_gc_heap_threshold: threshold)

    tweaks = Electric.Application.configuration(opts)[:tweaks]

    assert Keyword.fetch!(tweaks, :consumer_gc_heap_threshold) == threshold
  end

  # Without the opt, the key is still present under :tweaks and holds nil.
  test "consumer_gc_heap_threshold defaults to nil in tweaks", ctx do
    opts = Keyword.take(ctx.initial_config, [:replication_connection_opts])

    tweaks = Electric.Application.configuration(opts)[:tweaks]

    assert Keyword.fetch!(tweaks, :consumer_gc_heap_threshold) == nil
  end
end
end
Loading
Loading