diff --git a/crates/fluss/src/client/connection.rs b/crates/fluss/src/client/connection.rs index 78e9362b..23d8e530 100644 --- a/crates/fluss/src/client/connection.rs +++ b/crates/fluss/src/client/connection.rs @@ -43,7 +43,9 @@ impl FlussConnection { pub async fn new(arg: Config) -> Result { arg.validate_security() .map_err(|msg| Error::IllegalArgument { message: msg })?; - arg.validate_scanner_fetch() + arg.validate_scanner() + .map_err(|msg| Error::IllegalArgument { message: msg })?; + arg.validate_writer() .map_err(|msg| Error::IllegalArgument { message: msg })?; let timeout = Duration::from_millis(arg.connect_timeout_ms); diff --git a/crates/fluss/src/client/table/remote_log.rs b/crates/fluss/src/client/table/remote_log.rs index 6bc95512..4d96ce96 100644 --- a/crates/fluss/src/client/table/remote_log.rs +++ b/crates/fluss/src/client/table/remote_log.rs @@ -778,7 +778,7 @@ impl RemoteLogDownloader { let fetcher = Arc::new(ProductionFetcher { credentials_rx, local_log_dir: Arc::new(local_log_dir), - remote_log_read_concurrency: remote_log_read_concurrency.max(1), + remote_log_read_concurrency, }); Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads) diff --git a/crates/fluss/src/client/write/writer_client.rs b/crates/fluss/src/client/write/writer_client.rs index aee6bcd9..ffdf96b1 100644 --- a/crates/fluss/src/client/write/writer_client.rs +++ b/crates/fluss/src/client/write/writer_client.rs @@ -54,10 +54,6 @@ impl WriterClient { pub fn new(config: Config, metadata: Arc) -> Result { let ack = Self::get_ack(&config)?; - config - .validate_idempotence() - .map_err(|message| Error::IllegalArgument { message })?; - let idempotence_manager = Arc::new(IdempotenceManager::new( config.writer_enable_idempotence, config.writer_max_inflight_requests_per_bucket, diff --git a/crates/fluss/src/config.rs b/crates/fluss/src/config.rs index 32db44f6..5309af12 100644 --- a/crates/fluss/src/config.rs +++ b/crates/fluss/src/config.rs @@ -270,34 +270,7 @@ impl Config { /// Validates idempotence configuration. Returns `Ok(())` when the config is /// consistent, or an error message when idempotence is enabled but other /// settings are incompatible. - pub fn validate_idempotence(&self) -> Result<(), String> { - if !self.writer_enable_idempotence { - return Ok(()); - } - let acks_is_all = self.writer_acks.eq_ignore_ascii_case("all") || self.writer_acks == "-1"; - if !acks_is_all { - return Err(format!( - "Idempotent writes require acks='all' (-1), but got acks='{}'", - self.writer_acks - )); - } - if self.writer_retries <= 0 { - return Err(format!( - "Idempotent writes require retries > 0, but got retries={}", - self.writer_retries - )); - } - if self.writer_max_inflight_requests_per_bucket - > MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE - { - return Err(format!( - "Idempotent writes require max-inflight-requests-per-bucket <= {}, but got {}", - MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE, - self.writer_max_inflight_requests_per_bucket - )); - } - Ok(()) - } + /// Validates security configuration. Returns `Ok(())` when the config is /// consistent, or an error message when SASL is enabled but the config is @@ -324,7 +297,16 @@ impl Config { } Ok(()) } - pub fn validate_scanner_fetch(&self) -> Result<(), String> { + pub fn validate_scanner(&self) -> Result<(), String> { + if self.scanner_remote_log_prefetch_num == 0 { + return Err("scanner_remote_log_prefetch_num must be > 0".to_string()); + } + if self.scanner_remote_log_read_concurrency == 0 { + return Err("scanner_remote_log_read_concurrency must be > 0".to_string()); + } + if self.remote_file_download_thread_num == 0 { + return Err("remote_file_download_thread_num must be > 0".to_string()); + } if self.scanner_log_fetch_min_bytes <= 0 { return Err("scanner_log_fetch_min_bytes must be > 0".to_string()); } @@ -350,6 +332,57 @@ impl Config { } Ok(()) } + + pub fn validate_writer(&self) -> Result<(), String> { + if self.writer_request_max_size <= 0 { + return Err("writer_request_max_size must be > 0".to_string()); + } + if self.writer_batch_size <= 0 { + return Err("writer_batch_size must be > 0".to_string()); + } + if self.writer_batch_timeout_ms < 0 { + return Err("writer_batch_timeout_ms must be >= 0".to_string()); + } + if self.writer_max_inflight_requests_per_bucket == 0 { + return Err("writer_max_inflight_requests_per_bucket must be > 0".to_string()); + } + if self.writer_buffer_memory_size == 0 { + return Err("writer_buffer_memory_size must be > 0".to_string()); + } + if self.writer_batch_size > self.writer_request_max_size { + return Err("writer_batch_size must be <= writer_request_max_size".to_string()); + } + if self.writer_batch_size as usize > self.writer_buffer_memory_size { + return Err("writer_batch_size must be <= writer_buffer_memory_size".to_string()); + } + // idempotence checks + if !self.writer_enable_idempotence { + return Ok(()); + } + let acks_is_all = self.writer_acks.eq_ignore_ascii_case("all") || self.writer_acks == "-1"; + if !acks_is_all { + return Err(format!( + "Idempotent writes require acks='all' (-1), but got acks='{}'", + self.writer_acks + )); + } + if self.writer_retries <= 0 { + return Err(format!( + "Idempotent writes require retries > 0, but got retries={}", + self.writer_retries + )); + } + if self.writer_max_inflight_requests_per_bucket + > MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE + { + return Err(format!( + "Idempotent writes require max-inflight-requests-per-bucket <= {}, but got {}", + MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE, + self.writer_max_inflight_requests_per_bucket + )); + } + Ok(()) + } } #[cfg(test)] @@ -419,13 +452,38 @@ mod tests { }; assert!(config.validate_security().is_err()); } + #[test] - fn test_scanner_fetch_defaults_valid() { + fn test_scanner_defaults_valid() { let config = Config::default(); - assert!(config.validate_scanner_fetch().is_ok()); - assert_eq!(config.scanner_log_fetch_max_bytes, 16 * 1024 * 1024); - assert_eq!(config.scanner_log_fetch_min_bytes, 1); - assert_eq!(config.scanner_log_fetch_wait_max_time_ms, 500); + assert!(config.validate_scanner().is_ok()); + } + + #[test] + fn test_scanner_remote_log_prefetch_num_zero() { + let config = Config { + scanner_remote_log_prefetch_num: 0, + ..Config::default() + }; + assert!(config.validate_scanner().is_err()); + } + + #[test] + fn test_scanner_remote_log_read_concurrency_zero() { + let config = Config { + scanner_remote_log_read_concurrency: 0, + ..Config::default() + }; + assert!(config.validate_scanner().is_err()); + } + + #[test] + fn test_remote_file_download_thread_num_zero() { + let config = Config { + remote_file_download_thread_num: 0, + ..Config::default() + }; + assert!(config.validate_scanner().is_err()); } #[test] @@ -435,7 +493,7 @@ mod tests { scanner_log_fetch_max_bytes: 1, ..Config::default() }; - assert!(config.validate_scanner_fetch().is_err()); + assert!(config.validate_scanner().is_err()); } #[test] @@ -444,13 +502,78 @@ mod tests { scanner_log_fetch_wait_max_time_ms: -1, ..Config::default() }; - assert!(config.validate_scanner_fetch().is_err()); + assert!(config.validate_scanner().is_err()); } #[test] - fn test_idempotence_default_is_valid() { + fn test_writer_defaults_valid() { let config = Config::default(); - assert!(config.validate_idempotence().is_ok()); + assert!(config.validate_writer().is_ok()); + } + + #[test] + fn test_writer_request_max_size_zero() { + let config = Config { + writer_request_max_size: 0, + ..Config::default() + }; + assert!(config.validate_writer().is_err()); + } + + #[test] + fn test_writer_batch_size_zero() { + let config = Config { + writer_batch_size: 0, + ..Config::default() + }; + assert!(config.validate_writer().is_err()); + } + + #[test] + fn test_writer_batch_timeout_negative() { + let config = Config { + writer_batch_timeout_ms: -1, + ..Config::default() + }; + assert!(config.validate_writer().is_err()); + } + + #[test] + fn test_writer_max_inflight_requests_per_bucket_zero() { + let config = Config { + writer_max_inflight_requests_per_bucket: 0, + ..Config::default() + }; + assert!(config.validate_writer().is_err()); + } + + #[test] + fn test_writer_buffer_memory_size_zero() { + let config = Config { + writer_buffer_memory_size: 0, + ..Config::default() + }; + assert!(config.validate_writer().is_err()); + } + + #[test] + fn test_writer_batch_size_exceeds_request_max_size() { + let config = Config { + writer_batch_size: 20 * 1024 * 1024, + writer_request_max_size: 10 * 1024 * 1024, + ..Config::default() + }; + assert!(config.validate_writer().is_err()); + } + + #[test] + fn test_writer_batch_size_exceeds_buffer_memory_size() { + let config = Config { + writer_batch_size: 128 * 1024 * 1024, + writer_buffer_memory_size: 64 * 1024 * 1024, + ..Config::default() + }; + assert!(config.validate_writer().is_err()); } #[test] @@ -462,7 +585,7 @@ mod tests { writer_max_inflight_requests_per_bucket: 100, ..Config::default() }; - assert!(config.validate_idempotence().is_ok()); + assert!(config.validate_writer().is_ok()); } #[test] @@ -472,7 +595,7 @@ mod tests { writer_acks: "1".to_string(), ..Config::default() }; - assert!(config.validate_idempotence().is_err()); + assert!(config.validate_writer().is_err()); } #[test] @@ -482,7 +605,7 @@ mod tests { writer_retries: 0, ..Config::default() }; - assert!(config.validate_idempotence().is_err()); + assert!(config.validate_writer().is_err()); } #[test] @@ -492,6 +615,6 @@ mod tests { writer_max_inflight_requests_per_bucket: 10, ..Config::default() }; - assert!(config.validate_idempotence().is_err()); + assert!(config.validate_writer().is_err()); } }