diff --git a/.gitignore b/.gitignore
index d1ed1a09..466b4051 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@
 /pkg/
 /spec/reports/
 /tmp/
+/vendor/bundle
 *.bundle
 *.so
 *.o
diff --git a/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb b/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb
new file mode 100644
index 00000000..eded0ad8
--- /dev/null
+++ b/lib/ldclient-rb/impl/integrations/file_data_source_v2.rb
@@ -0,0 +1,466 @@
+# frozen_string_literal: true
+
+require 'ldclient-rb/impl/util'
+require 'ldclient-rb/interfaces/data_system'
+require 'ldclient-rb/util'
+
+require 'concurrent/atomics'
+require 'json'
+require 'yaml'
+require 'pathname'
+require 'thread'
+
+module LaunchDarkly
+  module Impl
+    module Integrations
+      #
+      # Internal implementation of both Initializer and Synchronizer protocols for file-based data.
+      #
+      # This type is not stable, and not subject to any backwards
+      # compatibility guarantees or semantic versioning. It is not suitable for production usage.
+      #
+      # Do not use it.
+      # You have been warned.
+      #
+      # This component reads feature flag and segment data from local files and provides them
+      # via the FDv2 protocol interfaces. Each instance implements both Initializer and Synchronizer
+      # protocols:
+      # - As an Initializer: reads files once and returns initial data
+      # - As a Synchronizer: watches for file changes and yields updates
+      #
+      # The files use the same format as the v1 file data source, supporting flags, flagValues,
+      # and segments in JSON or YAML format.
+      #
+      class FileDataSourceV2
+        include LaunchDarkly::Interfaces::DataSystem::Initializer
+        include LaunchDarkly::Interfaces::DataSystem::Synchronizer
+
+        # To avoid pulling in 'listen' and its transitive dependencies for people who aren't using the
+        # file data source or who don't need auto-updating, we only enable auto-update if the 'listen'
+        # gem has been provided by the host app.
+        @@have_listen = false
+        begin
+          require 'listen'
+          @@have_listen = true
+        rescue LoadError
+          # Ignored
+        end
+
+        #
+        # Initialize the file data source.
+        #
+        # @param logger [Logger] the logger
+        # @param paths [Array, String] file paths to load (or a single path string)
+        # @param poll_interval [Float] seconds between polling checks when watching files (default: 1)
+        #
+        def initialize(logger, paths:, poll_interval: 1)
+          @logger = logger
+          @paths = paths.is_a?(Array) ? paths : [paths]
+          @poll_interval = poll_interval
+
+          @closed = false
+          @update_queue = Queue.new
+          @lock = Mutex.new
+          @listener = nil
+        end
+
+        #
+        # Return the name of this data source.
+        #
+        # @return [String]
+        #
+        def name
+          'FileDataV2'
+        end
+
+        #
+        # Implementation of the Initializer.fetch method.
+        #
+        # Reads all configured files once and returns their contents as a Basis.
+        #
+        # @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for file data)
+        # @return [LaunchDarkly::Result] A Result containing either a Basis or an error message
+        #
+        def fetch(selector_store)
+          @lock.synchronize do
+            if @closed
+              return LaunchDarkly::Result.fail('FileDataV2 source has been closed')
+            end
+
+            result = load_all_to_changeset
+            return result unless result.success?
+
+            change_set = result.value
+            basis = LaunchDarkly::Interfaces::DataSystem::Basis.new(
+              change_set: change_set,
+              persist: false,
+              environment_id: nil
+            )
+
+            LaunchDarkly::Result.success(basis)
+          end
+        rescue => e
+          @logger.error { "[LDClient] Error fetching file data: #{e.message}" }
+          LaunchDarkly::Result.fail("Error fetching file data: #{e.message}", e)
+        end
+
+        #
+        # Implementation of the Synchronizer.sync method.
+        #
+        # Yields initial data from files, then continues to watch for file changes
+        # and yields updates when files are modified.
+        #
+        # @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for file data)
+        # @yield [LaunchDarkly::Interfaces::DataSystem::Update] Yields Update objects as synchronization progresses
+        # @return [void]
+        #
+        def sync(selector_store)
+          # First yield initial data
+          initial_result = fetch(selector_store)
+          unless initial_result.success?
+            yield LaunchDarkly::Interfaces::DataSystem::Update.new(
+              state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
+              error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
+                LaunchDarkly::Interfaces::DataSource::ErrorInfo::INVALID_DATA,
+                0,
+                initial_result.error,
+                Time.now
+              )
+            )
+            return
+          end
+
+          yield LaunchDarkly::Interfaces::DataSystem::Update.new(
+            state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
+            change_set: initial_result.value.change_set
+          )
+
+          # Start watching for file changes
+          @lock.synchronize do
+            @listener = start_listener unless @closed
+          end
+
+          until @closed
+            begin
+              update = @update_queue.pop
+
+              # stop() pushes nil to wake us up when shutting down
+              break if update.nil?
+
+              yield update
+            rescue => e
+              yield LaunchDarkly::Interfaces::DataSystem::Update.new(
+                state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
+                error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
+                  LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN,
+                  0,
+                  "Error in file data synchronizer: #{e.message}",
+                  Time.now
+                )
+              )
+              break
+            end
+          end
+        end
+
+        #
+        # Stop the data source and clean up resources.
+        #
+        # @return [void]
+        #
+        def stop
+          @lock.synchronize do
+            return if @closed
+            @closed = true
+
+            listener = @listener
+            @listener = nil
+
+            listener&.stop
+          end
+
+          # Signal shutdown to sync generator
+          @update_queue.push(nil)
+        end
+
+        #
+        # Load all files and build a changeset.
+        #
+        # @return [LaunchDarkly::Result] A Result containing either a ChangeSet or an error message
+        #
+        private def load_all_to_changeset
+          flags_dict = {}
+          segments_dict = {}
+
+          @paths.each do |path|
+            begin
+              load_file(path, flags_dict, segments_dict)
+            rescue => e
+              Impl::Util.log_exception(@logger, "Unable to load flag data from \"#{path}\"", e)
+              return LaunchDarkly::Result.fail("Unable to load flag data from \"#{path}\": #{e.message}", e)
+            end
+          end
+
+          builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
+          builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL)
+
+          flags_dict.each do |key, flag_data|
+            builder.add_put(
+              LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
+              key,
+              flag_data[:version] || 1,
+              flag_data
+            )
+          end
+
+          segments_dict.each do |key, segment_data|
+            builder.add_put(
+              LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT,
+              key,
+              segment_data[:version] || 1,
+              segment_data
+            )
+          end
+
+          # Use no_selector since we don't have versioning information from files
+          change_set = builder.finish(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)
+
+          LaunchDarkly::Result.success(change_set)
+        end
+
+        #
+        # Load a single file and add its contents to the provided dictionaries.
+        #
+        # @param path [String] path to the file
+        # @param flags_dict [Hash] dictionary to add flags to
+        # @param segments_dict [Hash] dictionary to add segments to
+        #
+        private def load_file(path, flags_dict, segments_dict)
+          parsed = parse_content(File.read(path))
+
+          (parsed[:flags] || {}).each do |key, flag|
+            flag[:version] ||= 1
+            add_item(flags_dict, 'flags', flag)
+          end
+
+          (parsed[:flagValues] || {}).each do |key, value|
+            add_item(flags_dict, 'flags', make_flag_with_value(key.to_s, value))
+          end
+
+          (parsed[:segments] || {}).each do |key, segment|
+            segment[:version] ||= 1
+            add_item(segments_dict, 'segments', segment)
+          end
+        end
+
+        #
+        # Parse file content as JSON or YAML.
+        #
+        # @param content [String] file content string
+        # @return [Hash] parsed dictionary with symbolized keys
+        #
+        private def parse_content(content)
+          # Ruby's YAML parser correctly handles JSON as well
+          symbolize_all_keys(YAML.safe_load(content)) || {}
+        end
+
+        #
+        # Recursively symbolize all keys in a hash or array.
+        #
+        # @param value [Object] the value to symbolize
+        # @return [Object] the value with all keys symbolized
+        private def symbolize_all_keys(value)
+          if value.is_a?(Hash)
+            value.map { |k, v| [k.to_sym, symbolize_all_keys(v)] }.to_h
+          elsif value.is_a?(Array)
+            value.map { |v| symbolize_all_keys(v) }
+          else
+            value
+          end
+        end
+
+        #
+        # Add an item to a dictionary, checking for duplicates.
+        #
+        # @param items_dict [Hash] dictionary to add to
+        # @param kind_name [String] name of the kind (for error messages)
+        # @param item [Hash] item to add
+        #
+        private def add_item(items_dict, kind_name, item)
+          key = item[:key].to_sym
+          if items_dict[key].nil?
+            items_dict[key] = item
+          else
+            raise ArgumentError, "In #{kind_name}, key \"#{item[:key]}\" was used more than once"
+          end
+        end
+
+        #
+        # Create a simple flag configuration from a key-value pair.
+        #
+        # @param key [String] flag key
+        # @param value [Object] flag value
+        # @return [Hash] flag dictionary
+        #
+        private def make_flag_with_value(key, value)
+          {
+            key: key,
+            on: true,
+            version: 1,
+            fallthrough: { variation: 0 },
+            variations: [value],
+          }
+        end
+
+        #
+        # Callback invoked when files change.
+        #
+        # Reloads all files and queues an update.
+        #
+        private def on_file_change
+          @lock.synchronize do
+            return if @closed
+
+            begin
+              result = load_all_to_changeset
+
+              if result.success?
+                update = LaunchDarkly::Interfaces::DataSystem::Update.new(
+                  state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
+                  change_set: result.value
+                )
+                @update_queue.push(update)
+              else
+                error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
+                  state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
+                  error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
+                    LaunchDarkly::Interfaces::DataSource::ErrorInfo::INVALID_DATA,
+                    0,
+                    result.error,
+                    Time.now
+                  )
+                )
+                @update_queue.push(error_update)
+              end
+            rescue => e
+              @logger.error { "[LDClient] Error processing file change: #{e.message}" }
+              error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
+                state: LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED,
+                error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
+                  LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN,
+                  0,
+                  "Error processing file change: #{e.message}",
+                  Time.now
+                )
+              )
+              @update_queue.push(error_update)
+            end
+          end
+        end
+
+        #
+        # Start watching files for changes.
+        #
+        # @return [Object] auto-updater instance
+        #
+        private def start_listener
+          resolved_paths = @paths.map do |p|
+            begin
+              Pathname.new(File.absolute_path(p)).realpath.to_s
+            rescue
+              @logger.warn { "[LDClient] Cannot watch for changes to data file \"#{p}\" because it is an invalid path" }
+              nil
+            end
+          end.compact
+
+          if @@have_listen
+            start_listener_with_listen_gem(resolved_paths)
+          else
+            FileDataSourcePollerV2.new(resolved_paths, @poll_interval, method(:on_file_change), @logger)
+          end
+        end
+
+        #
+        # Start listening for file changes using the listen gem.
+        #
+        # @param resolved_paths [Array] resolved file paths to watch
+        # @return [Listen::Listener] the listener instance
+        #
+        private def start_listener_with_listen_gem(resolved_paths)
+          path_set = resolved_paths.to_set
+          dir_paths = resolved_paths.map { |p| File.dirname(p) }.uniq
+          opts = { latency: @poll_interval }
+          l = Listen.to(*dir_paths, **opts) do |modified, added, removed|
+            paths = modified + added + removed
+            if paths.any? { |p| path_set.include?(p) }
+              on_file_change
+            end
+          end
+          l.start
+          l
+        end
+      end
+
+      #
+      # Used internally by FileDataSourceV2 to track data file changes if the 'listen' gem is not available.
+      #
+      class FileDataSourcePollerV2
+        #
+        # Initialize the file data poller.
+        #
+        # @param resolved_paths [Array] resolved file paths to watch
+        # @param interval [Float] polling interval in seconds
+        # @param on_change_callback [Proc] callback to invoke when files change
+        # @param logger [Logger] the logger
+        #
+        def initialize(resolved_paths, interval, on_change_callback, logger)
+          @stopped = Concurrent::AtomicBoolean.new(false)
+          @on_change = on_change_callback
+          @logger = logger
+
+          get_file_times = proc do
+            ret = {}
+            resolved_paths.each do |path|
+              begin
+                ret[path] = File.mtime(path)
+              rescue Errno::ENOENT
+                ret[path] = nil
+              end
+            end
+            ret
+          end
+
+          last_times = get_file_times.call
+          @thread = Thread.new do
+            loop do
+              sleep interval
+              break if @stopped.value
+
+              begin
+                new_times = get_file_times.call
+                changed = false
+                last_times.each do |path, old_time|
+                  new_time = new_times[path]
+                  if !new_time.nil? && new_time != old_time
+                    changed = true
+                    break
+                  end
+                end
+                last_times = new_times
+                @on_change.call if changed
+              rescue => e
+                Impl::Util.log_exception(@logger, "Unexpected exception in FileDataSourcePollerV2", e)
+              end
+            end
+          end
+          @thread.name = "LD/FileDataSourceV2"
+        end
+
+        def stop
+          @stopped.make_true
+          @thread.run # wakes it up if it's sleeping
+        end
+      end
+    end
+  end
+end
diff --git a/lib/ldclient-rb/integrations/file_data.rb b/lib/ldclient-rb/integrations/file_data.rb
index fb85ad98..fe29f0c8 100644
--- a/lib/ldclient-rb/integrations/file_data.rb
+++ b/lib/ldclient-rb/integrations/file_data.rb
@@ -1,4 +1,5 @@
 require 'ldclient-rb/impl/integrations/file_data_source'
