diff --git a/CHANGELOG.md b/CHANGELOG.md index ca168d1e..b528bed4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Support for Distributed Tracing ([957](https://github.com/getsentry/sentry-elixir/pull/957)) - Support for LiveView spans captured under single trace root ([#977](https://github.com/getsentry/sentry-elixir/pull/977)) - Handle HTTP 413 responses for oversized envelopes ([#982](https://github.com/getsentry/sentry-elixir/pull/982)) +- Introduced `TelemetryProcessor` according to [the spec](https://develop.sentry.dev/sdk/telemetry/telemetry-processor/) ([#987](https://github.com/getsentry/sentry-elixir/pull/987)) ### Bug Fixes diff --git a/lib/sentry.ex b/lib/sentry.ex index 2e2d0730..a4202000 100644 --- a/lib/sentry.ex +++ b/lib/sentry.ex @@ -40,7 +40,7 @@ defmodule Sentry do > was the `:included_environments` option (a list of environments to report events for). > This was used together with the `:environment_name` option to determine whether to > send events. `:included_environments` is deprecated in v10.0.0 in favor of setting - > or not setting `:dsn`. It will be removed in v11.0.0. + > or not setting `:dsn`. It will be removed in v12.0.0. You can even rely on more specific logic to determine the environment name. It's not uncommon for most applications to have a "staging" environment. In order @@ -183,7 +183,17 @@ defmodule Sentry do > with `:source_code_exclude_patterns`. """ - alias Sentry.{CheckIn, Client, ClientError, ClientReport, Config, Event, LoggerUtils, Options} + alias Sentry.{ + CheckIn, + Client, + ClientError, + ClientReport, + Config, + Event, + LoggerUtils, + Options, + TelemetryProcessor + } require Logger @@ -350,7 +360,7 @@ defmodule Sentry do """ @spec send_event(Event.t(), keyword()) :: send_result def send_event(event, options \\ []) do - # TODO: remove on v11.0.0, :included_environments was deprecated in 10.0.0. + # TODO: remove on v12.0.0, :included_environments was deprecated in 10.0.0. 
included_envs = Config.included_environments() cond do @@ -378,7 +388,7 @@ defmodule Sentry do end def send_transaction(transaction, options \\ []) do - # TODO: remove on v11.0.0, :included_environments was deprecated in 10.0.0. + # TODO: remove on v12.0.0, :included_environments was deprecated in 10.0.0. included_envs = Config.included_environments() cond do @@ -502,4 +512,42 @@ defmodule Sentry do nil -> nil end end + + @doc """ + Flushes all pending events to Sentry. + + This is a blocking call that drains all the buffers and waits for the scheduler + to process all pending items. Useful before application shutdown to ensure + all telemetry events are sent. + + ## Options + + * `:timeout` - Maximum time to wait for flush to complete (default: 5000ms) + + ## Examples + + # Flush with default timeout + Sentry.flush() + + # Flush with custom timeout + Sentry.flush(timeout: 10_000) + + """ + @doc since: "12.0.0" + @spec flush(keyword()) :: :ok + def flush(opts \\ []) do + timeout = Keyword.get(opts, :timeout, 5000) + + try do + TelemetryProcessor.flush(TelemetryProcessor.default_name(), timeout) + catch + :exit, {:noproc, _} -> + Logger.warning("Sentry.flush/1 failed: TelemetryProcessor not running") + :ok + + :exit, reason -> + Logger.warning("Sentry.flush/1 failed unexpectedly: #{inspect(reason)}") + :ok + end + end end diff --git a/lib/sentry/application.ex b/lib/sentry/application.ex index 2e0d122c..70c47418 100644 --- a/lib/sentry/application.ex +++ b/lib/sentry/application.ex @@ -34,11 +34,15 @@ defmodule Sentry.Application do [] end - maybe_log_event_buffer = + maybe_telemetry_processor = if Config.enable_logs?() do [ - {Task.Supervisor, name: Sentry.LogEventBuffer.TaskSupervisor}, - Sentry.LogEventBuffer + {Sentry.TelemetryProcessor, + [ + buffer_capacities: Config.telemetry_buffer_capacities(), + scheduler_weights: Config.telemetry_scheduler_weights(), + transport_capacity: Config.transport_capacity() + ]} ] else [] @@ -58,7 +62,7 @@ defmodule 
Sentry.Application do ] ++ maybe_http_client_spec ++ maybe_span_storage ++ - maybe_log_event_buffer ++ + maybe_telemetry_processor ++ maybe_rate_limiter() ++ [Sentry.Transport.SenderPool] diff --git a/lib/sentry/client.ex b/lib/sentry/client.ex index 80b5f8a7..98b06191 100644 --- a/lib/sentry/client.ex +++ b/lib/sentry/client.ex @@ -14,7 +14,6 @@ defmodule Sentry.Client do Envelope, Event, Interfaces, - LogEvent, LoggerUtils, Options, Transaction, @@ -135,69 +134,6 @@ defmodule Sentry.Client do end end - @doc """ - Sends a batch of log events to Sentry. - - Log events are sent asynchronously and do not support sampling. - They are buffered and sent in batches according to the Sentry Logs Protocol. - - If a `:before_send_log` callback is configured, it will be called for each log event. - If the callback returns `nil` or `false`, the log event is not sent. If it returns an - updated `Sentry.LogEvent`, that will be used instead. - - Returns `{:ok, envelope_id}` on success or `{:error, reason}` on failure. 
- """ - @doc since: "12.0.0" - @spec send_log_batch([LogEvent.t()]) :: - {:ok, envelope_id :: String.t()} | {:error, ClientError.t()} - def send_log_batch([]), do: {:ok, ""} - - def send_log_batch(log_events) when is_list(log_events) do - case Sentry.Test.maybe_collect_logs(log_events) do - :collected -> - {:ok, ""} - - :not_collecting -> - log_events - |> run_before_send_log_callbacks() - |> send_log_events() - end - end - - defp run_before_send_log_callbacks(log_events) do - callback = Config.before_send_log() - - if callback do - for log_event <- log_events, - %LogEvent{} = modified_event <- [call_before_send_log(log_event, callback)] do - modified_event - end - else - log_events - end - end - - defp call_before_send_log(log_event, function) when is_function(function, 1) do - function.(log_event) - end - - defp call_before_send_log(log_event, {mod, fun}) do - apply(mod, fun, [log_event]) - end - - defp send_log_events([]), do: {:ok, ""} - - defp send_log_events(log_events) do - client = Config.client() - - request_retries = - Application.get_env(:sentry, :request_retries, Transport.default_retries()) - - log_events - |> Envelope.from_log_events() - |> Transport.encode_and_post_envelope(client, request_retries) - end - defp sample_event(sample_rate) do cond do sample_rate == 1 -> :ok diff --git a/lib/sentry/client_report/sender.ex b/lib/sentry/client_report/sender.ex index 6bbc1d28..d49c6f2e 100644 --- a/lib/sentry/client_report/sender.ex +++ b/lib/sentry/client_report/sender.ex @@ -31,6 +31,7 @@ defmodule Sentry.ClientReport.Sender do | Sentry.CheckIn.t() | ClientReport.t() | Sentry.Event.t() + | Sentry.Transaction.t() def record_discarded_events(reason, event_items, genserver) when is_list(event_items) do # We silently ignore events whose reasons aren't valid because we have to add it to the allowlist in Snuba diff --git a/lib/sentry/config.ex b/lib/sentry/config.ex index 1357ba33..c1a1e779 100644 --- a/lib/sentry/config.ex +++ b/lib/sentry/config.ex @@ -394,17 
+394,6 @@ defmodule Sentry.Config do Use `Sentry.LogsHandler` to capture log events from Erlang's `:logger`. *Available since 12.0.0*. """ - ], - max_log_events: [ - type: :non_neg_integer, - default: 100, - doc: """ - The maximum number of log events to buffer before flushing to Sentry. - Log events are buffered and sent in batches to reduce network overhead. - When the buffer reaches this size, it will be flushed immediately. - Otherwise, logs are flushed every 5 seconds. Only applies when `:enable_logs` - is `true`. *Available since 12.0.0*. - """ ] ] @@ -486,6 +475,41 @@ defmodule Sentry.Config do connections to keep in the pool. Only applied if `:client` is set to `Sentry.HackneyClient`. """ + ], + telemetry_buffer_capacities: [ + type: {:map, {:in, [:log]}, :pos_integer}, + default: %{}, + type_doc: "`%{category => pos_integer()}`", + doc: """ + Overrides for the maximum number of items each telemetry buffer can hold. + When a buffer reaches capacity, oldest items are dropped to make room. + Currently only the `:log` category is managed by the TelemetryProcessor. + Default: log=1000. + *Available since v12.0.0*. + """ + ], + telemetry_scheduler_weights: [ + type: {:map, {:in, [:low]}, :pos_integer}, + default: %{}, + type_doc: "`%{priority => pos_integer()}`", + doc: """ + Overrides for the weighted round-robin scheduler priority weights. + Higher weights mean more sending slots for that priority level. + Currently only the `:low` priority (logs) is managed by the TelemetryProcessor. + Default: low=2. + *Available since v12.0.0*. + """ + ], + transport_capacity: [ + type: :pos_integer, + default: 1000, + doc: """ + Maximum number of items the transport queue can hold. For log envelopes, + each log event counts as one item toward capacity. When the queue is full, + the scheduler stops dequeuing from buffers until space becomes available. + The transport queue processes one envelope at a time. + *Available since v12.0.0*. 
+ """ ] ] @@ -827,8 +851,14 @@ defmodule Sentry.Config do @spec enable_logs?() :: boolean() def enable_logs?, do: fetch!(:enable_logs) - @spec max_log_events() :: non_neg_integer() - def max_log_events, do: fetch!(:max_log_events) + @spec telemetry_buffer_capacities() :: %{Sentry.Telemetry.Category.t() => pos_integer()} + def telemetry_buffer_capacities, do: fetch!(:telemetry_buffer_capacities) + + @spec telemetry_scheduler_weights() :: %{Sentry.Telemetry.Category.priority() => pos_integer()} + def telemetry_scheduler_weights, do: fetch!(:telemetry_scheduler_weights) + + @spec transport_capacity() :: pos_integer() + def transport_capacity, do: fetch!(:transport_capacity) @spec before_send_log() :: (Sentry.LogEvent.t() -> Sentry.LogEvent.t() | nil | false) | {module(), atom()} | nil diff --git a/lib/sentry/envelope.ex b/lib/sentry/envelope.ex index 6da3cf7c..7a8519e7 100644 --- a/lib/sentry/envelope.ex +++ b/lib/sentry/envelope.ex @@ -114,6 +114,20 @@ defmodule Sentry.Envelope do def get_data_category(%Event{}), do: "error" def get_data_category(%LogBatch{}), do: "log_item" + @doc """ + Returns the total number of payload items in the envelope. + + For log envelopes, this counts individual log events within the LogBatch. + For other envelope types, each item counts as 1. + """ + @spec item_count(t()) :: non_neg_integer() + def item_count(%__MODULE__{items: items}) do + Enum.reduce(items, 0, fn + %LogBatch{log_events: log_events}, acc -> acc + length(log_events) + _other, acc -> acc + 1 + end) + end + @doc """ Encodes the envelope into its binary representation. diff --git a/lib/sentry/log_event_buffer.ex b/lib/sentry/log_event_buffer.ex deleted file mode 100644 index 4599de92..00000000 --- a/lib/sentry/log_event_buffer.ex +++ /dev/null @@ -1,249 +0,0 @@ -defmodule Sentry.LogEventBuffer do - @moduledoc false - # Internal module for buffering log events before sending them to Sentry. 
- # - # This module is responsible for: - # - Buffering log events in memory - # - Flushing events when the buffer is full or after a timeout - # - Managing the lifecycle of the buffer process - # - # Per the Sentry Logs Protocol spec: - # - Events are flushed when buffer reaches max_events (default 100) OR every 5 seconds - # - Maximum of 1000 events can be queued to prevent memory issues - - use GenServer - require Logger - - alias Sentry.{Config, LogEvent} - - @flush_interval_ms 5_000 - @max_queue_size 1_000 - - @typedoc false - @type state :: %{ - events: [LogEvent.t()], - count: non_neg_integer(), - max_events: non_neg_integer(), - timer_ref: reference() | nil - } - - ## Public API - - @doc """ - Starts the log event buffer process. - - ## Options - - * `:name` - The name to register the process under. Defaults to `#{inspect(__MODULE__)}`. - * `:max_events` - Maximum events before flushing. Defaults to `Sentry.Config.max_log_events/0`. - - """ - @spec start_link(keyword()) :: GenServer.on_start() - def start_link(opts \\ []) do - {name, opts} = Keyword.pop(opts, :name, __MODULE__) - GenServer.start_link(__MODULE__, opts, name: name) - end - - @doc """ - Adds a log event to the buffer. - - If the buffer is full (max_queue_size reached), the event is dropped. - If the buffer reaches max_events, it is flushed immediately. - - In test mode with collection enabled, logs are collected immediately - and not added to the buffer. - - ## Options - - * `:server` - The server to add the event to. Defaults to `#{inspect(__MODULE__)}`. 
- - """ - @spec add_event(LogEvent.t(), keyword()) :: :ok - def add_event(%LogEvent{} = event, opts \\ []) do - server = Keyword.get(opts, :server, __MODULE__) - - # In test mode, try to collect immediately before buffering - # This ensures the caller chain is preserved for Sentry.Test collection - case Sentry.Test.maybe_collect_logs([event]) do - :collected -> - :ok - - :not_collecting -> - GenServer.cast(server, {:add_event, event}) - end - end - - @doc """ - Flushes all buffered events immediately. - - ## Options - - * `:server` - The server to flush. Defaults to `#{inspect(__MODULE__)}`. - - """ - @spec flush(keyword()) :: :ok - def flush(opts \\ []) do - server = Keyword.get(opts, :server, __MODULE__) - GenServer.call(server, :flush) - end - - @doc """ - Returns the current number of buffered events. - - ## Options - - * `:server` - The server to query. Defaults to `#{inspect(__MODULE__)}`. - - """ - @spec size(keyword()) :: non_neg_integer() - def size(opts \\ []) do - server = Keyword.get(opts, :server, __MODULE__) - GenServer.call(server, :size) - end - - ## GenServer callbacks - - @impl GenServer - def init(opts) do - max_events = Keyword.get(opts, :max_events, Config.max_log_events()) - - state = %{ - events: [], - count: 0, - max_events: max_events, - timer_ref: schedule_flush() - } - - {:ok, state} - end - - @impl GenServer - def handle_cast({:add_event, event}, state) do - # Check if queue is at max capacity - if state.count >= @max_queue_size do - # Drop the event to prevent memory issues - log_warning("Log event buffer is full (#{@max_queue_size} events), dropping event") - {:noreply, state} - else - events = [event | state.events] - new_count = state.count + 1 - - if new_count >= state.max_events do - # Flush immediately if we've reached max_events - send_events(events) - cancel_timer(state.timer_ref) - flush_stale_timeout_message() - {:noreply, %{state | events: [], count: 0, timer_ref: schedule_flush()}} - else - {:noreply, %{state | events: 
events, count: new_count}} - end - end - end - - @impl GenServer - def handle_call(:flush, _from, state) do - send_events(state.events) - cancel_timer(state.timer_ref) - flush_stale_timeout_message() - {:reply, :ok, %{state | events: [], count: 0, timer_ref: schedule_flush()}} - end - - @impl GenServer - def handle_call(:size, _from, state) do - {:reply, state.count, state} - end - - @impl GenServer - def handle_info(:flush_timeout, state) do - if state.count > 0 do - send_events(state.events) - end - - {:noreply, %{state | events: [], count: 0, timer_ref: schedule_flush()}} - end - - @impl GenServer - def terminate(_reason, state) do - # Flush any remaining events on shutdown - if state.count > 0 do - send_events(state.events) - end - - :ok - end - - ## Private helpers - - defp schedule_flush do - Process.send_after(self(), :flush_timeout, @flush_interval_ms) - end - - defp cancel_timer(nil), do: :ok - - defp cancel_timer(timer_ref) do - _ = Process.cancel_timer(timer_ref) - :ok - end - - # Flush any stale :flush_timeout message that may be in the queue. - # This can happen if the timer fires while we're in the middle of a flush. 
- defp flush_stale_timeout_message do - receive do - :flush_timeout -> - log_debug("Flushed stale timeout message") - :ok - after - 0 -> :ok - end - end - - defp log_warning(message) do - level = Config.log_level() - - if Logger.compare_levels(level, :warning) != :lt do - Logger.warning(message, domain: [:sentry]) - end - end - - defp log_debug(message) do - level = Config.log_level() - - if Logger.compare_levels(level, :debug) != :lt do - Logger.debug(message, domain: [:sentry]) - end - end - - defp send_events([]), do: :ok - - defp send_events(events) do - events = Enum.reverse(events) - - log_debug("[LogEventBuffer] Sending #{length(events)} log events to Sentry") - - # In test mode, send synchronously so tests can collect logs immediately - _ = - if Config.test_mode?() do - do_send_events(events) - else - # Send asynchronously via Task.Supervisor to avoid blocking and prevent unbounded task spawning - Task.Supervisor.start_child(__MODULE__.TaskSupervisor, fn -> do_send_events(events) end) - end - - :ok - end - - defp do_send_events(events) do - case Sentry.Client.send_log_batch(events) do - {:ok, envelope_id} -> - log_debug( - "[LogEventBuffer] Successfully sent #{length(events)} log events (envelope_id: #{envelope_id})" - ) - - :ok - - {:error, reason} -> - log_warning("[LogEventBuffer] Failed to send log events: #{inspect(reason)}") - :ok - end - end -end diff --git a/lib/sentry/logger_handler.ex b/lib/sentry/logger_handler.ex index d251473b..35b23c63 100644 --- a/lib/sentry/logger_handler.ex +++ b/lib/sentry/logger_handler.ex @@ -134,9 +134,9 @@ defmodule Sentry.LoggerHandler do If set to `:all`, all metadata will be included. This is independent from `:metadata`. 
""" ], - logs_buffer: [ + telemetry_processor: [ type: {:or, [:atom, :pid, {:tuple, [:atom, :atom]}]}, - default: Sentry.LogEventBuffer, + default: Sentry.TelemetryProcessor, type_doc: "`t:GenServer.server/0`", doc: false ] @@ -287,7 +287,7 @@ defmodule Sentry.LoggerHandler do :logs_level, :logs_excluded_domains, :logs_metadata, - :logs_buffer, + :telemetry_processor, backends: [] ] diff --git a/lib/sentry/logger_handler/logs_backend.ex b/lib/sentry/logger_handler/logs_backend.ex index d2508791..00ff9f38 100644 --- a/lib/sentry/logger_handler/logs_backend.ex +++ b/lib/sentry/logger_handler/logs_backend.ex @@ -11,13 +11,12 @@ defmodule Sentry.LoggerHandler.LogsBackend do # - `:logs_level` - Minimum log level to send # - `:logs_excluded_domains` - Domains to exclude # - `:logs_metadata` - Metadata keys to include as attributes - # - `:logs_buffer` - Buffer process for batching (defaults to LogEventBuffer) @behaviour Sentry.LoggerHandler.Backend require Logger - alias Sentry.{LogEvent, LogEventBuffer, LoggerUtils} + alias Sentry.{LogEvent, LoggerUtils, TelemetryProcessor} @impl true def handle_event(%{level: log_level, meta: log_meta} = log_event, config, _handler_id) do @@ -49,8 +48,8 @@ defmodule Sentry.LoggerHandler.LogsBackend do # Create log event log_event_struct = LogEvent.from_logger_event(log_event, attributes, parameters) - # Add to buffer - LogEventBuffer.add_event(log_event_struct, server: config.logs_buffer) + # Add to TelemetryProcessor buffer (use configured processor for test isolation) + TelemetryProcessor.add(config.telemetry_processor, log_event_struct) :ok end diff --git a/lib/sentry/telemetry/buffer.ex b/lib/sentry/telemetry/buffer.ex new file mode 100644 index 00000000..d64e2bf6 --- /dev/null +++ b/lib/sentry/telemetry/buffer.ex @@ -0,0 +1,208 @@ +defmodule Sentry.Telemetry.Buffer do + @moduledoc """ + A fixed-capacity FIFO buffer for telemetry items with batch-aware flushing. + + This module is both a GenServer and a struct. 
The struct holds the buffer state + (a bounded queue with overflow-drops-oldest semantics), while the GenServer + provides concurrent access for producers and consumers. + + ## Options + + * `:category` - The telemetry category (required), currently only `:log` + * `:name` - The name to register the GenServer under (optional) + * `:capacity` - Buffer capacity (defaults to category default) + * `:batch_size` - Items per batch (defaults to category default) + * `:timeout` - Flush timeout in ms (defaults to category default) + * `:on_item` - Optional callback function invoked when an item is added + + """ + @moduledoc since: "12.0.0" + + use GenServer + + alias __MODULE__ + alias Sentry.Telemetry.Category + + @enforce_keys [:category, :capacity, :batch_size] + defstruct [ + :category, + :capacity, + :batch_size, + :timeout, + :on_item, + :last_flush_time, + items: :queue.new(), + size: 0 + ] + + @type t :: %Buffer{ + category: Category.t(), + capacity: pos_integer(), + batch_size: pos_integer(), + timeout: pos_integer() | nil, + on_item: (-> any()) | nil, + last_flush_time: integer(), + items: :queue.queue(), + size: non_neg_integer() + } + + ## Public API + + @doc """ + Starts the buffer process. + """ + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts) when is_list(opts) do + {name, opts} = Keyword.pop(opts, :name) + + if name do + GenServer.start_link(__MODULE__, opts, name: name) + else + GenServer.start_link(__MODULE__, opts) + end + end + + @doc """ + Adds an item to the buffer. + + Returns `:ok`. If the buffer is full, the oldest item is dropped. + If an `on_item` callback was provided, it will be invoked. + """ + @spec add(GenServer.server(), term()) :: :ok + def add(server, item) do + GenServer.cast(server, {:add, item}) + end + + @doc """ + Polls a batch of items if the buffer is ready to flush. + + Returns `{:ok, items}` if ready, or `:not_ready` if not. 
+ """ + @spec poll_if_ready(GenServer.server()) :: {:ok, [term()]} | :not_ready + def poll_if_ready(server) do + GenServer.call(server, :poll_if_ready) + end + + @doc """ + Drains all items from the buffer. + + Returns a list of all items in FIFO order. + """ + @spec drain(GenServer.server()) :: [term()] + def drain(server) do + GenServer.call(server, :drain) + end + + @doc """ + Returns the current number of items in the buffer. + """ + @spec size(GenServer.server()) :: non_neg_integer() + def size(server) do + GenServer.call(server, :size) + end + + @doc """ + Returns whether the buffer is ready to flush. + """ + @spec is_ready?(GenServer.server()) :: boolean() + def is_ready?(server) do + GenServer.call(server, :is_ready?) + end + + @doc """ + Returns the buffer's category. + """ + @spec category(GenServer.server()) :: Category.t() + def category(server) do + GenServer.call(server, :category) + end + + ## GenServer Callbacks + + @impl true + def init(opts) do + category = Keyword.fetch!(opts, :category) + defaults = Category.default_config(category) + + state = %Buffer{ + category: category, + capacity: Keyword.get(opts, :capacity, defaults.capacity), + batch_size: Keyword.get(opts, :batch_size, defaults.batch_size), + timeout: Keyword.get(opts, :timeout, defaults.timeout), + on_item: Keyword.get(opts, :on_item), + last_flush_time: System.monotonic_time(:millisecond) + } + + {:ok, state} + end + + @impl true + def handle_cast({:add, item}, %Buffer{} = state) do + state = offer(state, item) + + if state.on_item, do: state.on_item.() + + {:noreply, state} + end + + @impl true + def handle_call(:poll_if_ready, _from, %Buffer{} = state) do + if ready_to_flush?(state) do + batch_count = min(state.batch_size, state.size) + {items, state} = poll_batch(state, batch_count) + state = %{state | last_flush_time: System.monotonic_time(:millisecond)} + {:reply, {:ok, items}, state} + else + {:reply, :not_ready, state} + end + end + + def handle_call(:drain, _from, %Buffer{} = 
state) do + items = :queue.to_list(state.items) + {:reply, items, %{state | items: :queue.new(), size: 0}} + end + + def handle_call(:size, _from, %Buffer{} = state) do + {:reply, state.size, state} + end + + def handle_call(:is_ready?, _from, %Buffer{} = state) do + {:reply, ready_to_flush?(state), state} + end + + def handle_call(:category, _from, %Buffer{} = state) do + {:reply, state.category, state} + end + + defp offer(%Buffer{size: size, capacity: capacity} = state, item) + when size >= capacity do + {{:value, _dropped}, items} = :queue.out(state.items) + %{state | items: :queue.in(item, items)} + end + + defp offer(%Buffer{} = state, item) do + %{state | items: :queue.in(item, state.items), size: state.size + 1} + end + + defp poll_batch(state, count), do: poll_batch(state, count, []) + defp poll_batch(state, 0, acc), do: {Enum.reverse(acc), state} + defp poll_batch(%{size: 0} = state, _count, acc), do: {Enum.reverse(acc), state} + + defp poll_batch(state, count, acc) do + {{:value, item}, items} = :queue.out(state.items) + state = %{state | items: items, size: state.size - 1} + poll_batch(state, count - 1, [item | acc]) + end + + defp ready_to_flush?(%{size: 0}), do: false + + defp ready_to_flush?(%{size: size, batch_size: batch_size} = state) do + size >= batch_size or timeout_elapsed?(state) + end + + defp timeout_elapsed?(%{timeout: nil}), do: false + + defp timeout_elapsed?(%{timeout: timeout, last_flush_time: last_flush_time}) do + System.monotonic_time(:millisecond) - last_flush_time >= timeout + end +end diff --git a/lib/sentry/telemetry/category.ex b/lib/sentry/telemetry/category.ex new file mode 100644 index 00000000..572cadf1 --- /dev/null +++ b/lib/sentry/telemetry/category.ex @@ -0,0 +1,117 @@ +defmodule Sentry.Telemetry.Category do + @moduledoc """ + Defines telemetry categories for the Sentry SDK with their priorities and default configurations. 
+ + The TelemetryProcessor uses categories to classify different types of telemetry data + and prioritize their sending based on a weighted round-robin scheduler. + + Currently, only the `:log` category is managed by the TelemetryProcessor. + Other categories (errors, transactions, check-ins) will be added in future versions. + + ## Categories + + * `:log` - Log entries (low priority) + + ## Priorities and Weights + + * `:low` - weight 2 (logs) + + """ + @moduledoc since: "12.0.0" + + @typedoc "Telemetry category types." + @type t :: :log + + @typedoc "Priority levels for categories." + @type priority :: :low + + @typedoc "Buffer configuration for a category." + @type config :: %{ + capacity: pos_integer(), + batch_size: pos_integer(), + timeout: pos_integer() | nil + } + + @priorities [:low] + @categories [:log] + + @weights %{ + low: 2 + } + + @default_configs %{ + log: %{capacity: 1000, batch_size: 100, timeout: 5000} + } + + @doc """ + Returns the priority level for a given category. + + ## Examples + + iex> Sentry.Telemetry.Category.priority(:log) + :low + + """ + @spec priority(t()) :: priority() + def priority(:log), do: :low + + @doc """ + Returns the weight for a given priority level. + + Weights determine how many slots each priority gets in the round-robin cycle. + + ## Examples + + iex> Sentry.Telemetry.Category.weight(:low) + 2 + + """ + @spec weight(priority()) :: pos_integer() + def weight(priority) when priority in @priorities do + Map.fetch!(@weights, priority) + end + + @doc """ + Returns the default buffer configuration for a given category. 
+ + ## Configuration keys + + * `:capacity` - Maximum items the buffer can hold + * `:batch_size` - Number of items to send per batch + * `:timeout` - Flush timeout in milliseconds (nil for immediate) + + ## Examples + + iex> Sentry.Telemetry.Category.default_config(:log) + %{capacity: 1000, batch_size: 100, timeout: 5000} + + """ + @spec default_config(t()) :: config() + def default_config(category) when category in @categories do + Map.fetch!(@default_configs, category) + end + + @doc """ + Returns all telemetry categories. + + ## Examples + + iex> Sentry.Telemetry.Category.all() + [:log] + + """ + @spec all() :: [t()] + def all, do: @categories + + @doc """ + Returns all priority levels in descending order (highest to lowest). + + ## Examples + + iex> Sentry.Telemetry.Category.priorities() + [:low] + + """ + @spec priorities() :: [priority()] + def priorities, do: @priorities +end diff --git a/lib/sentry/telemetry/scheduler.ex b/lib/sentry/telemetry/scheduler.ex new file mode 100644 index 00000000..0558ac44 --- /dev/null +++ b/lib/sentry/telemetry/scheduler.ex @@ -0,0 +1,434 @@ +defmodule Sentry.Telemetry.Scheduler do + @moduledoc """ + GenServer implementing a weighted round-robin scheduler for telemetry buffers. + + The scheduler cycles through category buffers based on priority weights, + ensuring critical telemetry gets priority over high-volume data under load. + + Currently, only the `:log` category is managed. The weighted round-robin + structure is in place for future expansion to additional categories. + + ## Weights + + * `:low` - weight 2 (logs) + + ## Signal-Based Wake + + The scheduler sleeps until signaled via `signal/1`. When signaled, it wakes + and attempts to process items from the current buffer in the cycle. If the + buffer is not ready or the transport queue is full, it advances to the next position. 
+ + ## Transport Queue + + The scheduler includes a bounded FIFO queue for transport concurrency control, + processing one envelope at a time (single-worker model). The queue is capped + at a configurable capacity (default 1000 items) to prevent unbounded memory growth. + For log envelopes, each log event counts as one item toward capacity. + + """ + @moduledoc since: "12.0.0" + + use GenServer + + require Logger + + alias __MODULE__ + + alias Sentry.Telemetry.{Buffer, Category} + alias Sentry.{ClientReport, Config, Envelope, LogEvent, Transport} + + @default_capacity 1000 + + @type buffers :: %{ + log: GenServer.server() + } + + defstruct [ + :buffers, + :priority_cycle, + :cycle_position, + :on_envelope, + :capacity, + :active_ref, + :active_item_count, + queue: :queue.new(), + size: 0 + ] + + @type t :: %Scheduler{ + buffers: buffers(), + priority_cycle: [Category.t()], + cycle_position: non_neg_integer(), + on_envelope: (Envelope.t() -> any()) | nil, + capacity: pos_integer(), + queue: :queue.queue(), + size: non_neg_integer(), + active_ref: reference() | nil, + active_item_count: non_neg_integer() + } + + ## Public API + + @doc """ + Builds a priority cycle based on category weights. + + Returns a list of categories where each category appears a number of times + equal to its priority weight. 
+ + ## Examples + + iex> Sentry.Telemetry.Scheduler.build_priority_cycle() + [:log, :log] + + """ + @spec build_priority_cycle(map() | nil) :: [Category.t()] + def build_priority_cycle(weights \\ nil) + def build_priority_cycle(nil), do: build_priority_cycle(default_weights()) + + def build_priority_cycle(weights) when weights == %{}, + do: build_priority_cycle(default_weights()) + + def build_priority_cycle(weights) when is_map(weights) do + merged_weights = Map.merge(default_weights(), weights) + + for {category, priority} <- category_priority_mapping(), + _i <- 1..Map.fetch!(merged_weights, priority) do + category + end + end + + @doc """ + Starts the scheduler process. + + ## Options + + * `:buffers` - Map of category to buffer pid (required) + * `:name` - The name to register the GenServer under (optional) + * `:weights` - Custom priority weights (optional) + * `:on_envelope` - Callback function invoked with built envelopes (optional) + * `:capacity` - Maximum items in the transport queue (default: #{@default_capacity}) + + """ + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts) when is_list(opts) do + {name, opts} = Keyword.pop(opts, :name) + + if name do + GenServer.start_link(__MODULE__, opts, name: name) + else + GenServer.start_link(__MODULE__, opts) + end + end + + @doc """ + Signals the scheduler to wake and process items. + + This is a non-blocking call that triggers the scheduler to check buffers + and send any ready items. + """ + @spec signal(GenServer.server()) :: :ok + def signal(server) do + GenServer.cast(server, :signal) + end + + @doc """ + Flushes all buffers by draining their contents and sending all items. + + This is a blocking call that returns when all items have been processed, + including any envelopes queued for transport. 
+ """ + @spec flush(GenServer.server(), timeout()) :: :ok + def flush(server, timeout \\ 5000) do + GenServer.call(server, :flush, timeout) + end + + ## GenServer Callbacks + + @impl true + def init(opts) do + buffers = Keyword.fetch!(opts, :buffers) + weights = Keyword.get(opts, :weights) + on_envelope = Keyword.get(opts, :on_envelope) + capacity = Keyword.get(opts, :capacity, @default_capacity) + + priority_cycle = build_priority_cycle(weights) + + state = %Scheduler{ + buffers: buffers, + priority_cycle: priority_cycle, + cycle_position: 0, + on_envelope: on_envelope, + capacity: capacity + } + + {:ok, state} + end + + @impl true + def handle_cast(:signal, %Scheduler{} = state) do + state = process_cycle(state) + {:noreply, state} + end + + @impl true + def handle_call(:flush, _from, %Scheduler{} = state) do + state = flush_all_buffers(state) + state = wait_for_active(state) + state = flush_queue(state) + {:reply, :ok, state} + end + + @impl true + def handle_info({:DOWN, ref, :process, _pid, reason}, %{active_ref: ref} = state) do + if reason != :normal do + Logger.warning("Sentry transport send process exited abnormally: #{inspect(reason)}") + end + + state = %{ + state + | active_ref: nil, + size: state.size - state.active_item_count, + active_item_count: 0 + } + + state = maybe_process_next(state) + + {:noreply, state} + end + + def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do + {:noreply, state} + end + + defp process_cycle(%Scheduler{} = state) do + cycle_length = length(state.priority_cycle) + max_attempts = cycle_length + + process_cycle(state, 0, max_attempts) + end + + defp process_cycle(state, attempts, max_attempts) when attempts >= max_attempts do + state + end + + defp process_cycle(%Scheduler{} = state, attempts, max_attempts) do + if not transport_has_space?(state) do + # Transport queue is full, stop processing. Items stay in buffers. 
+ state + else + category = Enum.at(state.priority_cycle, state.cycle_position) + buffer = Map.fetch!(state.buffers, category) + + case Buffer.poll_if_ready(buffer) do + {:ok, items} when items != [] -> + state = send_items(state, category, items) + state = advance_cycle(state) + process_cycle(state, attempts + 1, max_attempts) + + _ -> + state = advance_cycle(state) + process_cycle(state, attempts + 1, max_attempts) + end + end + end + + defp send_items(state, :log, log_events) do + process_and_send_logs(state, log_events, &send_envelope/2) + end + + defp flush_all_buffers(%Scheduler{} = state) do + for {category, buffer} <- state.buffers do + items = Buffer.drain(buffer) + + if items != [] do + case category do + :log -> process_and_send_logs(state, items, &send_envelope_direct/2) + end + end + end + + state + end + + defp process_and_send_logs(%{on_envelope: on_envelope} = state, log_events, send_fn) do + processed_logs = apply_before_send_log_callbacks(log_events) + + if processed_logs != [] do + # Skip test collection when on_envelope is set (used by unit tests) + if is_nil(on_envelope) do + case Sentry.Test.maybe_collect_logs(processed_logs) do + :collected -> + state + + :not_collecting -> + envelope = Envelope.from_log_events(processed_logs) + send_fn.(state, envelope) + end + else + envelope = Envelope.from_log_events(processed_logs) + send_fn.(state, envelope) + end + else + state + end + end + + defp apply_before_send_log_callbacks(log_events) do + callback = Config.before_send_log() + + if callback do + for log_event <- log_events, + %LogEvent{} = modified_event <- [call_before_send_log(log_event, callback)] do + modified_event + end + else + log_events + end + end + + defp call_before_send_log(log_event, function) when is_function(function, 1) do + function.(log_event) + rescue + error -> + Logger.warning("before_send_log callback failed: #{inspect(error)}") + + log_event + end + + defp call_before_send_log(log_event, {mod, fun}) do + apply(mod, fun, 
[log_event]) + rescue + error -> + Logger.warning("before_send_log callback failed: #{inspect(error)}") + + log_event + end + + defp advance_cycle(%Scheduler{} = state) do + cycle_length = length(state.priority_cycle) + new_position = rem(state.cycle_position + 1, cycle_length) + %{state | cycle_position: new_position} + end + + # Used during normal processing - enqueues to internal transport queue + defp send_envelope(%Scheduler{on_envelope: callback} = state, envelope) + when is_function(callback, 1) do + callback.(envelope) + state + end + + defp send_envelope(%Scheduler{on_envelope: nil} = state, envelope) do + item_count = Envelope.item_count(envelope) + + if state.size + item_count > state.capacity do + Logger.warning("Sentry: transport queue full, dropping #{item_count} log item(s)") + + ClientReport.Sender.record_discarded_events(:queue_overflow, envelope.items) + state + else + queue = :queue.in({envelope, item_count}, state.queue) + state = %{state | queue: queue, size: state.size + item_count} + maybe_process_next(state) + end + end + + # Used during flush - bypasses transport queue, sends directly or via callback + defp send_envelope_direct(%Scheduler{on_envelope: callback}, envelope) + when is_function(callback, 1) do + callback.(envelope) + end + + defp send_envelope_direct(%Scheduler{}, envelope) do + send_direct(envelope) + end + + defp send_direct(envelope) do + client = Config.client() + request_retries = Application.get_env(:sentry, :request_retries, Transport.default_retries()) + + case Transport.encode_and_post_envelope(envelope, client, request_retries) do + {:ok, _id} -> + :ok + + {:error, error} -> + Logger.warning("Sentry: failed to send log envelope: #{Exception.message(error)}") + + {:error, error} + end + end + + defp transport_has_space?(%{on_envelope: cb}) when is_function(cb, 1), do: true + defp transport_has_space?(%{size: size, capacity: capacity}), do: size < capacity + + # Transport queue management (merged from QueueWorker) + + 
defp maybe_process_next(%{active_ref: ref} = state) when not is_nil(ref), do: state + + defp maybe_process_next(%Scheduler{} = state) do + case :queue.out(state.queue) do + {{:value, {envelope, item_count}}, queue} -> + {_pid, ref} = spawn_monitor(fn -> send(envelope) end) + %{state | queue: queue, active_ref: ref, active_item_count: item_count} + + {:empty, _queue} -> + state + end + end + + defp send(envelope) do + client = Config.client() + request_retries = Application.get_env(:sentry, :request_retries, Transport.default_retries()) + Transport.encode_and_post_envelope(envelope, client, request_retries) + end + + defp wait_for_active(%{active_ref: nil} = state), do: state + + defp wait_for_active(%{active_ref: ref} = state) do + receive do + {:DOWN, ^ref, :process, _pid, _reason} -> + %{ + state + | active_ref: nil, + size: state.size - state.active_item_count, + active_item_count: 0 + } + after + 5000 -> + Process.demonitor(ref, [:flush]) + + %{ + state + | active_ref: nil, + size: state.size - state.active_item_count, + active_item_count: 0 + } + end + end + + defp flush_queue(%Scheduler{} = state) do + {entries, queue} = drain_queue(state.queue) + Enum.each(entries, fn {envelope, _item_count} -> send(envelope) end) + %{state | queue: queue, size: 0} + end + + defp drain_queue(queue), do: drain_queue(queue, []) + + defp drain_queue(queue, acc) do + case :queue.out(queue) do + {{:value, entry}, queue} -> drain_queue(queue, [entry | acc]) + {:empty, queue} -> {Enum.reverse(acc), queue} + end + end + + defp default_weights do + %{ + low: Category.weight(:low) + } + end + + defp category_priority_mapping do + [ + {:log, :low} + ] + end +end diff --git a/lib/sentry/telemetry_processor.ex b/lib/sentry/telemetry_processor.ex new file mode 100644 index 00000000..10ef897b --- /dev/null +++ b/lib/sentry/telemetry_processor.ex @@ -0,0 +1,299 @@ +defmodule Sentry.TelemetryProcessor do + @moduledoc """ + Supervisor managing telemetry buffers and scheduler for the Sentry 
SDK. + + The TelemetryProcessor is the coordinator for log telemetry data flowing + through the SDK. It manages a log ring buffer coordinated by a weighted + round-robin scheduler. + + Currently, only logs are managed by the TelemetryProcessor. Other categories + (errors, transactions, check-ins) will be added in future versions. + + ## Architecture + + The processor starts as a supervisor with the following children: + + * Log Buffer - for log entries (low priority) + * Scheduler - weighted round-robin scheduler with integrated transport queue + + ## Usage + + # Add log events to the buffer + TelemetryProcessor.add(processor, %Sentry.LogEvent{...}) + + # Flush all pending items + TelemetryProcessor.flush(processor) + + """ + @moduledoc since: "12.0.0" + + use Supervisor + + alias Sentry.Telemetry.{Buffer, Category, Scheduler} + alias Sentry.LogEvent + + @default_name __MODULE__ + + @type option :: + {:name, atom()} + | {:buffer_capacities, %{Category.t() => pos_integer()}} + | {:buffer_configs, %{Category.t() => map()}} + | {:scheduler_weights, %{Category.priority() => pos_integer()}} + | {:on_envelope, (Sentry.Envelope.t() -> any())} + | {:transport_capacity, pos_integer()} + + ## Public API + + @doc """ + Returns the default processor name. + """ + @spec default_name() :: atom() + def default_name, do: @default_name + + @doc """ + Starts the TelemetryProcessor supervisor. + + ## Options + + * `:name` - Name to register the supervisor under (defaults to `#{inspect(@default_name)}`) + * `:buffer_capacities` - Map of category to capacity override (optional) + * `:buffer_configs` - Map of category to config map with `:capacity`, `:batch_size`, `:timeout` (optional) + * `:scheduler_weights` - Map of priority to weight override (optional) + * `:on_envelope` - Callback function invoked when envelopes are ready to send (optional) + * `:transport_capacity` - Maximum number of items the transport queue can hold (default: 1000). 
For log envelopes, each log event counts as one item. + + ## Examples + + TelemetryProcessor.start_link() + + TelemetryProcessor.start_link( + buffer_capacities: %{log: 2000}, + scheduler_weights: %{low: 3} + ) + + """ + @spec start_link([option()]) :: Supervisor.on_start() + def start_link(opts \\ []) do + {name, opts} = Keyword.pop(opts, :name, @default_name) + Supervisor.start_link(__MODULE__, [{:processor_name, name} | opts], name: name) + end + + @doc """ + Adds a log event to the log buffer. + + Uses the processor from process dictionary or the default (`#{inspect(@default_name)}`). + See `add/2` for the version accepting a custom processor. + + Returns `:ok`. + """ + @spec add(LogEvent.t()) :: :ok + def add(%LogEvent{} = item) do + add(processor_name(), item) + end + + @doc """ + Adds a log event to the log buffer. + + After adding, the scheduler is signaled to wake and process items. + + Returns `:ok`. + """ + @spec add(Supervisor.supervisor(), LogEvent.t()) :: :ok + def add(processor, %LogEvent{} = item) when is_atom(processor) do + Buffer.add(buffer_name(processor, :log), item) + Scheduler.signal(scheduler_name(processor)) + :ok + end + + def add(processor, %LogEvent{} = item) do + buffer = get_buffer(processor, :log) + Buffer.add(buffer, item) + scheduler = get_scheduler(processor) + Scheduler.signal(scheduler) + :ok + end + + @doc """ + Flushes all buffers by draining their contents and sending all items. + + Uses the processor from process dictionary or the default (`#{inspect(@default_name)}`). + This is a blocking call that returns when all items have been processed. + """ + @spec flush() :: :ok + def flush do + flush(processor_name()) + end + + @doc """ + Flushes all buffers by draining their contents and sending all items. + + This is a blocking call that returns when all items have been processed. + The optional timeout specifies how long to wait (default: 5000ms). 
+ """ + @spec flush(Supervisor.supervisor(), timeout()) :: :ok + def flush(processor, timeout \\ 5000) + + def flush(processor, timeout) when is_atom(processor) do + Scheduler.flush(scheduler_name(processor), timeout) + end + + def flush(processor, timeout) do + scheduler = get_scheduler(processor) + Scheduler.flush(scheduler, timeout) + end + + @doc """ + Returns the buffer pid for a given category. + """ + @spec get_buffer(Supervisor.supervisor(), Category.t()) :: pid() + def get_buffer(processor, category) when category == :log do + children = Supervisor.which_children(processor) + buffer_id = buffer_id(category) + + case List.keyfind(children, buffer_id, 0) do + {^buffer_id, pid, :worker, _} when is_pid(pid) -> pid + _ -> raise "Buffer not found for category: #{category}" + end + end + + @doc """ + Returns the scheduler pid. + """ + @spec get_scheduler(Supervisor.supervisor()) :: pid() + def get_scheduler(processor) do + children = Supervisor.which_children(processor) + + case List.keyfind(children, :scheduler, 0) do + {:scheduler, pid, :worker, _} when is_pid(pid) -> pid + _ -> raise "Scheduler not found" + end + end + + @doc """ + Returns the current size of the log buffer. + + Uses the processor from process dictionary or the default. + Returns 0 if the processor is not running. + """ + @spec buffer_size(Category.t()) :: non_neg_integer() + def buffer_size(category) when category == :log do + buffer_size(processor_name(), category) + end + + @doc """ + Returns the current size of a buffer for a given category. + + Returns 0 if the processor is not running. 
+ """ + @spec buffer_size(Supervisor.supervisor(), Category.t()) :: non_neg_integer() + def buffer_size(processor, category) when category == :log do + case safe_get_buffer(processor, category) do + {:ok, buffer} -> Buffer.size(buffer) + :error -> 0 + end + end + + defp safe_get_buffer(processor, category) when is_atom(processor) do + try do + {:ok, + buffer_name(processor, category) |> GenServer.whereis() || get_buffer(processor, category)} + catch + :exit, _ -> :error + end + end + + defp safe_get_buffer(processor, category) do + try do + {:ok, get_buffer(processor, category)} + catch + :exit, _ -> :error + end + end + + @impl true + def init(opts) do + processor_name = Keyword.fetch!(opts, :processor_name) + buffer_capacities = Keyword.get(opts, :buffer_capacities, %{}) + buffer_configs = Keyword.get(opts, :buffer_configs, %{}) + scheduler_weights = Keyword.get(opts, :scheduler_weights) + on_envelope = Keyword.get(opts, :on_envelope) + transport_capacity = Keyword.get(opts, :transport_capacity, 1000) + + buffer_names = + for category <- Category.all(), into: %{} do + {category, buffer_name(processor_name, category)} + end + + buffer_specs = + for category <- Category.all() do + config = build_buffer_config(category, buffer_capacities, buffer_configs) + + %{ + id: buffer_id(category), + start: + {Buffer, :start_link, + [ + [ + category: category, + name: Map.fetch!(buffer_names, category), + capacity: config.capacity, + batch_size: config.batch_size, + timeout: config.timeout + ] + ]} + } + end + + scheduler_opts = + [ + buffers: %{ + log: Map.fetch!(buffer_names, :log) + }, + name: scheduler_name(processor_name), + capacity: transport_capacity + ] + |> maybe_add_opt(:weights, scheduler_weights) + |> maybe_add_opt(:on_envelope, on_envelope) + + scheduler_spec = %{ + id: :scheduler, + start: {Scheduler, :start_link, [scheduler_opts]} + } + + children = buffer_specs ++ [scheduler_spec] + + Supervisor.init(children, strategy: :one_for_one) + end + + defp 
buffer_id(:log), do: :log_buffer + + @doc false + @spec buffer_name(atom(), Category.t()) :: atom() + def buffer_name(processor, category), do: :"#{processor}.buffer.#{category}" + + @doc false + @spec scheduler_name(atom()) :: atom() + def scheduler_name(processor), do: :"#{processor}.scheduler" + + defp build_buffer_config(category, capacities, configs) do + defaults = Category.default_config(category) + + config = + case Map.get(capacities, category) do + nil -> defaults + capacity -> Map.put(defaults, :capacity, capacity) + end + + case Map.get(configs, category) do + nil -> config + category_config -> Map.merge(config, category_config) + end + end + + defp processor_name do + Process.get(:sentry_telemetry_processor, @default_name) + end + + defp maybe_add_opt(opts, _key, nil), do: opts + defp maybe_add_opt(opts, key, value), do: Keyword.put(opts, key, value) +end diff --git a/lib/sentry/test.ex b/lib/sentry/test.ex index 9dab4914..514806f1 100644 --- a/lib/sentry/test.ex +++ b/lib/sentry/test.ex @@ -200,6 +200,29 @@ defmodule Sentry.Test do start_collecting(key: @events_key) start_collecting(key: @transactions_key) start_collecting(key: @logs_key) + + # Allow the TelemetryProcessor scheduler to collect log events on behalf of this process. + # Logs flow through the scheduler (a separate process) and need explicit + # permission in NimbleOwnership to store collected items for the test process. 
+ try do + processor = + Process.get(:sentry_telemetry_processor, Sentry.TelemetryProcessor.default_name()) + + scheduler_name = Sentry.TelemetryProcessor.scheduler_name(processor) + scheduler_pid = GenServer.whereis(scheduler_name) + + if scheduler_pid do + case NimbleOwnership.allow(@server, self(), scheduler_pid, @logs_key) do + :ok -> :ok + {:error, %NimbleOwnership.Error{reason: {:already_allowed, _}}} -> :ok + {:error, _} -> :ok + end + end + catch + :exit, _ -> :ok + end + + :ok end @doc """ @@ -431,8 +454,8 @@ defmodule Sentry.Test do ...> body: "Test log message", ...> timestamp: System.system_time(:microsecond) / 1_000_000 ...> } - iex> Sentry.LogEventBuffer.add_event(log_event) - :ok + iex> Sentry.Test.maybe_collect_logs([log_event]) + :collected iex> [%Sentry.LogEvent{} = collected] = Sentry.Test.pop_sentry_logs() iex> collected.body "Test log message" diff --git a/test/envelope_test.exs b/test/envelope_test.exs index 7fb6fb15..3a9357fa 100644 --- a/test/envelope_test.exs +++ b/test/envelope_test.exs @@ -3,7 +3,7 @@ defmodule Sentry.EnvelopeTest do import Sentry.TestHelpers - alias Sentry.{Attachment, CheckIn, ClientReport, Envelope, Event} + alias Sentry.{Attachment, CheckIn, ClientReport, Envelope, Event, LogEvent} describe "to_binary/1" do test "encodes an envelope" do @@ -195,6 +195,51 @@ defmodule Sentry.EnvelopeTest do ] end + describe "item_count/1" do + test "counts log events in a log envelope" do + log_events = + Enum.map(1..5, fn _ -> + %LogEvent{ + timestamp: System.system_time(:nanosecond) / 1_000_000_000, + level: :info, + body: "test log" + } + end) + + envelope = Envelope.from_log_events(log_events) + assert Envelope.item_count(envelope) == 5 + end + + test "counts single event envelope as 1" do + event = Event.create_event([]) + envelope = Envelope.from_event(event) + assert Envelope.item_count(envelope) == 1 + end + + test "counts event with attachments" do + attachments = [ + %Attachment{data: "a", filename: "a.txt"}, + 
%Attachment{data: "b", filename: "b.txt"} + ] + + event = %Event{Event.create_event([]) | attachments: attachments} + envelope = Envelope.from_event(event) + # 1 event + 2 attachments + assert Envelope.item_count(envelope) == 3 + end + + test "counts check-in envelope as 1" do + check_in = %CheckIn{ + check_in_id: Sentry.UUID.uuid4_hex(), + monitor_slug: "test", + status: :ok + } + + envelope = Envelope.from_check_in(check_in) + assert Envelope.item_count(envelope) == 1 + end + end + test "returns correct data_category" do assert Envelope.get_data_category(%Sentry.Event{ event_id: Sentry.UUID.uuid4_hex(), diff --git a/test/sentry/config_test.exs b/test/sentry/config_test.exs index 92e4e86a..e5c26ac7 100644 --- a/test/sentry/config_test.exs +++ b/test/sentry/config_test.exs @@ -289,6 +289,49 @@ defmodule Sentry.ConfigTest do assert output =~ ":hackney_pool_max_connections option is deprecated" end + + test ":telemetry_buffer_capacities" do + # Default value is empty map + assert Config.validate!([])[:telemetry_buffer_capacities] == %{} + + # Custom value with valid categories + capacities = %{log: 2000} + + assert Config.validate!(telemetry_buffer_capacities: capacities)[ + :telemetry_buffer_capacities + ] == capacities + + # Invalid: wrong category key + assert_raise ArgumentError, ~r/telemetry_buffer_capacities/, fn -> + Config.validate!(telemetry_buffer_capacities: %{invalid: 100}) + end + + # Invalid: not a positive integer value + assert_raise ArgumentError, ~r/telemetry_buffer_capacities/, fn -> + Config.validate!(telemetry_buffer_capacities: %{log: 0}) + end + end + + test ":telemetry_scheduler_weights" do + # Default value is empty map + assert Config.validate!([])[:telemetry_scheduler_weights] == %{} + + # Custom value with valid priorities + weights = %{low: 5} + + assert Config.validate!(telemetry_scheduler_weights: weights)[:telemetry_scheduler_weights] == + weights + + # Invalid: wrong priority key + assert_raise ArgumentError, 
~r/telemetry_scheduler_weights/, fn -> + Config.validate!(telemetry_scheduler_weights: %{invalid: 5}) + end + + # Invalid: not a positive integer value + assert_raise ArgumentError, ~r/telemetry_scheduler_weights/, fn -> + Config.validate!(telemetry_scheduler_weights: %{low: 0}) + end + end end describe "put_config/2" do diff --git a/test/sentry/logger_handler/logs_test.exs b/test/sentry/logger_handler/logs_test.exs index d8d076fb..be6f4b15 100644 --- a/test/sentry/logger_handler/logs_test.exs +++ b/test/sentry/logger_handler/logs_test.exs @@ -3,7 +3,7 @@ defmodule Sentry.LoggerHandler.LogsTest do import Sentry.TestHelpers - alias Sentry.LogEventBuffer + alias Sentry.TelemetryProcessor require Logger require OpenTelemetry.Tracer, as: Tracer @@ -15,20 +15,11 @@ defmodule Sentry.LoggerHandler.LogsTest do put_test_config( dsn: "http://public:secret@localhost:#{bypass.port}/1", - enable_logs: true, - max_log_events: 100 + enable_logs: true ) - buffer = :"log_event_buffer_#{System.unique_integer([:positive])}" - start_supervised!({LogEventBuffer, name: buffer}) - - on_exit(fn -> - if Process.whereis(buffer) do - LogEventBuffer.flush(server: buffer) - end - end) - - %{bypass: bypass, buffer: buffer} + # TelemetryProcessor is already started by Sentry.Case + %{bypass: bypass} end setup :add_logs_handler @@ -42,7 +33,7 @@ defmodule Sentry.LoggerHandler.LogsTest do end describe "logging with handler" do - test "creates log event and adds to buffer", %{bypass: bypass, buffer: buffer} do + test "creates log event and adds to buffer", %{bypass: bypass} do test_pid = self() Bypass.expect_once(bypass, "POST", "/api/1/envelope/", fn conn -> @@ -65,36 +56,35 @@ defmodule Sentry.LoggerHandler.LogsTest do Plug.Conn.resp(conn, 200, ~s<{"id": "test-123"}>) end) - initial_size = LogEventBuffer.size(server: buffer) + initial_size = TelemetryProcessor.buffer_size(:log) Logger.info("Test log message") - assert_buffer_size(buffer, initial_size + 1) + assert_buffer_size(nil, initial_size + 
1) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end - test "filters logs below configured level", %{handler_name: handler_name, buffer: buffer} do + test "filters logs below configured level", %{handler_name: handler_name} do assert {:ok, config} = :logger.get_handler_config(handler_name) updated_config = %{config.config | logs_level: :warning} assert :ok = :logger.update_handler_config(handler_name, :config, updated_config) - initial_size = LogEventBuffer.size(server: buffer) + initial_size = TelemetryProcessor.buffer_size(:log) Logger.info("Info message should be filtered") Logger.debug("Debug message should be filtered") - wait_for_buffer_stable(buffer, initial_size) + wait_for_buffer_stable(nil, initial_size) - assert LogEventBuffer.size(server: buffer) == initial_size + assert TelemetryProcessor.buffer_size(:log) == initial_size end test "accepts logs at or above configured level", %{ handler_name: handler_name, - bypass: bypass, - buffer: buffer + bypass: bypass } do test_pid = self() @@ -127,50 +117,49 @@ defmodule Sentry.LoggerHandler.LogsTest do updated_config = %{config.config | logs_level: :info} assert :ok = :logger.update_handler_config(handler_name, :config, updated_config) - initial_size = LogEventBuffer.size(server: buffer) + initial_size = TelemetryProcessor.buffer_size(:log) Logger.info("Info message") Logger.warning("Warning message") Logger.error("Error message") - assert_buffer_size(buffer, initial_size + 3) + assert_buffer_size(nil, initial_size + 3) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end - test "filters excluded domains", %{handler_name: handler_name, buffer: buffer} do + test "filters excluded domains", %{handler_name: handler_name} do assert {:ok, config} = :logger.get_handler_config(handler_name) updated_config = %{config.config | logs_excluded_domains: [:cowboy]} assert :ok = 
:logger.update_handler_config(handler_name, :config, updated_config) - initial_size = LogEventBuffer.size(server: buffer) + initial_size = TelemetryProcessor.buffer_size(:log) Logger.info("Cowboy message", domain: [:cowboy]) - wait_for_buffer_stable(buffer, initial_size) + wait_for_buffer_stable(nil, initial_size) - assert LogEventBuffer.size(server: buffer) == initial_size + assert TelemetryProcessor.buffer_size(:log) == initial_size end - test "includes logs from non-excluded domains", %{handler_name: handler_name, buffer: buffer} do + test "includes logs from non-excluded domains", %{handler_name: handler_name} do assert {:ok, config} = :logger.get_handler_config(handler_name) updated_config = %{config.config | logs_excluded_domains: [:cowboy]} assert :ok = :logger.update_handler_config(handler_name, :config, updated_config) - initial_size = LogEventBuffer.size(server: buffer) + initial_size = TelemetryProcessor.buffer_size(:log) Logger.info("Regular message") Logger.info("Phoenix message", domain: [:phoenix]) - assert_buffer_size(buffer, initial_size + 2) + assert_buffer_size(nil, initial_size + 2) end test "includes metadata as attributes", %{ handler_name: handler_name, - bypass: bypass, - buffer: buffer + bypass: bypass } do test_pid = self() @@ -202,40 +191,41 @@ defmodule Sentry.LoggerHandler.LogsTest do updated_config = %{config.config | logs_metadata: [:request_id, :user_id]} assert :ok = :logger.update_handler_config(handler_name, :config, updated_config) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() Logger.metadata(request_id: "abc123", user_id: 42, other_meta: "should not be included") Logger.info("Request processed") - assert_buffer_size(buffer, 1) + assert_buffer_size(nil, 1) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end test "includes all metadata when configured with :all", %{ - handler_name: handler_name, - buffer: buffer + handler_name: handler_name } do assert 
{:ok, config} = :logger.get_handler_config(handler_name) updated_config = %{config.config | logs_metadata: :all} assert :ok = :logger.update_handler_config(handler_name, :config, updated_config) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() Logger.metadata(request_id: "abc123", user_id: 42, custom_field: "value") Logger.info("Request with metadata") - assert_buffer_size(buffer, 1) + assert_buffer_size(nil, 1) end - test "does not send logs when enable_logs is false at handler setup time" do - # Create a separate buffer for this test - isolated from the setup's handler - isolated_buffer = :"isolated_buffer_#{System.unique_integer([:positive])}" - start_supervised!({LogEventBuffer, name: isolated_buffer}, id: isolated_buffer) + test "does not send logs when enable_logs is false at handler setup time", %{ + handler_name: handler_name + } do + # Remove the main handler first so we can test with enable_logs: false + :ok = :logger.remove_handler(handler_name) - handler_name = :"sentry_logs_handler_disabled_#{System.unique_integer([:positive])}" + disabled_handler_name = + :"sentry_logs_handler_disabled_#{System.unique_integer([:positive])}" # Set enable_logs to false BEFORE adding a new handler put_test_config(enable_logs: false) @@ -244,33 +234,31 @@ defmodule Sentry.LoggerHandler.LogsTest do config: %{ logs_level: :info, logs_excluded_domains: [], - logs_metadata: [], - logs_buffer: isolated_buffer + logs_metadata: [] } } # Add handler with enable_logs: false - LogsBackend should NOT be included - assert :ok = :logger.add_handler(handler_name, Sentry.LoggerHandler, handler_config) + assert :ok = + :logger.add_handler(disabled_handler_name, Sentry.LoggerHandler, handler_config) on_exit(fn -> - _ = :logger.remove_handler(handler_name) + _ = :logger.remove_handler(disabled_handler_name) end) - initial_size = LogEventBuffer.size(server: isolated_buffer) - assert initial_size == 0 + initial_size = TelemetryProcessor.buffer_size(:log) Logger.info("Test 
message") # Give some time for the log to be processed Process.sleep(100) - # The isolated buffer should still be empty because LogsBackend was not enabled - assert LogEventBuffer.size(server: isolated_buffer) == 0 + # Buffer should still be at initial size because LogsBackend was not enabled + assert TelemetryProcessor.buffer_size(:log) == initial_size end test "generates trace_id when no trace context is available", %{ - bypass: bypass, - buffer: buffer + bypass: bypass } do test_pid = self() @@ -293,20 +281,19 @@ defmodule Sentry.LoggerHandler.LogsTest do Plug.Conn.resp(conn, 200, ~s<{"id": "test-123"}>) end) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() Logger.info("Log without trace") - assert_buffer_size(buffer, 1) + assert_buffer_size(nil, 1) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end test "captures message template with %s parameters via Logger metadata", %{ - bypass: bypass, - buffer: buffer + bypass: bypass } do test_pid = self() @@ -348,21 +335,20 @@ defmodule Sentry.LoggerHandler.LogsTest do Plug.Conn.resp(conn, 200, ~s<{"id": "test-123"}>) end) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() # Use Logger with %s template and parameters via metadata Logger.info("User %s logged in from %s", parameters: ["jane_doe", "192.168.1.1"]) - assert_buffer_size(buffer, 1) + assert_buffer_size(nil, 1) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end test "captures message template with %{key} named parameters", %{ - bypass: bypass, - buffer: buffer + bypass: bypass } do test_pid = self() @@ -404,21 +390,20 @@ defmodule Sentry.LoggerHandler.LogsTest do Plug.Conn.resp(conn, 200, ~s<{"id": "test-123"}>) end) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() # Use Logger with %{key} template and named parameters Logger.info("Hello %{name} from %{city}", parameters: %{name: "Jane", city: 
"NYC"}) - assert_buffer_size(buffer, 1) + assert_buffer_size(nil, 1) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end test "does not include template attributes for plain string messages", %{ - bypass: bypass, - buffer: buffer + bypass: bypass } do test_pid = self() @@ -440,13 +425,13 @@ defmodule Sentry.LoggerHandler.LogsTest do Plug.Conn.resp(conn, 200, ~s<{"id": "test-123"}>) end) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() Logger.info("Simple log message") - assert_buffer_size(buffer, 1) + assert_buffer_size(nil, 1) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end @@ -464,8 +449,7 @@ defmodule Sentry.LoggerHandler.LogsTest do end test "automatically includes trace context from OpenTelemetry spans", %{ - bypass: bypass, - buffer: buffer + bypass: bypass } do test_pid = self() @@ -501,22 +485,21 @@ defmodule Sentry.LoggerHandler.LogsTest do end end) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() Tracer.with_span "test_span" do Logger.info("Log inside OTel span") end - assert_buffer_size(buffer, 1) + assert_buffer_size(nil, 1) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end test "includes trace context from nested OpenTelemetry spans", %{ - bypass: bypass, - buffer: buffer + bypass: bypass } do test_pid = self() @@ -554,7 +537,7 @@ defmodule Sentry.LoggerHandler.LogsTest do end end) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() require OpenTelemetry.Tracer, as: Tracer @@ -566,16 +549,15 @@ defmodule Sentry.LoggerHandler.LogsTest do end end - assert_buffer_size(buffer, 2) + assert_buffer_size(nil, 2) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end test "works out-of-the-box when handler is configured", %{ - bypass: bypass, - buffer: buffer + bypass: bypass } do 
test_pid = self() @@ -604,22 +586,22 @@ defmodule Sentry.LoggerHandler.LogsTest do end end) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() Tracer.with_span "api_call" do Logger.info("Processing API request") end - assert_buffer_size(buffer, 1) + assert_buffer_size(nil, 1) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end end describe "before_send_log callback" do - test "allows modifying log events before sending", %{bypass: bypass, buffer: buffer} do + test "allows modifying log events before sending", %{bypass: bypass} do test_pid = self() put_test_config( @@ -646,14 +628,14 @@ defmodule Sentry.LoggerHandler.LogsTest do Logger.info("Test message") - assert_buffer_size(buffer, 1) + assert_buffer_size(nil, 1) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end - test "filters out log events when callback returns nil", %{bypass: bypass, buffer: buffer} do + test "filters out log events when callback returns nil", %{bypass: bypass} do put_test_config( before_send_log: fn log_event -> if String.contains?(log_event.body, "should_be_filtered") do @@ -681,14 +663,14 @@ defmodule Sentry.LoggerHandler.LogsTest do Logger.info("This message should_be_filtered") Logger.info("This message should pass") - assert_buffer_size(buffer, 2) + assert_buffer_size(nil, 2) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end - test "filters out log events when callback returns false", %{bypass: bypass, buffer: buffer} do + test "filters out log events when callback returns false", %{bypass: bypass} do put_test_config( before_send_log: fn log_event -> if String.contains?(log_event.body, "drop_me") do @@ -716,14 +698,14 @@ defmodule Sentry.LoggerHandler.LogsTest do Logger.info("drop_me please") Logger.info("Keep this message") - assert_buffer_size(buffer, 2) + assert_buffer_size(nil, 2) - 
LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end - test "supports MFA tuple callback format", %{bypass: bypass, buffer: buffer} do + test "supports MFA tuple callback format", %{bypass: bypass} do test_pid = self() put_test_config(before_send_log: {__MODULE__, :before_send_log_callback}) @@ -746,22 +728,22 @@ defmodule Sentry.LoggerHandler.LogsTest do Logger.info("Test MFA callback") - assert_buffer_size(buffer, 1) + assert_buffer_size(nil, 1) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() assert_receive :envelope_sent, 1000 end - test "does not send any logs when all are filtered", %{buffer: buffer} do + test "does not send any logs when all are filtered", %{} do put_test_config(before_send_log: fn _log_event -> nil end) Logger.info("All messages filtered 1") Logger.info("All messages filtered 2") - assert_buffer_size(buffer, 2) + assert_buffer_size(nil, 2) - LogEventBuffer.flush(server: buffer) + TelemetryProcessor.flush() refute_receive _, 100 end @@ -771,7 +753,7 @@ defmodule Sentry.LoggerHandler.LogsTest do %{log_event | attributes: Map.put(log_event.attributes, "mfa_added", "true")} end - defp add_logs_handler(%{buffer: buffer}) do + defp add_logs_handler(%{telemetry_processor: telemetry_processor}) do handler_name = :"sentry_logs_handler_#{System.unique_integer([:positive])}" handler_config = %{ @@ -779,7 +761,7 @@ defmodule Sentry.LoggerHandler.LogsTest do logs_level: :info, logs_excluded_domains: [], logs_metadata: [], - logs_buffer: buffer + telemetry_processor: telemetry_processor } } @@ -792,13 +774,13 @@ defmodule Sentry.LoggerHandler.LogsTest do %{handler_name: handler_name} end - defp assert_buffer_size(buffer, expected_size, timeout \\ 1000) do - wait_until(fn -> LogEventBuffer.size(server: buffer) == expected_size end, timeout) - assert LogEventBuffer.size(server: buffer) == expected_size + defp assert_buffer_size(_buffer, expected_size, timeout \\ 1000) do + 
wait_until(fn -> TelemetryProcessor.buffer_size(:log) == expected_size end, timeout) + assert TelemetryProcessor.buffer_size(:log) == expected_size end - defp wait_for_buffer_stable(buffer, expected_size, timeout \\ 1000) do - wait_until(fn -> LogEventBuffer.size(server: buffer) == expected_size end, timeout) + defp wait_for_buffer_stable(_buffer, expected_size, timeout \\ 1000) do + wait_until(fn -> TelemetryProcessor.buffer_size(:log) == expected_size end, timeout) end defp wait_until(condition_fn, timeout) do diff --git a/test/sentry/telemetry/buffer_test.exs b/test/sentry/telemetry/buffer_test.exs new file mode 100644 index 00000000..107cc121 --- /dev/null +++ b/test/sentry/telemetry/buffer_test.exs @@ -0,0 +1,209 @@ +defmodule Sentry.Telemetry.BufferTest do + use Sentry.Case, async: false + + alias Sentry.Telemetry.Buffer + alias Sentry.Telemetry.Category + + defp make_item(id) do + # Buffer is generic; we use simple maps as test items + %{id: id} + end + + describe "start_link/1" do + test "starts with required category option" do + assert {:ok, pid} = Buffer.start_link(category: :log, name: :test_buffer_start) + assert Process.alive?(pid) + GenServer.stop(pid) + end + + test "uses default config from Category module" do + assert {:ok, pid} = Buffer.start_link(category: :log, name: :test_buffer_defaults) + state = :sys.get_state(pid) + + defaults = Category.default_config(:log) + assert state.capacity == defaults.capacity + assert state.batch_size == defaults.batch_size + assert state.timeout == defaults.timeout + GenServer.stop(pid) + end + + test "allows overriding capacity, batch_size, and timeout" do + assert {:ok, pid} = + Buffer.start_link( + category: :log, + name: :test_buffer_override, + capacity: 50, + batch_size: 10, + timeout: 1000 + ) + + state = :sys.get_state(pid) + assert state.capacity == 50 + assert state.batch_size == 10 + assert state.timeout == 1000 + GenServer.stop(pid) + end + end + + describe "add/2" do + test "adds item to buffer" do 
+ {:ok, pid} = Buffer.start_link(category: :log, name: :test_buffer_add) + assert :ok = Buffer.add(pid, make_item("test1")) + assert Buffer.size(pid) == 1 + GenServer.stop(pid) + end + + test "signals on_item callback when provided" do + test_pid = self() + + {:ok, pid} = + Buffer.start_link( + category: :log, + name: :test_buffer_signal, + on_item: fn -> send(test_pid, :item_added) end + ) + + Buffer.add(pid, make_item("test1")) + assert_receive :item_added, 100 + GenServer.stop(pid) + end + end + + describe "poll_if_ready/1" do + test "returns batch when ready (size >= batch_size)" do + {:ok, pid} = Buffer.start_link(category: :log, name: :test_buffer_poll, batch_size: 2) + Buffer.add(pid, make_item("e1")) + Buffer.add(pid, make_item("e2")) + + {:ok, items} = Buffer.poll_if_ready(pid) + assert length(items) == 2 + assert Buffer.size(pid) == 0 + GenServer.stop(pid) + end + + test "returns :not_ready when size < batch_size and no timeout" do + {:ok, pid} = + Buffer.start_link(category: :log, name: :test_buffer_not_ready, batch_size: 5) + + Buffer.add(pid, make_item("e1")) + + assert :not_ready = Buffer.poll_if_ready(pid) + GenServer.stop(pid) + end + end + + describe "drain/1" do + test "returns all items and empties buffer" do + {:ok, pid} = Buffer.start_link(category: :log, name: :test_buffer_drain) + Buffer.add(pid, make_item("e1")) + Buffer.add(pid, make_item("e2")) + + items = Buffer.drain(pid) + assert length(items) == 2 + assert Buffer.size(pid) == 0 + GenServer.stop(pid) + end + end + + describe "size/1" do + test "returns current buffer size" do + {:ok, pid} = Buffer.start_link(category: :log, name: :test_buffer_size) + assert Buffer.size(pid) == 0 + Buffer.add(pid, make_item("e1")) + assert Buffer.size(pid) == 1 + GenServer.stop(pid) + end + end + + describe "is_ready?/1" do + test "returns true when buffer is ready to flush" do + {:ok, pid} = Buffer.start_link(category: :log, name: :test_buffer_ready, batch_size: 1) + Buffer.add(pid, make_item("e1")) + 
assert Buffer.is_ready?(pid) == true + GenServer.stop(pid) + end + + test "returns false when buffer is not ready" do + {:ok, pid} = + Buffer.start_link(category: :log, name: :test_buffer_not_ready2, batch_size: 10) + + Buffer.add(pid, make_item("e1")) + assert Buffer.is_ready?(pid) == false + GenServer.stop(pid) + end + end + + describe "overflow behavior" do + test "drops oldest item when buffer is full" do + {:ok, pid} = Buffer.start_link(category: :log, name: :test_buffer_overflow, capacity: 2) + Buffer.add(pid, make_item("e1")) + Buffer.add(pid, make_item("e2")) + Buffer.add(pid, make_item("e3")) + + # Size should still be 2 + assert Buffer.size(pid) == 2 + + # Drain should show e2 and e3 (e1 was dropped) + items = Buffer.drain(pid) + item_ids = Enum.map(items, & &1.id) + assert item_ids == ["e2", "e3"] + GenServer.stop(pid) + end + end + + describe "category/1" do + test "returns the buffer's category" do + {:ok, pid} = Buffer.start_link(category: :log, name: :test_buffer_category) + assert Buffer.category(pid) == :log + GenServer.stop(pid) + end + end + + describe "FIFO ordering" do + test "items are returned in insertion order" do + {:ok, pid} = Buffer.start_link(category: :log, name: :test_buffer_fifo, batch_size: 5) + + for i <- 1..5, do: Buffer.add(pid, make_item("e#{i}")) + + {:ok, items} = Buffer.poll_if_ready(pid) + assert Enum.map(items, & &1.id) == ["e1", "e2", "e3", "e4", "e5"] + GenServer.stop(pid) + end + + test "FIFO ordering is preserved after overflow" do + {:ok, pid} = + Buffer.start_link(category: :log, name: :test_buffer_fifo_overflow, capacity: 3) + + for i <- 1..5, do: Buffer.add(pid, make_item("e#{i}")) + + items = Buffer.drain(pid) + assert Enum.map(items, & &1.id) == ["e3", "e4", "e5"] + GenServer.stop(pid) + end + end + + describe "timeout-based flush" do + test "poll_if_ready returns items after timeout elapsed" do + {:ok, pid} = + Buffer.start_link( + category: :log, + name: :test_buffer_timeout, + batch_size: 100, + timeout: 50 + ) 
+ + Buffer.add(pid, make_item("e1")) + + # Not ready yet (size < batch_size, timeout not elapsed) + assert :not_ready = Buffer.poll_if_ready(pid) + + # Wait for timeout + Process.sleep(60) + + # Now ready due to timeout + {:ok, items} = Buffer.poll_if_ready(pid) + assert [%{id: "e1"}] = items + GenServer.stop(pid) + end + end +end diff --git a/test/sentry/telemetry/scheduler_test.exs b/test/sentry/telemetry/scheduler_test.exs new file mode 100644 index 00000000..b5676b07 --- /dev/null +++ b/test/sentry/telemetry/scheduler_test.exs @@ -0,0 +1,452 @@ +defmodule Sentry.Telemetry.SchedulerTest do + use Sentry.Case, async: false + + import ExUnit.CaptureLog + import Sentry.TestHelpers + + alias Sentry.Telemetry.{Buffer, Scheduler} + alias Sentry.LogEvent + + defp make_log_event(body \\ "test log") do + %LogEvent{ + timestamp: System.system_time(:nanosecond) / 1_000_000_000, + level: :info, + body: body + } + end + + describe "build_priority_cycle/0" do + test "builds cycle with correct weight for log category" do + cycle = Scheduler.build_priority_cycle() + + # Default weight: low=2 + assert length(cycle) == 2 + assert Enum.frequencies(cycle) == %{log: 2} + end + + test "builds cycle with custom weights" do + custom_weights = %{low: 5} + cycle = Scheduler.build_priority_cycle(custom_weights) + + assert length(cycle) == 5 + assert Enum.frequencies(cycle) == %{log: 5} + end + end + + describe "start_link/1" do + test "starts scheduler with buffers" do + buffers = start_test_buffers() + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + name: :test_scheduler_start + ) + + assert Process.alive?(pid) + GenServer.stop(pid) + stop_buffers(buffers) + end + + test "accepts custom weights" do + buffers = start_test_buffers() + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + weights: %{low: 5}, + name: :test_scheduler_weights + ) + + state = :sys.get_state(pid) + assert length(state.priority_cycle) == 5 + GenServer.stop(pid) + stop_buffers(buffers) + end + 
end + + describe "signal/1" do + test "wakes scheduler to process log items" do + buffers = start_test_buffers(batch_size: 1) + test_pid = self() + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + on_envelope: fn envelope -> send(test_pid, {:envelope, envelope}) end, + name: :test_scheduler_signal + ) + + Buffer.add(buffers.log, make_log_event()) + Scheduler.signal(pid) + + assert_receive {:envelope, envelope}, 500 + assert [%Sentry.LogBatch{log_events: events}] = envelope.items + assert length(events) == 1 + + GenServer.stop(pid) + stop_buffers(buffers) + end + end + + describe "envelope building" do + test "batches log events into single envelope" do + buffers = start_test_buffers(batch_size: 2) + test_pid = self() + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + on_envelope: fn envelope -> send(test_pid, {:envelope, envelope}) end, + name: :test_scheduler_log_batch + ) + + # Add two log events (batch size = 2) + Buffer.add(buffers.log, make_log_event()) + Buffer.add(buffers.log, make_log_event()) + Scheduler.signal(pid) + + assert_receive {:envelope, envelope}, 500 + # Logs are batched into LogBatch + assert [%Sentry.LogBatch{log_events: events}] = envelope.items + assert length(events) == 2 + + GenServer.stop(pid) + stop_buffers(buffers) + end + end + + describe "flush/1" do + test "drains log buffer and sends envelopes" do + buffers = start_test_buffers(batch_size: 100) + test_pid = self() + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + on_envelope: fn envelope -> send(test_pid, {:envelope, envelope}) end, + name: :test_scheduler_flush + ) + + # Add items without signaling + Buffer.add(buffers.log, make_log_event()) + Buffer.add(buffers.log, make_log_event()) + Buffer.add(buffers.log, make_log_event()) + + # Flush should drain everything + :ok = Scheduler.flush(pid) + + # Should receive an envelope with all logs batched + envelopes = receive_envelopes_until_empty() + assert length(envelopes) == 1 + + [envelope] = 
envelopes + assert [%Sentry.LogBatch{log_events: events}] = envelope.items + assert length(events) == 3 + + # Buffer should be empty + assert Buffer.size(buffers.log) == 0 + + GenServer.stop(pid) + stop_buffers(buffers) + end + end + + describe "before_send_log callback error protection" do + test "callback that raises still allows events to be processed" do + buffers = start_test_buffers(batch_size: 1) + test_pid = self() + + put_test_config( + before_send_log: fn _log_event -> + raise "boom" + end + ) + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + on_envelope: fn envelope -> send(test_pid, {:envelope, envelope}) end, + name: :"test_scheduler_raise_#{System.unique_integer([:positive])}" + ) + + log = + capture_log(fn -> + Buffer.add(buffers.log, make_log_event("test")) + Scheduler.signal(pid) + + assert_receive {:envelope, envelope}, 500 + # Event passes through unmodified when callback raises + assert [%Sentry.LogBatch{log_events: [%LogEvent{body: "test"}]}] = envelope.items + end) + + assert log =~ "before_send_log callback failed" + + GenServer.stop(pid) + stop_buffers(buffers) + end + + test "callback that raises does not crash the Scheduler" do + buffers = start_test_buffers(batch_size: 1) + test_pid = self() + + put_test_config( + before_send_log: fn _log_event -> + raise "boom" + end + ) + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + on_envelope: fn envelope -> send(test_pid, {:envelope, envelope}) end, + name: :"test_scheduler_survive_#{System.unique_integer([:positive])}" + ) + + capture_log(fn -> + Buffer.add(buffers.log, make_log_event("first")) + Scheduler.signal(pid) + assert_receive {:envelope, _}, 500 + end) + + # Scheduler is still alive and functional + assert Process.alive?(pid) + + # Can still process new events + put_test_config(before_send_log: fn log_event -> log_event end) + + Buffer.add(buffers.log, make_log_event("second")) + Scheduler.signal(pid) + assert_receive {:envelope, envelope}, 500 + assert 
[%Sentry.LogBatch{log_events: [%LogEvent{body: "second"}]}] = envelope.items + + GenServer.stop(pid) + stop_buffers(buffers) + end + end + + describe "transport queue capacity" do + test "stops processing when transport queue is full" do + buffers = start_test_buffers(batch_size: 1) + uid = System.unique_integer([:positive]) + + put_test_config(dsn: "http://public:secret@localhost:9999/1") + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + capacity: 0, + name: :"test_scheduler_full_#{uid}" + ) + + # Add item and signal — should not process since capacity is 0 + Buffer.add(buffers.log, make_log_event("overflow")) + Scheduler.signal(pid) + + # Give scheduler time to process + Process.sleep(50) + + # Item should still be in buffer since transport queue has no space + assert Buffer.size(buffers.log) == 1 + + GenServer.stop(pid) + stop_buffers(buffers) + end + + test "items stay in buffer when transport queue is full" do + buffers = start_test_buffers(batch_size: 1) + uid = System.unique_integer([:positive]) + + put_test_config(dsn: "http://public:secret@localhost:9999/1") + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + capacity: 1, + name: :"test_scheduler_backpressure_#{uid}" + ) + + # Simulate full queue by setting size >= capacity + :sys.replace_state(pid, fn state -> + %{state | size: 1} + end) + + Buffer.add(buffers.log, make_log_event("queued")) + Scheduler.signal(pid) + Process.sleep(50) + + # Item should remain in buffer since transport queue has no space + assert Buffer.size(buffers.log) == 1 + + GenServer.stop(pid) + stop_buffers(buffers) + end + + test "rejects envelope when its items would exceed capacity" do + buffers = start_test_buffers(batch_size: 5) + uid = System.unique_integer([:positive]) + + put_test_config(dsn: "http://public:secret@localhost:9999/1") + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + capacity: 3, + name: :"test_scheduler_item_count_#{uid}" + ) + + for i <- 1..5, do: Buffer.add(buffers.log, 
make_log_event("log_#{i}")) + + log = + capture_log(fn -> + Scheduler.signal(pid) + Process.sleep(50) + end) + + assert log =~ "transport queue full, dropping 5 log item(s)" + + state = :sys.get_state(pid) + assert state.size == 0 + + GenServer.stop(pid) + stop_buffers(buffers) + end + end + + describe "transport send process error handling" do + test "logs warning when send process exits abnormally" do + buffers = start_test_buffers(batch_size: 1) + uid = System.unique_integer([:positive]) + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + name: :"test_scheduler_down_#{uid}" + ) + + # Inject a fake active_ref into state, then send a matching :DOWN + # with a crash reason to trigger the warning log. + fake_ref = make_ref() + + :sys.replace_state(pid, fn state -> + %{state | active_ref: fake_ref, active_item_count: 1, size: 1} + end) + + log = + capture_log(fn -> + send(pid, {:DOWN, fake_ref, :process, self(), {:error, :something_broke}}) + # Synchronize: :sys.get_state goes through the mailbox, ensuring :DOWN is processed + :sys.get_state(pid) + end) + + assert log =~ "Sentry transport send process exited abnormally" + assert log =~ "something_broke" + + GenServer.stop(pid) + stop_buffers(buffers) + end + + test "does not log on normal send process exit" do + buffers = start_test_buffers(batch_size: 1) + uid = System.unique_integer([:positive]) + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + name: :"test_scheduler_normal_down_#{uid}" + ) + + fake_ref = make_ref() + + :sys.replace_state(pid, fn state -> + %{state | active_ref: fake_ref, active_item_count: 1, size: 1} + end) + + log = + capture_log(fn -> + send(pid, {:DOWN, fake_ref, :process, self(), :normal}) + :sys.get_state(pid) + end) + + refute log =~ "Sentry transport send process exited abnormally" + + GenServer.stop(pid) + stop_buffers(buffers) + end + end + + describe "transport send failure logging" do + test "logs warning when direct transport send fails during flush" do + 
bypass = Bypass.open() + + put_test_config(dsn: "http://public:secret@localhost:#{bypass.port}/1") + prev_retries = Application.get_env(:sentry, :request_retries) + Application.put_env(:sentry, :request_retries, []) + + on_exit(fn -> + if prev_retries do + Application.put_env(:sentry, :request_retries, prev_retries) + else + Application.delete_env(:sentry, :request_retries) + end + end) + + Bypass.expect(bypass, fn conn -> + Plug.Conn.resp(conn, 500, ~s<{"error": "internal"}>) + end) + + buffers = start_test_buffers(batch_size: 1) + uid = System.unique_integer([:positive]) + + {:ok, pid} = + Scheduler.start_link( + buffers: buffers, + name: :"test_scheduler_send_fail_#{uid}" + ) + + Buffer.add(buffers.log, make_log_event("fail-send")) + + # Flush uses the direct send path which logs failures + log = + capture_log(fn -> + Scheduler.flush(pid) + end) + + assert log =~ "failed to send log envelope" + + GenServer.stop(pid) + stop_buffers(buffers) + Bypass.down(bypass) + end + end + + # Helper functions + + defp start_test_buffers(opts \\ []) do + uid = System.unique_integer([:positive]) + batch_size = Keyword.get(opts, :batch_size, 100) + + log_buf = + start_supervised!( + {Buffer, category: :log, batch_size: batch_size, name: :"test_log_buf_#{uid}"}, + id: :"test_log_buf_#{uid}" + ) + + %{log: log_buf} + end + + defp stop_buffers(buffers) do + for {_category, pid} <- buffers, Process.alive?(pid) do + GenServer.stop(pid) + end + end + + defp receive_envelopes_until_empty(acc \\ []) do + receive do + {:envelope, envelope} -> + receive_envelopes_until_empty([envelope | acc]) + after + 100 -> Enum.reverse(acc) + end + end +end diff --git a/test/sentry/telemetry_processor_integration_test.exs b/test/sentry/telemetry_processor_integration_test.exs new file mode 100644 index 00000000..d77c2dcc --- /dev/null +++ b/test/sentry/telemetry_processor_integration_test.exs @@ -0,0 +1,105 @@ +defmodule Sentry.TelemetryProcessorIntegrationTest do + use Sentry.Case, async: false + + 
import Sentry.TestHelpers + + alias Sentry.TelemetryProcessor + alias Sentry.Telemetry.Buffer + alias Sentry.{Envelope, LogBatch, LogEvent} + + setup context do + test_pid = self() + ref = make_ref() + + stop_supervised!(context.telemetry_processor) + + uid = System.unique_integer([:positive]) + processor_name = :"test_integration_#{uid}" + + start_supervised!( + {TelemetryProcessor, + name: processor_name, + on_envelope: fn envelope -> send(test_pid, {ref, envelope}) end, + buffer_configs: %{log: %{batch_size: 1}}}, + id: processor_name + ) + + Process.put(:sentry_telemetry_processor, processor_name) + put_test_config(dsn: "http://public:secret@localhost:9999/1") + + %{processor: processor_name, ref: ref} + end + + describe "log batching" do + test "sends log events as batched envelopes", ctx do + TelemetryProcessor.add(ctx.processor, make_log_event("log-1")) + TelemetryProcessor.add(ctx.processor, make_log_event("log-2")) + + envelopes = collect_envelopes(ctx.ref, 2) + assert length(envelopes) == 2 + + for envelope <- envelopes do + assert [%LogBatch{log_events: [%LogEvent{}]}] = envelope.items + end + end + + test "flush drains log buffer completely", ctx do + scheduler = TelemetryProcessor.get_scheduler(ctx.processor) + :sys.suspend(scheduler) + + TelemetryProcessor.add(ctx.processor, make_log_event("flush-1")) + TelemetryProcessor.add(ctx.processor, make_log_event("flush-2")) + TelemetryProcessor.add(ctx.processor, make_log_event("flush-3")) + + buffer = TelemetryProcessor.get_buffer(ctx.processor, :log) + assert Buffer.size(buffer) == 3 + + :sys.resume(scheduler) + :ok = TelemetryProcessor.flush(ctx.processor) + + assert Buffer.size(buffer) == 0 + end + + test "applies before_send_log callback", ctx do + put_test_config( + before_send_log: fn log_event -> + if log_event.body == "drop me", do: nil, else: log_event + end + ) + + TelemetryProcessor.add(ctx.processor, make_log_event("keep me")) + TelemetryProcessor.add(ctx.processor, make_log_event("drop me")) + 
+ envelopes = collect_envelopes(ctx.ref, 1) + assert length(envelopes) == 1 + + [envelope] = envelopes + assert [%LogBatch{log_events: [%LogEvent{body: "keep me"}]}] = envelope.items + + # The dropped event should not produce an envelope + refute_receive {_, %Envelope{}}, 200 + end + end + + defp make_log_event(body) do + %LogEvent{ + timestamp: System.system_time(:nanosecond) / 1_000_000_000, + level: :info, + body: body + } + end + + defp collect_envelopes(ref, expected_count) do + collect_envelopes(ref, expected_count, []) + end + + defp collect_envelopes(_ref, 0, acc), do: Enum.reverse(acc) + + defp collect_envelopes(ref, remaining, acc) do + receive do + {^ref, envelope} -> collect_envelopes(ref, remaining - 1, [envelope | acc]) + after + 1000 -> Enum.reverse(acc) + end + end +end diff --git a/test/sentry/telemetry_processor_test.exs b/test/sentry/telemetry_processor_test.exs new file mode 100644 index 00000000..c5ef4689 --- /dev/null +++ b/test/sentry/telemetry_processor_test.exs @@ -0,0 +1,179 @@ +defmodule Sentry.TelemetryProcessorTest do + use Sentry.Case, async: false + + alias Sentry.TelemetryProcessor + alias Sentry.Telemetry.Buffer + alias Sentry.LogEvent + + defp make_log_event do + %LogEvent{ + timestamp: System.system_time(:nanosecond) / 1_000_000_000, + level: :info, + body: "test log" + } + end + + describe "start_link/1" do + test "starts the processor supervisor with all children" do + {:ok, pid} = TelemetryProcessor.start_link(name: :test_processor_start) + assert Process.alive?(pid) + + children = Supervisor.which_children(pid) + # 1 buffer (log) + Scheduler = 2 + assert length(children) == 2 + + Supervisor.stop(pid) + end + + test "registers with given name" do + {:ok, pid} = TelemetryProcessor.start_link(name: :test_processor_named) + assert Process.whereis(:test_processor_named) == pid + Supervisor.stop(pid) + end + + test "accepts custom buffer capacities" do + {:ok, pid} = + TelemetryProcessor.start_link( + name: 
:test_processor_capacities, + buffer_capacities: %{log: 500} + ) + + log_buffer = TelemetryProcessor.get_buffer(pid, :log) + state = :sys.get_state(log_buffer) + assert state.capacity == 500 + + Supervisor.stop(pid) + end + end + + describe "add/2" do + test "routes LogEvent to log buffer" do + {:ok, pid} = TelemetryProcessor.start_link(name: :test_processor_add_log) + + log = make_log_event() + :ok = TelemetryProcessor.add(pid, log) + + Process.sleep(10) + + log_buffer = TelemetryProcessor.get_buffer(pid, :log) + assert Buffer.size(log_buffer) == 1 + + Supervisor.stop(pid) + end + + test "signals scheduler when item added" do + test_pid = self() + + {:ok, pid} = + TelemetryProcessor.start_link( + name: :test_processor_signal, + on_envelope: fn envelope -> send(test_pid, {:envelope, envelope}) end, + buffer_configs: %{log: %{batch_size: 1}} + ) + + :ok = TelemetryProcessor.add(pid, make_log_event()) + + assert_receive {:envelope, envelope}, 500 + assert [%Sentry.LogBatch{log_events: [_]}] = envelope.items + + Supervisor.stop(pid) + end + end + + describe "flush/2" do + test "drains log buffer and sends envelopes" do + test_pid = self() + + {:ok, pid} = + TelemetryProcessor.start_link( + name: :test_processor_flush, + on_envelope: fn envelope -> send(test_pid, {:envelope, envelope}) end, + buffer_configs: %{log: %{batch_size: 100}} + ) + + TelemetryProcessor.add(pid, make_log_event()) + TelemetryProcessor.add(pid, make_log_event()) + TelemetryProcessor.add(pid, make_log_event()) + + # Drain any messages from signal-triggered sends + receive_envelopes() + + :ok = TelemetryProcessor.flush(pid) + + buffer = TelemetryProcessor.get_buffer(pid, :log) + assert Buffer.size(buffer) == 0 + + Supervisor.stop(pid) + end + + test "flush with timeout returns :ok" do + test_pid = self() + ref = make_ref() + + {:ok, pid} = + TelemetryProcessor.start_link( + name: :test_processor_flush_timeout, + on_envelope: fn envelope -> send(test_pid, {ref, envelope}) end, + buffer_configs: 
%{log: %{batch_size: 1}} + ) + + TelemetryProcessor.add(pid, make_log_event()) + + assert :ok = TelemetryProcessor.flush(pid, 5000) + + assert_receive {^ref, _envelope}, 1000 + + Supervisor.stop(pid) + end + end + + describe "get_buffer/2" do + test "returns buffer pid for log category" do + {:ok, pid} = TelemetryProcessor.start_link(name: :test_processor_get_buffer) + + buffer = TelemetryProcessor.get_buffer(pid, :log) + assert is_pid(buffer) + assert Process.alive?(buffer) + assert Buffer.category(buffer) == :log + + Supervisor.stop(pid) + end + end + + describe "get_scheduler/1" do + test "returns scheduler pid" do + {:ok, pid} = TelemetryProcessor.start_link(name: :test_processor_get_scheduler) + + scheduler = TelemetryProcessor.get_scheduler(pid) + assert is_pid(scheduler) + assert Process.alive?(scheduler) + + Supervisor.stop(pid) + end + end + + describe "integration" do + test "respects transport_capacity option" do + {:ok, pid} = + TelemetryProcessor.start_link( + name: :test_processor_capacity, + transport_capacity: 42 + ) + + scheduler = TelemetryProcessor.get_scheduler(pid) + state = :sys.get_state(scheduler) + assert state.capacity == 42 + + Supervisor.stop(pid) + end + end + + defp receive_envelopes(acc \\ []) do + receive do + {:envelope, envelope} -> + receive_envelopes([envelope | acc]) + after + 50 -> Enum.reverse(acc) + end + end +end diff --git a/test/sentry/test_test.exs b/test/sentry/test_test.exs index 41e241e8..623da1e6 100644 --- a/test/sentry/test_test.exs +++ b/test/sentry/test_test.exs @@ -5,6 +5,7 @@ defmodule Sentry.TestTest do alias Sentry.Event alias Sentry.Test + alias Sentry.{LogEvent, TelemetryProcessor} doctest Test @@ -188,4 +189,33 @@ defmodule Sentry.TestTest do assert {:ok, ""} = Sentry.capture_message("Oops") end + + test "pop_sentry_logs works with per-test TelemetryProcessor" do + uid = System.unique_integer([:positive]) + processor_name = :"test_processor_logs_#{uid}" + + {:ok, _pid} = + start_supervised( + 
{TelemetryProcessor, name: processor_name, buffer_configs: %{log: %{batch_size: 1}}}, + id: processor_name + ) + + Process.put(:sentry_telemetry_processor, processor_name) + + assert :ok = Test.start_collecting_sentry_reports() + + log_event = %LogEvent{ + level: :info, + body: "per-test log", + timestamp: System.system_time(:microsecond) / 1_000_000 + } + + TelemetryProcessor.add(processor_name, log_event) + + # Give the scheduler time to process + Process.sleep(100) + + logs = Test.pop_sentry_logs() + assert [%LogEvent{body: "per-test log"}] = logs + end end diff --git a/test/sentry_test.exs b/test/sentry_test.exs index fa9e2038..06bbea11 100644 --- a/test/sentry_test.exs +++ b/test/sentry_test.exs @@ -346,4 +346,16 @@ defmodule SentryTest do assert_receive {:after_send, "test-transaction", "340"} end end + + describe "flush/1" do + test "returns :ok silently when TelemetryProcessor is not running" do + # Default TelemetryProcessor is not started in test — this is the :noproc path + log = + capture_log(fn -> + assert :ok = Sentry.flush() + end) + + refute log =~ "failed unexpectedly" + end + end end diff --git a/test/support/case.ex b/test/support/case.ex index acea237d..6fd2000b 100644 --- a/test/support/case.ex +++ b/test/support/case.ex @@ -9,21 +9,43 @@ defmodule Sentry.Case do # Start a fresh RateLimiter for each test with unique names for isolation. setup_rate_limiter() + # Start a fresh TelemetryProcessor for each test with unique name for isolation. + # Returns the processor name so tests can pass it to handlers for isolation. + telemetry_processor = setup_telemetry_processor() + # Create fresh counters with unique keys for each test to ensure complete isolation. # SenderPool checks process dictionary for custom keys, falling back to global defaults. 
setup_sender_pool_counters() - case context[:span_storage] do - nil -> :ok - true -> setup_span_storage([]) - opts when is_list(opts) -> setup_span_storage(opts) - end + span_storage_result = + case context[:span_storage] do + nil -> %{} + true -> setup_span_storage([]) + opts when is_list(opts) -> setup_span_storage(opts) + end + + Map.merge(%{telemetry_processor: telemetry_processor}, span_storage_result) end defp setup_rate_limiter do - table_name = :"test_rate_limiter_#{System.unique_integer([:positive])}" - Process.put(:rate_limiter_table_name, table_name) - start_supervised!({Sentry.Transport.RateLimiter, name: table_name}, id: table_name) + uid = System.unique_integer([:positive]) + rate_limiter_table = :"test_rate_limiter_#{uid}" + + Process.put(:rate_limiter_table_name, rate_limiter_table) + start_supervised!({Sentry.Transport.RateLimiter, name: rate_limiter_table}, id: rate_limiter_table) + end + + defp setup_telemetry_processor do + uid = System.unique_integer([:positive]) + processor_name = :"test_telemetry_processor_#{uid}" + + start_supervised!( + {Sentry.TelemetryProcessor, name: processor_name}, + id: processor_name + ) + + Process.put(:sentry_telemetry_processor, processor_name) + processor_name end defp setup_sender_pool_counters do @@ -48,6 +70,6 @@ defmodule Sentry.Case do opts = [name: server_name, table_name: table_name] ++ opts start_supervised!({Sentry.OpenTelemetry.SpanStorage, opts}) - {:ok, server_name: server_name, table_name: table_name} + %{server_name: server_name, table_name: table_name} end end diff --git a/test_integrations/phoenix_app/config/dev.exs b/test_integrations/phoenix_app/config/dev.exs index fcde6874..6b76919e 100644 --- a/test_integrations/phoenix_app/config/dev.exs +++ b/test_integrations/phoenix_app/config/dev.exs @@ -90,8 +90,7 @@ config :sentry, enable_source_code_context: true, send_result: :sync, traces_sample_rate: 1.0, - enable_logs: true, - max_log_events: 5 + enable_logs: true config :phoenix_app, Oban, repo: 
PhoenixApp.Repo, diff --git a/test_integrations/phoenix_app/lib/phoenix_app_web/controllers/page_controller.ex b/test_integrations/phoenix_app/lib/phoenix_app_web/controllers/page_controller.ex index c3a3f838..c6be8b47 100644 --- a/test_integrations/phoenix_app/lib/phoenix_app_web/controllers/page_controller.ex +++ b/test_integrations/phoenix_app/lib/phoenix_app_web/controllers/page_controller.ex @@ -143,8 +143,8 @@ defmodule PhoenixAppWeb.PageController do retry_count: 0 ) - # Force flush the log buffer immediately - Sentry.LogEventBuffer.flush() + # Force flush the telemetry processor immediately + Sentry.flush() json(conn, %{ message: "Logs demo completed - check your Sentry logs!",