Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion crates/fluss/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ impl FlussConnection {
pub async fn new(arg: Config) -> Result<Self> {
arg.validate_security()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
arg.validate_scanner_fetch()
arg.validate_scanner()
.map_err(|msg| Error::IllegalArgument { message: msg })?;
arg.validate_writer()
.map_err(|msg| Error::IllegalArgument { message: msg })?;

let timeout = Duration::from_millis(arg.connect_timeout_ms);
Expand Down
2 changes: 1 addition & 1 deletion crates/fluss/src/client/table/remote_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,7 @@ impl RemoteLogDownloader {
let fetcher = Arc::new(ProductionFetcher {
credentials_rx,
local_log_dir: Arc::new(local_log_dir),
remote_log_read_concurrency: remote_log_read_concurrency.max(1),
remote_log_read_concurrency,
});

Self::new_with_fetcher(fetcher, max_prefetch_segments, max_concurrent_downloads)
Expand Down
4 changes: 0 additions & 4 deletions crates/fluss/src/client/write/writer_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ impl WriterClient {
pub fn new(config: Config, metadata: Arc<Metadata>) -> Result<Self> {
let ack = Self::get_ack(&config)?;

config
.validate_idempotence()
.map_err(|message| Error::IllegalArgument { message })?;

let idempotence_manager = Arc::new(IdempotenceManager::new(
config.writer_enable_idempotence,
config.writer_max_inflight_requests_per_bucket,
Expand Down
207 changes: 165 additions & 42 deletions crates/fluss/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,34 +270,7 @@ impl Config {
/// Checks that the writer settings are compatible with idempotent writes.
///
/// When `writer_enable_idempotence` is `false` nothing is checked and
/// `Ok(())` is returned. Otherwise the first violated requirement is reported
/// as an `Err` carrying a human-readable message:
///
/// 1. `writer_acks` must be `"all"` (any ASCII case) or `"-1"`.
/// 2. `writer_retries` must be greater than zero.
/// 3. `writer_max_inflight_requests_per_bucket` must not exceed
///    `MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE`.
pub fn validate_idempotence(&self) -> Result<(), String> {
    if self.writer_enable_idempotence {
        let acks = &self.writer_acks;
        let retries = self.writer_retries;
        let inflight = self.writer_max_inflight_requests_per_bucket;

        // Neither spelling of "all replicas" was supplied.
        let acks_rejected = self.writer_acks != "-1" && !acks.eq_ignore_ascii_case("all");
        if acks_rejected {
            return Err(format!(
                "Idempotent writes require acks='all' (-1), but got acks='{}'",
                acks
            ));
        }

        if retries < 1 {
            return Err(format!(
                "Idempotent writes require retries > 0, but got retries={}",
                retries
            ));
        }

        if inflight > MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE {
            return Err(format!(
                "Idempotent writes require max-inflight-requests-per-bucket <= {}, but got {}",
                MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE, inflight
            ));
        }
    }

    Ok(())
}


/// Validates security configuration. Returns `Ok(())` when the config is
/// consistent, or an error message when SASL is enabled but the config is
Expand All @@ -324,7 +297,16 @@ impl Config {
}
Ok(())
}
pub fn validate_scanner_fetch(&self) -> Result<(), String> {
pub fn validate_scanner(&self) -> Result<(), String> {
if self.scanner_remote_log_prefetch_num == 0 {
return Err("scanner_remote_log_prefetch_num must be > 0".to_string());
}
if self.scanner_remote_log_read_concurrency == 0 {
return Err("scanner_remote_log_read_concurrency must be > 0".to_string());
}
if self.remote_file_download_thread_num == 0 {
return Err("remote_file_download_thread_num must be > 0".to_string());
}
if self.scanner_log_fetch_min_bytes <= 0 {
return Err("scanner_log_fetch_min_bytes must be > 0".to_string());
}
Expand All @@ -350,6 +332,57 @@ impl Config {
}
Ok(())
}

/// Validates writer configuration. Returns `Ok(())` when every check passes,
/// or an error message describing the first check that failed.
///
/// The checks run in this order: per-field range checks, cross-field size
/// checks, and finally the idempotence checks (only when
/// `writer_enable_idempotence` is set).
pub fn validate_writer(&self) -> Result<(), String> {
// Per-field range checks.
if self.writer_request_max_size <= 0 {
return Err("writer_request_max_size must be > 0".to_string());
}
if self.writer_batch_size <= 0 {
return Err("writer_batch_size must be > 0".to_string());
}
// A timeout of zero is accepted; only negative values are rejected.
if self.writer_batch_timeout_ms < 0 {
return Err("writer_batch_timeout_ms must be >= 0".to_string());
}
if self.writer_max_inflight_requests_per_bucket == 0 {
return Err("writer_max_inflight_requests_per_bucket must be > 0".to_string());
}
if self.writer_buffer_memory_size == 0 {
return Err("writer_buffer_memory_size must be > 0".to_string());
}
// Cross-field checks: a single batch may not be larger than the request
// size limit or the writer buffer.
if self.writer_batch_size > self.writer_request_max_size {
return Err("writer_batch_size must be <= writer_request_max_size".to_string());
}
// `writer_batch_size` was already rejected above when <= 0, so the value
// being cast to `usize` here is never negative.
if self.writer_batch_size as usize > self.writer_buffer_memory_size {
return Err("writer_batch_size must be <= writer_buffer_memory_size".to_string());
}
// Idempotence checks: skipped entirely when idempotent writes are disabled.
if !self.writer_enable_idempotence {
return Ok(());
}
// `acks` is accepted as "all" in any ASCII case, or as the literal "-1".
let acks_is_all = self.writer_acks.eq_ignore_ascii_case("all") || self.writer_acks == "-1";
if !acks_is_all {
return Err(format!(
"Idempotent writes require acks='all' (-1), but got acks='{}'",
self.writer_acks
));
}
if self.writer_retries <= 0 {
return Err(format!(
"Idempotent writes require retries > 0, but got retries={}",
self.writer_retries
));
}
// Upper bound on in-flight requests; the lower bound (> 0) was checked above.
if self.writer_max_inflight_requests_per_bucket
> MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE
{
return Err(format!(
"Idempotent writes require max-inflight-requests-per-bucket <= {}, but got {}",
MAX_IN_FLIGHT_REQUESTS_PER_BUCKET_FOR_IDEMPOTENCE,
self.writer_max_inflight_requests_per_bucket
));
}
Ok(())
Copy link
Copy Markdown
Contributor

@fresh-borzoni fresh-borzoni Apr 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to have these checks as well:

  • writer_batch_size <= writer_request_max_size - otherwise batches never drain, because they exceed the maximum size defined for a request
  • writer_batch_size <= writer_buffer_memory_size - otherwise, if a single full batch exceeds max_size, it doesn't fit in the buffer and just keeps piling up; we have a runtime check for it, but it is better to validate at the config level

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the requested checks in config validation

}
}

#[cfg(test)]
Expand Down Expand Up @@ -419,13 +452,38 @@ mod tests {
};
assert!(config.validate_security().is_err());
}

#[test]
fn test_scanner_fetch_defaults_valid() {
fn test_scanner_defaults_valid() {
let config = Config::default();
assert!(config.validate_scanner_fetch().is_ok());
assert_eq!(config.scanner_log_fetch_max_bytes, 16 * 1024 * 1024);
assert_eq!(config.scanner_log_fetch_min_bytes, 1);
assert_eq!(config.scanner_log_fetch_wait_max_time_ms, 500);
assert!(config.validate_scanner().is_ok());
}

/// Scanner validation must reject a remote-log prefetch count of zero.
#[test]
fn test_scanner_remote_log_prefetch_num_zero() {
    let outcome = Config {
        scanner_remote_log_prefetch_num: 0,
        ..Config::default()
    }
    .validate_scanner();
    assert!(outcome.is_err());
}

/// Scanner validation must reject a remote-log read concurrency of zero.
#[test]
fn test_scanner_remote_log_read_concurrency_zero() {
    let outcome = Config {
        scanner_remote_log_read_concurrency: 0,
        ..Config::default()
    }
    .validate_scanner();
    assert!(outcome.is_err());
}

/// Scanner validation must reject a remote-file download thread count of zero.
#[test]
fn test_remote_file_download_thread_num_zero() {
    let outcome = Config {
        remote_file_download_thread_num: 0,
        ..Config::default()
    }
    .validate_scanner();
    assert!(outcome.is_err());
}

#[test]
Expand All @@ -435,7 +493,7 @@ mod tests {
scanner_log_fetch_max_bytes: 1,
..Config::default()
};
assert!(config.validate_scanner_fetch().is_err());
assert!(config.validate_scanner().is_err());
}

#[test]
Expand All @@ -444,13 +502,78 @@ mod tests {
scanner_log_fetch_wait_max_time_ms: -1,
..Config::default()
};
assert!(config.validate_scanner_fetch().is_err());
assert!(config.validate_scanner().is_err());
}

#[test]
fn test_idempotence_default_is_valid() {
fn test_writer_defaults_valid() {
let config = Config::default();
assert!(config.validate_idempotence().is_ok());
assert!(config.validate_writer().is_ok());
}

/// Writer validation must reject a maximum request size of zero.
#[test]
fn test_writer_request_max_size_zero() {
    let outcome = Config {
        writer_request_max_size: 0,
        ..Config::default()
    }
    .validate_writer();
    assert!(outcome.is_err());
}

/// Writer validation must reject a batch size of zero.
#[test]
fn test_writer_batch_size_zero() {
    let outcome = Config {
        writer_batch_size: 0,
        ..Config::default()
    }
    .validate_writer();
    assert!(outcome.is_err());
}

/// Writer validation must reject a negative batch timeout.
#[test]
fn test_writer_batch_timeout_negative() {
    let outcome = Config {
        writer_batch_timeout_ms: -1,
        ..Config::default()
    }
    .validate_writer();
    assert!(outcome.is_err());
}

/// Writer validation must reject zero in-flight requests per bucket.
#[test]
fn test_writer_max_inflight_requests_per_bucket_zero() {
    let outcome = Config {
        writer_max_inflight_requests_per_bucket: 0,
        ..Config::default()
    }
    .validate_writer();
    assert!(outcome.is_err());
}

/// Writer validation must reject a buffer memory size of zero.
#[test]
fn test_writer_buffer_memory_size_zero() {
    let outcome = Config {
        writer_buffer_memory_size: 0,
        ..Config::default()
    }
    .validate_writer();
    assert!(outcome.is_err());
}

/// Writer validation must reject a batch size (20 MiB) that is larger than
/// the maximum request size (10 MiB).
#[test]
fn test_writer_batch_size_exceeds_request_max_size() {
    let outcome = Config {
        writer_batch_size: 20 * 1024 * 1024,
        writer_request_max_size: 10 * 1024 * 1024,
        ..Config::default()
    }
    .validate_writer();
    assert!(outcome.is_err());
}

/// Writer validation must reject a batch size (128 MiB) that is larger than
/// the writer buffer memory size (64 MiB).
#[test]
fn test_writer_batch_size_exceeds_buffer_memory_size() {
    let outcome = Config {
        writer_batch_size: 128 * 1024 * 1024,
        writer_buffer_memory_size: 64 * 1024 * 1024,
        ..Config::default()
    }
    .validate_writer();
    assert!(outcome.is_err());
}

#[test]
Expand All @@ -462,7 +585,7 @@ mod tests {
writer_max_inflight_requests_per_bucket: 100,
..Config::default()
};
assert!(config.validate_idempotence().is_ok());
assert!(config.validate_writer().is_ok());
}

#[test]
Expand All @@ -472,7 +595,7 @@ mod tests {
writer_acks: "1".to_string(),
..Config::default()
};
assert!(config.validate_idempotence().is_err());
assert!(config.validate_writer().is_err());
}

#[test]
Expand All @@ -482,7 +605,7 @@ mod tests {
writer_retries: 0,
..Config::default()
};
assert!(config.validate_idempotence().is_err());
assert!(config.validate_writer().is_err());
}

#[test]
Expand All @@ -492,6 +615,6 @@ mod tests {
writer_max_inflight_requests_per_bucket: 10,
..Config::default()
};
assert!(config.validate_idempotence().is_err());
assert!(config.validate_writer().is_err());
}
}