diff --git a/.changeset/consumer-heap-gc.md b/.changeset/consumer-heap-gc.md new file mode 100644 index 0000000000..b95e1a28bc --- /dev/null +++ b/.changeset/consumer-heap-gc.md @@ -0,0 +1,5 @@ +--- +"@core/sync-service": patch +--- + +Bound Shape.Consumer heap growth: make the consumer family's process spawn options (incl. `fullsweep_after`) configurable per process via `ELECTRIC_PROCESS_SPAWN_OPTS`, and add an opt-in adaptive GC that runs after a transaction fragment when the consumer's heap exceeds the runtime-tunable `ELECTRIC_CONSUMER_GC_HEAP_THRESHOLD` (off by default). diff --git a/packages/sync-service/config/runtime.exs b/packages/sync-service/config/runtime.exs index 6dbbefcb59..c8648effe2 100644 --- a/packages/sync-service/config/runtime.exs +++ b/packages/sync-service/config/runtime.exs @@ -287,6 +287,7 @@ config :electric, process_registry_partitions: env!("ELECTRIC_TWEAKS_PROCESS_REGISTRY_PARTITIONS", :integer, nil), process_spawn_opts: env!("ELECTRIC_PROCESS_SPAWN_OPTS", &Electric.Config.parse_spawn_opts!/1, %{}), + consumer_gc_heap_threshold: env!("ELECTRIC_CONSUMER_GC_HEAP_THRESHOLD", :integer, nil), http_api_num_acceptors: env!("ELECTRIC_TWEAKS_HTTP_API_NUM_ACCEPTORS", :integer, 100), conn_max_requests: env!("ELECTRIC_TWEAKS_CONN_MAX_REQUESTS", :integer, nil), handler_fullsweep_after: env!("ELECTRIC_TWEAKS_HANDLER_FULLSWEEP_AFTER", :integer, nil), diff --git a/packages/sync-service/lib/electric/application.ex b/packages/sync-service/lib/electric/application.ex index c46b55c7b9..4a18cd1b8f 100644 --- a/packages/sync-service/lib/electric/application.ex +++ b/packages/sync-service/lib/electric/application.ex @@ -155,7 +155,8 @@ defmodule Electric.Application do shape_enable_suspend?: get_env(opts, :shape_enable_suspend?), conn_max_requests: get_env(opts, :conn_max_requests), handler_fullsweep_after: get_env(opts, :handler_fullsweep_after), - process_spawn_opts: get_env(opts, :process_spawn_opts) + process_spawn_opts: get_env(opts, :process_spawn_opts), + consumer_gc_heap_threshold: get_env(opts, :consumer_gc_heap_threshold) ], manual_table_publishing?: get_env(opts, :manual_table_publishing?), shape_db_opts: [ diff --git a/packages/sync-service/lib/electric/config.ex b/packages/sync-service/lib/electric/config.ex index c33ac3c271..6eb2c322f3 100644 --- a/packages/sync-service/lib/electric/config.ex +++ b/packages/sync-service/lib/electric/config.ex @@ -109,6 +109,10 @@ defmodule Electric.Config do # # e.g. %{shape_log_collector: [min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024]} process_spawn_opts: %{}, + # Heap-size threshold (in BYTES) above which a consumer runs :erlang.garbage_collect() + # after processing a transaction fragment. nil disables adaptive GC. Looked up at + # runtime via StackConfig so it can be changed from a live IEx shell. + consumer_gc_heap_threshold: nil, ## Misc process_registry_partitions: &Electric.Config.Defaults.process_registry_partitions/0, feature_flags: if(Mix.env() == :test, do: @known_feature_flags, else: []), diff --git a/packages/sync-service/lib/electric/shapes/consumer.ex b/packages/sync-service/lib/electric/shapes/consumer.ex index 3ab114d0f5..e13d8a57b1 100644 --- a/packages/sync-service/lib/electric/shapes/consumer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer.ex @@ -38,6 +38,15 @@ defmodule Electric.Shapes.Consumer do @default_snapshot_timeout 45_000 @stop_and_clean_timeout 30_000 @stop_and_clean_reason ShapeCleaner.consumer_cleanup_reason() + @word_size :erlang.system_info(:wordsize) + + # Minimum wall-clock interval (ms) between consumer-forced full GC sweeps. + # Prevents GC-thrashing on the replication critical path: the ShapeLogCollector + # blocks until every consumer replies, so a forced GC on every fragment (e.g. + # during a buffered-fragment drain) would add publish latency proportional to + # the number of fragments. Hysteresis caps the worst-case frequency to at most + # one forced sweep per @gc_min_interval_ms regardless of fragment rate. + @gc_min_interval_ms 1_000 @type initialize_shape_opts() :: %{ :action => :create | :restore, @@ -110,9 +119,85 @@ defmodule Electric.Shapes.Consumer do ConsumerRegistry.whereis(stack_id, shape_handle) end + @doc """ + Set the adaptive-GC heap threshold (bytes, or nil to disable) for a single stack. + Takes effect immediately for that stack's consumers — safe to call from IEx. + + **Critical-path note**: this GC runs synchronously on the replication path — + the ShapeLogCollector blocks until every consumer replies. Prefer a conservative + (high) threshold to minimise added publish latency. The per-process + `fullsweep_after` spawn-opt (configured via `ELECTRIC_PROCESS_SPAWN_OPTS`) is + the lower-risk lever for steady-state heap bounding; this adaptive GC is a + targeted, runtime-tunable backstop. Forced-GC frequency is capped to at most + once per `@gc_min_interval_ms` (see `should_force_gc?/5`). + """ + @spec set_gc_heap_threshold(Electric.stack_id(), non_neg_integer() | nil) :: :ok + def set_gc_heap_threshold(stack_id, threshold_bytes) + when is_nil(threshold_bytes) or (is_integer(threshold_bytes) and threshold_bytes >= 0) do + Electric.StackConfig.put(stack_id, :consumer_gc_heap_threshold, threshold_bytes) + :ok + end + + @doc """ + Set the adaptive-GC heap threshold for every live stack on this node. + Returns `{:ok, number_of_stacks_updated}`. Pass nil to disable everywhere. + Intended for live experimentation from an IEx shell. + """ + @spec set_gc_heap_threshold_all_stacks(non_neg_integer() | nil) :: {:ok, non_neg_integer()} + def set_gc_heap_threshold_all_stacks(threshold_bytes) do + stack_ids = list_stack_ids() + + # Guard against a stack dying between enumeration and the put: if the + # StackConfig ETS table vanishes, StackConfig.put/3 raises ArgumentError. + # We skip such stale entries rather than crashing the operator call. + # Only successfully-written stacks are counted so the returned value + # reflects reality even when stacks die mid-iteration. + count = + Enum.reduce(stack_ids, 0, fn stack_id, acc -> + try do + set_gc_heap_threshold(stack_id, threshold_bytes) + acc + 1 + rescue + ArgumentError -> acc + end + end) + + {:ok, count} + end + + # Enumerate live stacks by scanning ETS tables whose names match the + # Electric.StackConfig table-name prefix ("Electric.StackConfig:"). + # This is the most direct approach: StackConfig creates one named ETS table per + # stack, so the set of live tables IS the set of live stacks. + # No first-class listing API exists in the codebase (grep confirmed). + # A race (stack dies mid-iteration) is harmless: StackConfig.put/3 on a vanished + # table would raise ArgumentError, which we rescue and skip. + defp list_stack_ids do + prefix = "#{inspect(Electric.StackConfig)}:" + prefix_len = byte_size(prefix) + + :ets.all() + |> Enum.flat_map(fn tab -> + case :ets.info(tab, :name) do + :undefined -> + [] + + name -> + name_str = Atom.to_string(name) + + if String.starts_with?(name_str, prefix) do + [binary_part(name_str, prefix_len, byte_size(name_str) - prefix_len)] + else + [] + end + end + end) + end + def start_link(%{stack_id: stack_id, shape_handle: shape_handle} = _config) do GenServer.start_link(__MODULE__, %{stack_id: stack_id, shape_handle: shape_handle}, - name: name(stack_id, shape_handle) + name: name(stack_id, shape_handle), + spawn_opt: Electric.StackConfig.spawn_opts(stack_id, :consumer) ) end @@ -478,7 +563,81 @@ defmodule Electric.Shapes.Consumer do defp handle_event(%TransactionFragment{} = txn_fragment, state) do Logger.debug(fn -> "Txn fragment received in Shapes.Consumer: #{inspect(txn_fragment)}" end) - handle_txn_fragment(txn_fragment, state) + + txn_fragment + |> handle_txn_fragment(state) + |> maybe_garbage_collect() + end + + # NOTE: this runs synchronously on the replication critical path — the + # ShapeLogCollector blocks until every consumer replies. A forced + # :erlang.garbage_collect() can add measurable publish latency, especially + # during a buffered-fragment drain where maybe_garbage_collect/1 is called + # for every queued fragment in a tight loop. + # + # Two safeguards limit the impact: + # 1. The nil fast-path exits immediately when no threshold is configured. + # 2. Hysteresis (@gc_min_interval_ms) prevents back-to-back full sweeps even + # when the consumer sits just above the threshold across many fragments. + # + # Operators should prefer a CONSERVATIVE (high) threshold. For steady-state + # heap bounding the per-process `fullsweep_after` spawn-opt (set via + # ELECTRIC_PROCESS_SPAWN_OPTS) is a lower-risk alternative; this adaptive GC + # is a targeted, runtime-tunable backstop. + defp maybe_garbage_collect(%State{stack_id: stack_id} = state) do + case Electric.StackConfig.lookup(stack_id, :consumer_gc_heap_threshold, nil) do + nil -> + # Fast path: adaptive GC is disabled — skip all process_info/time calls. + state + + threshold -> + {:total_heap_size, heap_words} = :erlang.process_info(self(), :total_heap_size) + now = System.monotonic_time(:millisecond) + + if should_force_gc?(heap_words, threshold, state.last_forced_gc_at, now) do + :erlang.garbage_collect() + %{state | last_forced_gc_at: now} + else + state + end + end + end + + @doc false + # heap_words: process total_heap_size in words; threshold_bytes: configured byte threshold (or nil) + @spec over_heap_threshold?(non_neg_integer(), non_neg_integer() | nil) :: boolean() + def over_heap_threshold?(_heap_words, nil), do: false + + def over_heap_threshold?(heap_words, threshold_bytes) when is_integer(threshold_bytes) do + heap_words * @word_size > threshold_bytes + end + + @doc false + # Decide whether to force a full GC sweep: heap must be over the threshold AND + # at least @gc_min_interval_ms must have elapsed since the last forced GC. + # last_gc_at / now_ms are monotonic milliseconds; last_gc_at is nil if this + # consumer has never forced a GC (always fire on first over-threshold event). + # Passing explicit min_interval_ms enables deterministic unit tests. + @spec should_force_gc?( + non_neg_integer(), + non_neg_integer() | nil, + integer() | nil, + integer(), + non_neg_integer() + ) :: boolean() + def should_force_gc?( + heap_words, + threshold_bytes, + last_gc_at, + now_ms, + min_interval_ms \\ @gc_min_interval_ms + ) + + def should_force_gc?(_heap_words, nil, _last_gc_at, _now_ms, _min_interval_ms), do: false + + def should_force_gc?(heap_words, threshold_bytes, last_gc_at, now_ms, min_interval_ms) do + over_heap_threshold?(heap_words, threshold_bytes) and + (is_nil(last_gc_at) or now_ms - last_gc_at >= min_interval_ms) end # A consumer process starts with buffering?=true before it has PG snapshot info (xmin, xmax, xip_list). @@ -768,7 +927,10 @@ defmodule Electric.Shapes.Consumer do {txn_fragments, state} = State.pop_buffered(state) Enum.reduce_while(txn_fragments, state, fn txn_fragment, state -> - state = handle_txn_fragment(txn_fragment, state) + state = + txn_fragment + |> handle_txn_fragment(state) + |> maybe_garbage_collect() if state.terminating? do {:halt, state} diff --git a/packages/sync-service/lib/electric/shapes/consumer/materializer.ex b/packages/sync-service/lib/electric/shapes/consumer/materializer.ex index c98dc4c1f0..08c6c97bf5 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/materializer.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/materializer.ex @@ -111,7 +111,10 @@ defmodule Electric.Shapes.Consumer.Materializer do do: subscribe(%{stack_id: stack_id, shape_handle: shape_handle}) def start_link(opts) do - GenServer.start_link(__MODULE__, opts, name: name(opts)) + GenServer.start_link(__MODULE__, opts, + name: name(opts), + spawn_opt: Electric.StackConfig.spawn_opts(opts.stack_id, :consumer_materializer) + ) end def init(opts) do diff --git a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex index 8ff3aa14be..2a64f916f9 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/snapshotter.ex @@ -20,7 +20,10 @@ defmodule Electric.Shapes.Consumer.Snapshotter do end def start_link(config) when is_map(config) do - GenServer.start_link(__MODULE__, config, name: name(config)) + GenServer.start_link(__MODULE__, config, + name: name(config), + spawn_opt: Electric.StackConfig.spawn_opts(config.stack_id, :consumer_snapshotter) + ) end def init(config) do diff --git a/packages/sync-service/lib/electric/shapes/consumer/state.ex b/packages/sync-service/lib/electric/shapes/consumer/state.ex index 9414f07571..1f8e224fde 100644 --- a/packages/sync-service/lib/electric/shapes/consumer/state.ex +++ b/packages/sync-service/lib/electric/shapes/consumer/state.ex @@ -41,7 +41,10 @@ defmodule Electric.Shapes.Consumer.State do # When a {Storage, :flushed, offset} message arrives during a pending # transaction, we defer the notification and store the max flushed offset # here. Multiple deferred notifications are collapsed into a single most recent offset. - pending_flush_offset: nil + pending_flush_offset: nil, + # Monotonic millisecond timestamp of the last consumer-forced GC (nil if never). + # Used by hysteresis logic in maybe_garbage_collect/1 to cap forced-GC frequency. + last_forced_gc_at: nil ] @type pg_snapshot() :: SnapshotQuery.pg_snapshot() diff --git a/packages/sync-service/lib/electric/stack_config.ex b/packages/sync-service/lib/electric/stack_config.ex index cc614cd055..27f8378309 100644 --- a/packages/sync-service/lib/electric/stack_config.ex +++ b/packages/sync-service/lib/electric/stack_config.ex @@ -32,7 +32,8 @@ defmodule Electric.StackConfig do shape_enable_suspend?: Electric.Config.default(:shape_enable_suspend?), chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(), feature_flags: [], - process_spawn_opts: %{} + process_spawn_opts: %{}, + consumer_gc_heap_threshold: Electric.Config.default(:consumer_gc_heap_threshold) ] end diff --git a/packages/sync-service/lib/electric/stack_supervisor.ex b/packages/sync-service/lib/electric/stack_supervisor.ex index 23912c21be..fd30eeb485 100644 --- a/packages/sync-service/lib/electric/stack_supervisor.ex +++ b/packages/sync-service/lib/electric/stack_supervisor.ex @@ -150,6 +150,10 @@ defmodule Electric.StackSupervisor do default: nil ], process_spawn_opts: [type: :map, default: %{}], + consumer_gc_heap_threshold: [ + type: {:or, [:non_neg_integer, nil]}, + default: Electric.Config.default(:consumer_gc_heap_threshold) + ], consumer_partitions: [type: {:or, [:pos_integer, nil]}, default: nil] ] ], @@ -352,6 +356,7 @@ defmodule Electric.StackSupervisor do shape_hibernate_after = Keyword.fetch!(config.tweaks, :shape_hibernate_after) shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?) process_spawn_opts = Keyword.fetch!(config.tweaks, :process_spawn_opts) + consumer_gc_heap_threshold = Keyword.fetch!(config.tweaks, :consumer_gc_heap_threshold) shape_cache_opts = [ stack_id: stack_id @@ -401,6 +406,7 @@ defmodule Electric.StackSupervisor do shape_hibernate_after: shape_hibernate_after, shape_enable_suspend?: shape_enable_suspend?, process_spawn_opts: process_spawn_opts, + consumer_gc_heap_threshold: consumer_gc_heap_threshold, feature_flags: Map.get(config, :feature_flags, []) ]}, {Electric.AsyncDeleter, diff --git a/packages/sync-service/test/electric/config_test.exs b/packages/sync-service/test/electric/config_test.exs index 366c4352ec..f9b647b7a9 100644 --- a/packages/sync-service/test/electric/config_test.exs +++ b/packages/sync-service/test/electric/config_test.exs @@ -120,4 +120,43 @@ defmodule Electric.ConfigTest do end end end + + describe "tweaks propagation" do + setup do + initial_config = Application.get_all_env(:electric) + + for {key, _} <- initial_config do + Application.delete_env(:electric, key) + end + + on_exit(fn -> + Application.put_all_env([{:electric, initial_config}]) + end) + + [initial_config: initial_config] + end + + test "consumer_gc_heap_threshold opt is threaded into tweaks", ctx do + threshold = 209_715_200 + + config = + Electric.Application.configuration( + Keyword.merge( + Keyword.take(ctx.initial_config, [:replication_connection_opts]), + consumer_gc_heap_threshold: threshold + ) + ) + + assert Keyword.fetch!(config[:tweaks], :consumer_gc_heap_threshold) == threshold + end + + test "consumer_gc_heap_threshold defaults to nil in tweaks", ctx do + config = + Electric.Application.configuration( + Keyword.take(ctx.initial_config, [:replication_connection_opts]) + ) + + assert Keyword.fetch!(config[:tweaks], :consumer_gc_heap_threshold) == nil + end + end end diff --git a/packages/sync-service/test/electric/shapes/consumer_test.exs b/packages/sync-service/test/electric/shapes/consumer_test.exs index 5b2d2cce2d..36aac47c93 100644 --- a/packages/sync-service/test/electric/shapes/consumer_test.exs +++ b/packages/sync-service/test/electric/shapes/consumer_test.exs @@ -2509,6 +2509,382 @@ defmodule Electric.Shapes.ConsumerTest do ) end + describe "process gc configuration" do + setup [ + :with_registry, + :with_in_memory_storage, + :with_shape_status, + :with_lsn_tracker, + :with_persistent_kv, + :with_status_monitor, + :with_dynamic_consumer_supervisor, + :with_noop_publication_manager, + :with_shape_cleaner + ] + + setup ctx do + start_link_supervised!({ + ShapeLogCollector.Supervisor, + stack_id: ctx.stack_id, persistent_kv: ctx.persistent_kv, inspector: @base_inspector + }) + + ShapeLogCollector.mark_as_ready(ctx.stack_id) + [shape_position: @shape_position] + end + + @tag process_spawn_opts: %{consumer: [fullsweep_after: 4, priority: :high]} + test "spawn_opts are correctly passed to consumer process", ctx do + support_test_storage_wrap(ctx, @shape_handle1, @shape1) + + {:ok, consumer} = + start_supervised( + {Consumer, + %{ + shape_handle: @shape_handle1, + stack_id: ctx.stack_id + }}, + id: {Consumer, @shape_handle1} + ) + + Consumer.initialize_shape(consumer, @shape1, %{action: :create}) + assert_receive {Support.TestStorage, :init_writer!, @shape_handle1, @shape1} + :started = Consumer.await_snapshot_start(ctx.stack_id, @shape_handle1) + + info = Process.info(consumer) + + assert info[:priority] == :high + assert info[:garbage_collection][:fullsweep_after] == 4 + end + end + + defp support_test_storage_wrap(ctx, shape_handle, shape) do + %{snapshot_xmin: xmin} = shape_status(shape_handle, ctx) + shapes = %{shape_handle => shape} + + storage = + Support.TestStorage.wrap(ctx.storage, %{ + shape_handle => [ + {:mark_snapshot_as_started, []}, + {:set_pg_snapshot, [%{xmin: xmin, xmax: xmin + 1, xip_list: [xmin]}]} + ] + }) + + Electric.StackConfig.put(ctx.stack_id, Electric.ShapeCache.Storage, storage) + Electric.StackConfig.put(ctx.stack_id, :inspector, @base_inspector) + + patch_shape_status(fetch_shape_by_handle: fn _, sh -> Map.fetch(shapes, sh) end) + + Support.TestUtils.activate_mocks_for_descendant_procs(Consumer) + Support.TestUtils.activate_mocks_for_descendant_procs(Electric.ShapeCache.ShapeCleaner) + :ok + end + + describe "over_heap_threshold?/2" do + test "false when threshold is nil" do + refute Electric.Shapes.Consumer.over_heap_threshold?(1_000_000, nil) + end + + test "false when heap (words) is below threshold (bytes)" do + refute Electric.Shapes.Consumer.over_heap_threshold?(10, 1024) + end + + test "true when heap (words) exceeds threshold (bytes)" do + assert Electric.Shapes.Consumer.over_heap_threshold?(1000, 1) + end + + test "exactly-equal is not over threshold" do + wordsize = :erlang.system_info(:wordsize) + refute Electric.Shapes.Consumer.over_heap_threshold?(1, wordsize) + end + end + + describe "should_force_gc?/5" do + # All tests pass explicit now_ms / last_gc_at / min_interval_ms so they are + # fully deterministic and do not depend on wall-clock time. + + test "false when threshold is nil (adaptive GC disabled)" do + refute Electric.Shapes.Consumer.should_force_gc?(1_000_000, nil, nil, 5_000, 1_000) + end + + test "true when heap over threshold and consumer has never forced a GC (last_gc_at nil)" do + # 1 000 words * 8 bytes/word = 8 000 bytes > threshold of 1 byte + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, nil, 5_000, 1_000) + end + + test "false when heap over threshold but interval has not elapsed" do + # last_gc_at=4_500, now=5_000 → delta=500 < min_interval=1_000 → no GC + refute Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 4_500, 5_000, 1_000) + end + + test "true when heap over threshold and interval has elapsed" do + # last_gc_at=3_000, now=5_000 → delta=2_000 >= min_interval=1_000 → GC + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 3_000, 5_000, 1_000) + end + + test "true at exactly the min interval boundary" do + # last_gc_at=4_000, now=5_000 → delta=1_000 == min_interval=1_000 → GC + assert Electric.Shapes.Consumer.should_force_gc?(1_000, 1, 4_000, 5_000, 1_000) + end + + test "false when heap is under threshold regardless of timing" do + # heap_words=1 * wordsize (8) = 8 bytes; threshold=1_000 bytes → under + refute Electric.Shapes.Consumer.should_force_gc?(1, 1_000, nil, 5_000, 1_000) + end + + test "false when heap is under threshold even if interval would have elapsed" do + refute Electric.Shapes.Consumer.should_force_gc?(1, 1_000, 0, 5_000, 1_000) + end + end + + describe "adaptive GC after fragment processing" do + @describetag :tmp_dir + + setup do + %{inspector: @base_inspector, pool: nil} + end + + setup [ + :with_registry, + :with_pure_file_storage, + :with_shape_status, + :with_lsn_tracker, + :with_log_chunking, + :with_persistent_kv, + :with_async_deleter, + :with_shape_cleaner, + :with_shape_log_collector, + :with_noop_publication_manager, + :with_status_monitor + ] + + setup(ctx) do + delay_snapshot_creation? = Map.get(ctx, :delay_snapshot_creation?) + test_pid = self() + + patch_snapshotter(fn parent, shape_handle, _shape, %{snapshot_fun: snapshot_fun} -> + if delay_snapshot_creation? do + receive do + {^test_pid, :resume} -> :ok + end + end + + pg_snapshot = {10, 11, [10]} + GenServer.cast(parent, {:pg_snapshot_known, shape_handle, pg_snapshot}) + GenServer.cast(parent, {:snapshot_started, shape_handle}) + snapshot_fun.([]) + end) + + Electric.StackConfig.put(ctx.stack_id, :shape_hibernate_after, 10_000) + :ok + end + + setup ctx do + %{consumer_supervisor: consumer_supervisor, shape_cache: shape_cache} = + Support.ComponentSetup.with_shape_cache(ctx) + + %{ + consumer_supervisor: consumer_supervisor, + shape_cache: shape_cache + } + end + + test "GC runs when heap exceeds tiny threshold", ctx do + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) + + # Inflate the consumer's heap by sending a large binary payload + large_binary = :binary.copy(<<0>>, 200_000) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Wait for the consumer to process the fragment + assert_receive {^ref, :new_changes, _}, @receive_timeout + + {:total_heap_size, heap_after} = :erlang.process_info(consumer_pid, :total_heap_size) + + # threshold=1 forces a full GC after this fragment. Because the ~200 KB payload is + # transient garbage (not live state), the post-GC heap must be far below the payload + # size. Without GC the heap grows ~185x (observed), so this assertion fails loudly. + payload_words = div(200_000, :erlang.system_info(:wordsize)) + + assert heap_after < payload_words, + "heap_after (#{heap_after} words) should be far below payload (#{payload_words} words) after GC" + end + + test "GC does not run when threshold is very large", ctx do + # 1 GB threshold — the consumer heap will never reach this, so GC must NOT fire. + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1_000_000_000) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) + + # Inflate the consumer's heap by sending a large binary payload + large_binary = :binary.copy(<<0>>, 200_000) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Wait for the consumer to process the fragment + assert_receive {^ref, :new_changes, _}, @receive_timeout + + {:total_heap_size, heap_after} = :erlang.process_info(consumer_pid, :total_heap_size) + + # GC was NOT triggered (threshold too high), so the heap still reflects + # the retained payload — it must be >= payload_words. + payload_words = div(200_000, :erlang.system_info(:wordsize)) + + assert heap_after >= payload_words, + "heap_after (#{heap_after} words) should be >= payload (#{payload_words} words) when GC is skipped" + end + + test "no GC by default (threshold=nil)", ctx do + # Ensure no threshold is set (default behaviour) + assert nil == Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + + xid = 11 + lsn = Lsn.from_integer(10) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1"}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + # Should process without error even when no GC threshold is configured + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + assert_receive {^ref, :new_changes, _}, @receive_timeout + end + + @tag delay_snapshot_creation?: true + test "GC runs during buffered-fragment drain when heap exceeds threshold", ctx do + # threshold=1 forces GC after every fragment processed during the buffer drain. + # The consumer starts with buffering?=true; fragments sent before pg_snapshot_known + # land in the buffer. When we unblock the snapshotter it fires pg_snapshot_known + # which triggers :consume_buffer → process_buffered_txn_fragments (our new GC call). + Electric.StackConfig.put(ctx.stack_id, :consumer_gc_heap_threshold, 1) + + {shape_handle, _} = ShapeCache.get_or_create_shape_handle(@shape1, ctx.stack_id) + + # The snapshotter is now running but blocked on `receive {^test_pid, :resume}`. + assert_receive {:snapshot, ^shape_handle, snapshotter_pid} + + consumer_pid = Consumer.whereis(ctx.stack_id, shape_handle) + + # Trace GC events on the consumer to count full GCs fired during the drain. + :erlang.trace(consumer_pid, true, [:garbage_collection]) + + # Send a large-payload fragment while buffering?=true — it goes into the buffer. + large_binary = :binary.copy(<<0>>, 200_000) + xid = 11 + lsn = Lsn.from_integer(10) + + txn = + complete_txn_fragment(xid, lsn, [ + %Changes.NewRecord{ + relation: {"public", "test_table"}, + record: %{"id" => "1", "value" => large_binary}, + log_offset: LogOffset.new(lsn, 0) + } + ]) + + assert :ok = ShapeLogCollector.handle_event(txn, ctx.stack_id) + + # Unblock the snapshotter: fires pg_snapshot_known → :consume_buffer → + # process_buffered_txn_fragments where our new maybe_garbage_collect() call fires. + send(snapshotter_pid, {self(), :resume}) + + ref = Shapes.Consumer.register_for_changes(ctx.stack_id, shape_handle) + assert_receive {^ref, :new_changes, _}, @receive_timeout + + :erlang.trace(consumer_pid, false, [:garbage_collection]) + + # Count how many full GC (garbage_collect) trace messages arrived. + # :garbage_collection traces emit {:trace, pid, :gc_major_start, info} / + # :gc_major_end pairs (one per full :erlang.garbage_collect() call). + gc_events = + Stream.repeatedly(fn -> + receive do + {:trace, ^consumer_pid, :gc_major_start, _} -> :gc + {:trace, ^consumer_pid, :gc_minor_start, _} -> :minor + {:trace, ^consumer_pid, :gc_major_end, _} -> :skip + {:trace, ^consumer_pid, :gc_minor_end, _} -> :skip + after + 0 -> :done + end + end) + |> Stream.take_while(&(&1 != :done)) + |> Enum.count(&(&1 == :gc)) + + assert gc_events >= 1, + "expected at least one full GC during buffered-fragment drain, got #{gc_events}" + end + end + + describe "set_gc_heap_threshold helpers" do + # with_stack_id_from_test (line 87) already starts ProcessRegistry + StackConfig + # for ctx.stack_id — no heavier setup is needed for these pure-config tests. + + test "set_gc_heap_threshold/2 writes the value into StackConfig", ctx do + assert :ok = Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, 2_000_000) + + assert 2_000_000 == + Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + + test "set_gc_heap_threshold/2 accepts nil to disable", ctx do + Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, 123) + assert :ok = Electric.Shapes.Consumer.set_gc_heap_threshold(ctx.stack_id, nil) + assert nil == Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + + test "set_gc_heap_threshold_all_stacks/1 sets the live stack", ctx do + assert {:ok, n} = Electric.Shapes.Consumer.set_gc_heap_threshold_all_stacks(3_000_000) + assert n >= 1 + + assert 3_000_000 == + Electric.StackConfig.lookup(ctx.stack_id, :consumer_gc_heap_threshold, nil) + end + end + defp get_log_items_from_storage(offset, shape_storage) do Storage.get_log_stream(offset, shape_storage) |> Enum.map(&Jason.decode!/1) end