+require 'ldclient-rb/impl/integrations/file_data_source_v2'
 
 module LaunchDarkly
   module Integrations
@@ -103,6 +104,75 @@ def self.data_source(options={})
         lambda { |sdk_key, config| Impl::Integrations::FileDataSourceImpl.new(config.feature_store, config.data_source_update_sink, config.logger, options) }
       end
+
+      #
+      # Returns a builder for the FDv2-compatible file data source.
+      #
+      # This type is not stable, and not subject to any backwards
+      # compatibility guarantees or semantic versioning. It is not suitable for production usage.
+      #
+      # Do not use it.
+      # You have been warned.
+      #
+      # This method returns a builder proc that can be used with the FDv2 data system
+      # configuration as both an Initializer and a Synchronizer. When used as an Initializer
+      # (via `fetch`), it reads files once. When used as a Synchronizer (via `sync`), it
+      # watches for file changes and automatically updates when files are modified.
+      #
+      # @param options [Hash] the configuration options
+      # @option options [Array, String] :paths The paths of the source files for loading flag data. These
+      #   may be absolute paths or relative to the current working directory. (Required)
+      # @option options [Float] :poll_interval The minimum interval, in seconds, between checks for
+      #   file modifications - used only if the native file-watching mechanism from 'listen' is not
+      #   being used. The default value is 1 second.
+      # @return [FileDataSourceV2Builder] a builder that can be used as an FDv2 initializer or synchronizer
+      #
+      # @example Using as an initializer
+      #   file_source = LaunchDarkly::Integrations::FileData.data_source_v2(paths: ['flags.json'])
+      #   data_system_config = LaunchDarkly::DataSystemConfig.new(
+      #     initializers: [file_source]
+      #   )
+      #   config = LaunchDarkly::Config.new(data_system: data_system_config)
+      #
+      # @example Using as a synchronizer
+      #   file_source = LaunchDarkly::Integrations::FileData.data_source_v2(paths: ['flags.json'])
+      #   data_system_config = LaunchDarkly::DataSystemConfig.new(
+      #     synchronizer: file_source
+      #   )
+      #   config = LaunchDarkly::Config.new(data_system: data_system_config)
+      #
+      # @example Using as both initializer and synchronizer
+      #   file_source = LaunchDarkly::Integrations::FileData.data_source_v2(paths: ['flags.json'])
+      #   data_system_config = LaunchDarkly::DataSystemConfig.new(
+      #     initializers: [file_source],
+      #     synchronizer: file_source
+      #   )
+      #   config = LaunchDarkly::Config.new(data_system: data_system_config)
+      #
+      def self.data_source_v2(options = {})
+        paths = options[:paths] || []
+        poll_interval = options[:poll_interval] || 1
+
+        FileDataSourceV2Builder.new(paths, poll_interval)
+      end
     end
+
+    #
+    # Builder for FileDataSourceV2.
+    #
+    class FileDataSourceV2Builder
+      def initialize(paths, poll_interval)
+        @paths = paths
+        @poll_interval = poll_interval
+      end
+
+      def build(_sdk_key, config)
+        Impl::Integrations::FileDataSourceV2.new(
+          config.logger,
+          paths: @paths,
+          poll_interval: @poll_interval
+        )
+      end
+    end
   end
 end
diff --git a/spec/integrations/file_data_source_v2_spec.rb b/spec/integrations/file_data_source_v2_spec.rb
new file mode 100644
index 00000000..670e9adf
--- /dev/null
+++ b/spec/integrations/file_data_source_v2_spec.rb
@@ -0,0 +1,520 @@
+# frozen_string_literal: true
+
+require "spec_helper"
+require "tempfile"
+require "ldclient-rb/impl/integrations/file_data_source_v2"
+require "ldclient-rb/integrations/file_data"
+require "ldclient-rb/interfaces/data_system"
+
+module LaunchDarkly
+  module Integrations
+    RSpec.describe "FileDataSourceV2" do
+      let(:logger) { $null_log }
+
+      let(:all_properties_json) { <<-EOF
+{
+  "flags": {
+    "flag1": {
+      "key": "flag1",
+      "on": true,
+      "fallthrough": {
+        "variation": 2
+      },
+      "variations": [ "fall", "off", "on" ]
+    }
+  },
+  "flagValues": {
+    "flag2": "value2"
+  },
+  "segments": {
+    "seg1": {
+      "key": "seg1",
+      "include": ["user1"]
+    }
+  }
+}
+EOF
+      }
+
+      let(:all_properties_yaml) { <<-EOF
+---
+flags:
+  flag1:
+    key: flag1
+    "on": true
+flagValues:
+  flag2: value2
+segments:
+  seg1:
+    key: seg1
+    include: ["user1"]
+EOF
+      }
+
+      let(:flag_only_json) { <<-EOF
+{
+  "flags": {
+    "flag1": {
+      "key": "flag1",
+      "on": true,
+      "fallthrough": {
+        "variation": 2
+      },
+      "variations": [ "fall", "off", "on" ]
+    }
+  }
+}
+EOF
+      }
+
+      let(:segment_only_json) { <<-EOF
+{
+  "segments": {
+    "seg1": {
+      "key": "seg1",
+      "include": ["user1"]
+    }
+  }
+}
+EOF
+      }
+
+      let(:flag_values_only_json) { <<-EOF
+{
+  "flagValues": {
+    "flag2": "value2"
+  }
+}
+EOF
+      }
+
+      class MockSelectorStore
+        include LaunchDarkly::Interfaces::DataSystem::SelectorStore
+
+        def initialize(selector)
+          @selector = selector
+        end
+
+        def selector
+          @selector
+        end
+      end
+
+      before do
+        @tmp_dir = Dir.mktmpdir
+      end
+
+      after do
+        FileUtils.rm_rf(@tmp_dir)
+      end
+
+      def make_temp_file(content)
+        file = Tempfile.new('flags', @tmp_dir)
+        IO.write(file, content)
+        file
+      end
+
+      def no_selector_store
+        MockSelectorStore.new(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)
+      end
+
+      describe "initializer (fetch)" do
+        it "creates valid initializer" do
+          file = make_temp_file(all_properties_json)
+
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path])
+
+          begin
+            result = source.fetch(no_selector_store)
+
+            expect(result.success?).to eq(true)
+
+            basis = result.value
+            expect(basis.persist).to eq(false)
+            expect(basis.environment_id).to be_nil
+            expect(basis.change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL)
+
+            # Should have 2 flags and 1 segment
+            changes = basis.change_set.changes
+            expect(changes.length).to eq(3)
+
+            flag_changes = changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG }
+            segment_changes = changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT }
+
+            expect(flag_changes.length).to eq(2)
+            expect(segment_changes.length).to eq(1)
+
+            # Check selector is no_selector
+            expect(basis.change_set.selector).to eq(LaunchDarkly::Interfaces::DataSystem::Selector.no_selector)
+          ensure
+            source.stop
+          end
+        end
+
+        it "handles missing file" do
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: ['no-such-file.json'])
+
+          begin
+            result = source.fetch(no_selector_store)
+
+            expect(result.success?).to eq(false)
+            expect(result.error).to include("no-such-file.json")
+          ensure
+            source.stop
+          end
+        end
+
+        it "handles invalid JSON" do
+          file = make_temp_file('{"flagValues":{')
+
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path])
+
+          begin
+            result = source.fetch(no_selector_store)
+
+            expect(result.success?).to eq(false)
+            expect(result.error).to include("Unable to load flag data")
+          ensure
+            source.stop
+          end
+        end
+
+        it "handles duplicate keys" do
+          file1 = make_temp_file(flag_only_json)
+          file2 = make_temp_file(flag_only_json)
+
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file1.path, file2.path])
+
+          begin
+            result = source.fetch(no_selector_store)
+
+            expect(result.success?).to eq(false)
+            expect(result.error).to include("was used more than once")
+          ensure
+            source.stop
+          end
+        end
+
+        it "loads multiple files" do
+          file1 = make_temp_file(flag_only_json)
+          file2 = make_temp_file(segment_only_json)
+
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file1.path, file2.path])
+
+          begin
+            result = source.fetch(no_selector_store)
+
+            expect(result.success?).to eq(true)
+
+            changes = result.value.change_set.changes
+            flag_changes = changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG }
+            segment_changes = changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT }
+
+            expect(flag_changes.length).to eq(1)
+            expect(segment_changes.length).to eq(1)
+          ensure
+            source.stop
+          end
+        end
+
+        it "loads YAML" do
+          file = make_temp_file(all_properties_yaml)
+
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path])
+
+          begin
+            result = source.fetch(no_selector_store)
+
+            expect(result.success?).to eq(true)
+            expect(result.value.change_set.changes.length).to eq(3) # 2 flags + 1 segment
+          ensure
+            source.stop
+          end
+        end
+
+        it "handles flag values" do
+          file = make_temp_file(flag_values_only_json)
+
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path])
+
+          begin
+            result = source.fetch(no_selector_store)
+
+            expect(result.success?).to eq(true)
+
+            changes = result.value.change_set.changes
+            flag_changes = changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG }
+            expect(flag_changes.length).to eq(1)
+
+            # Check the flag was created with the expected structure
+            flag_change = flag_changes[0]
+            expect(flag_change.key).to eq(:flag2)
+            expect(flag_change.object[:key]).to eq("flag2")
+            expect(flag_change.object[:on]).to eq(true)
+            expect(flag_change.object[:variations]).to eq(["value2"])
+          ensure
+            source.stop
+          end
+        end
+      end
+
+      describe "synchronizer (sync)" do
+        it "creates valid synchronizer" do
+          file = make_temp_file(all_properties_json)
+
+          source = Impl::Integrations::FileDataSourceV2.new(
+            logger,
+            paths: [file.path],
+            poll_interval: 0.1
+          )
+
+          updates = []
+
+          begin
+            sync_thread = Thread.new do
+              source.sync(no_selector_store) do |update|
+                updates << update
+                break if updates.length >= 1
+              end
+            end
+
+            # Wait for initial update with timeout
+            deadline = Time.now + 5
+            while updates.empty? && Time.now < deadline
+              sleep 0.1
+            end
+
+            expect(updates.length).to be >= 1
+            expect(updates[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID)
+            expect(updates[0].change_set).not_to be_nil
+            expect(updates[0].change_set.intent_code).to eq(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL)
+            expect(updates[0].change_set.changes.length).to eq(3)
+          ensure
+            source.stop
+            sync_thread&.join(2)
+          end
+        end
+
+        it "detects file changes" do
+          file = make_temp_file(flag_only_json)
+
+          source = Impl::Integrations::FileDataSourceV2.new(
+            logger,
+            paths: [file.path],
+            poll_interval: 0.1
+          )
+
+          updates = []
+          update_received = Concurrent::Event.new
+
+          begin
+            sync_thread = Thread.new do
+              source.sync(no_selector_store) do |update|
+                updates << update
+                update_received.set
+                break if updates.length >= 2
+              end
+            end
+
+            # Wait for initial update
+            expect(update_received.wait(5)).to eq(true), "Did not receive initial update"
+            expect(updates.length).to eq(1)
+            expect(updates[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID)
+
+            initial_flags = updates[0].change_set.changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG }
+            expect(initial_flags.length).to eq(1)
+
+            # Modify the file
+            update_received.reset
+            sleep 0.2 # Ensure filesystem timestamp changes
+            IO.write(file, segment_only_json)
+
+            # Wait for the change to be detected
+            expect(update_received.wait(5)).to eq(true), "Did not receive update after file change"
+            expect(updates.length).to eq(2)
+            expect(updates[1].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID)
+
+            segment_changes = updates[1].change_set.changes.select { |c| c.kind == LaunchDarkly::Interfaces::DataSystem::ObjectKind::SEGMENT }
+            expect(segment_changes.length).to eq(1)
+          ensure
+            source.stop
+            sync_thread&.join(2)
+          end
+        end
+
+        it "reports error on invalid file update" do
+          file = make_temp_file(flag_only_json)
+
+          source = Impl::Integrations::FileDataSourceV2.new(
+            logger,
+            paths: [file.path],
+            poll_interval: 0.1
+          )
+
+          updates = []
+          update_received = Concurrent::Event.new
+
+          begin
+            sync_thread = Thread.new do
+              source.sync(no_selector_store) do |update|
+                updates << update
+                update_received.set
+                break if updates.length >= 2
+              end
+            end
+
+            # Wait for initial update
+            expect(update_received.wait(5)).to eq(true), "Did not receive initial update"
+            expect(updates.length).to eq(1)
+            expect(updates[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID)
+
+            # Make the file invalid
+            update_received.reset
+            sleep 0.2 # Ensure filesystem timestamp changes
+            IO.write(file, '{"invalid json')
+
+            # Wait for the error to be detected
+            expect(update_received.wait(5)).to eq(true), "Did not receive update after file became invalid"
+            expect(updates.length).to eq(2)
+            expect(updates[1].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::INTERRUPTED)
+            expect(updates[1].error).not_to be_nil
+          ensure
+            source.stop
+            sync_thread&.join(2)
+          end
+        end
+
+        it "can be stopped" do
+          file = make_temp_file(all_properties_json)
+
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path])
+
+          updates = []
+
+          sync_thread = Thread.new do
+            source.sync(no_selector_store) do |update|
+              updates << update
+            end
+          end
+
+          # Give it a moment to process initial data
+          sleep 0.3
+
+          # Stop it
+          source.stop
+
+          # Thread should complete
+          sync_thread.join(2)
+          expect(sync_thread.alive?).to eq(false)
+
+          # Should have received at least the initial update
+          expect(updates.length).to be >= 1
+          expect(updates[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID)
+        end
+      end
+
+      describe "fetch after stop" do
+        it "returns error" do
+          file = make_temp_file(all_properties_json)
+
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path])
+
+          # First fetch should work
+          result = source.fetch(no_selector_store)
+          expect(result.success?).to eq(true)
+
+          # Stop the source
+          source.stop
+
+          # Second fetch should fail
+          result = source.fetch(no_selector_store)
+          expect(result.success?).to eq(false)
+          expect(result.error).to include("closed")
+        end
+      end
+
+      describe "name property" do
+        it "returns correct name" do
+          file = make_temp_file(all_properties_json)
+
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: [file.path])
+
+          begin
+            expect(source.name).to eq("FileDataV2")
+          ensure
+            source.stop
+          end
+        end
+      end
+
+      describe "accepts single path string" do
+        it "works with string instead of array" do
+          file = make_temp_file(flag_only_json)
+
+          # Pass a single string instead of a list
+          source = Impl::Integrations::FileDataSourceV2.new(logger, paths: file.path)
+
+          begin
+            result = source.fetch(no_selector_store)
+
+            expect(result.success?).to eq(true)
+            expect(result.value.change_set.changes.length).to eq(1)
+          ensure
+            source.stop
+          end
+        end
+      end
+
+      describe "public API (data_source_v2)" do
+        it "creates builder that works as initializer" do
+          file = make_temp_file(all_properties_json)
+
+          builder = FileData.data_source_v2(paths: [file.path])
+          config = LaunchDarkly::Config.new(logger: logger)
+
+          source = builder.build('sdk-key', config)
+
+          begin
+            result = source.fetch(no_selector_store)
+
+            expect(result.success?).to eq(true)
+            expect(result.value.change_set.changes.length).to eq(3)
+          ensure
+            source.stop
+          end
+        end
+
+        it "creates builder that works as synchronizer" do
+          file = make_temp_file(all_properties_json)
+
+          builder = FileData.data_source_v2(paths: [file.path], poll_interval: 0.1)
+          config = LaunchDarkly::Config.new(logger: logger)
+
+          source = builder.build('sdk-key', config)
+
+          updates = []
+
+          begin
+            sync_thread = Thread.new do
+              source.sync(no_selector_store) do |update|
+                updates << update
+                break if updates.length >= 1
+              end
+            end
+
+            deadline = Time.now + 5
+            while updates.empty? && Time.now < deadline
+              sleep 0.1
+            end
+
+            expect(updates.length).to be >= 1
+            expect(updates[0].state).to eq(LaunchDarkly::Interfaces::DataSource::Status::VALID)
+          ensure
+            source.stop
+            sync_thread&.join(2)
+          end
+        end
+      end
+    end
+  end
+end