Complete API reference for the Fluss Python client.

| Method / Property | Config Key | Description |
|---|---|---|
| `Config(properties: dict = None)` | | Create config from a dict of key-value pairs |
| `bootstrap_servers` | `bootstrap.servers` | Get/set coordinator server address |
| `writer_request_max_size` | `writer.request-max-size` | Get/set max request size in bytes |
| `writer_acks` | `writer.acks` | Get/set acknowledgment setting ("all" for all replicas) |
| `writer_retries` | `writer.retries` | Get/set number of retries on failure |
| `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes |
| `writer_batch_timeout_ms` | `writer.batch-timeout-ms` | Get/set max time in ms to wait for a writer batch to fill up before sending |
| `writer_bucket_no_key_assigner` | `writer.bucket.no-key-assigner` | Get/set bucket assignment strategy ("sticky" or "round_robin") |
| `scanner_remote_log_prefetch_num` | `scanner.remote-log.prefetch-num` | Get/set number of remote log segments to prefetch |
| `remote_file_download_thread_num` | `remote-file.download-thread-num` | Get/set number of threads for remote log downloads |
| `scanner_remote_log_read_concurrency` | `scanner.remote-log.read-concurrency` | Get/set streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | `scanner.log.max-poll-records` | Get/set max number of records returned in a single `poll()` |
| `scanner_log_fetch_max_bytes` | `scanner.log.fetch.max-bytes` | Get/set maximum bytes per fetch response for LogScanner |
| `scanner_log_fetch_min_bytes` | `scanner.log.fetch.min-bytes` | Get/set minimum bytes the server must accumulate before returning a fetch response |
| `scanner_log_fetch_wait_max_time_ms` | `scanner.log.fetch.wait-max-time-ms` | Get/set maximum time (ms) the server may wait to satisfy min-bytes |
| `scanner_log_fetch_max_bytes_for_bucket` | `scanner.log.fetch.max-bytes-for-bucket` | Get/set maximum bytes per fetch response per bucket for LogScanner |
| `connect_timeout_ms` | `connect-timeout` | Get/set TCP connect timeout in milliseconds |
| `request_timeout_ms` | `request-timeout` | Get/set max time in ms to wait for an RPC response after the request is sent (does not limit request write/send time) |
| `security_protocol` | `security.protocol` | Get/set security protocol ("PLAINTEXT" or "sasl") |
| `security_sasl_mechanism` | `security.sasl.mechanism` | Get/set SASL mechanism (only "PLAIN" is supported) |
| `security_sasl_username` | `security.sasl.username` | Get/set SASL username (required when protocol is "sasl") |
| `security_sasl_password` | `security.sasl.password` | Get/set SASL password (required when protocol is "sasl") |

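
As a rough illustration of how the config keys above might be combined, the sketch below assembles a properties dict for a SASL-secured cluster. The helper name `build_properties`, the address, the retry count, and the credentials are all hypothetical, and passing every value as a string is an assumption rather than something this reference states.

```python
def build_properties(bootstrap_servers, username=None, password=None):
    """Assemble a properties dict keyed by the config keys listed above.

    Hypothetical helper; all values are passed as strings (an assumption).
    """
    props = {
        "bootstrap.servers": bootstrap_servers,
        "writer.acks": "all",      # acknowledge on all replicas
        "writer.retries": "3",     # illustrative value only
    }
    if username is not None:
        # Username and password are required when the protocol is "sasl";
        # "PLAIN" is the only supported mechanism.
        props.update({
            "security.protocol": "sasl",
            "security.sasl.mechanism": "PLAIN",
            "security.sasl.username": username,
            "security.sasl.password": password,
        })
    return props


# Placeholder address and credentials.
props = build_properties("127.0.0.1:9123", username="alice", password="secret")
print(sorted(props))
```

The resulting dict would then be passed to the constructor, for example `fluss.Config(props)` if the class is exposed from the `fluss` module. Individual settings could still be adjusted afterwards through the properties in the table, such as `config.writer_batch_size`.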

| Method | Description |
|---|---|
| `await FlussConnection.create(config) -> FlussConnection` | Connect to a Fluss cluster |
| `await conn.get_admin() -> FlussAdmin` | Get admin interface |
| `await conn.get_table(table_path) -> FlussTable` | Get a table for read/write operations |
| `conn.close()` | Close the connection |

Supports the `with` statement (context manager).
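
A minimal sketch of the connection lifecycle, using only the calls listed above. The function name `list_database_names` is hypothetical, and the connection class is injected as a parameter so the sketch does not assume how the client package is imported. It uses `try`/`finally` rather than `with`, because this reference does not spell out the context manager's exact semantics.

```python
async def list_database_names(connection_factory, config):
    """Connect, list databases, and always close the connection.

    `connection_factory` is expected to behave like FlussConnection,
    i.e. to expose an awaitable `create(config)`.
    """
    conn = await connection_factory.create(config)
    try:
        admin = await conn.get_admin()
        return await admin.list_databases()
    finally:
        # close() is listed without `await` in the table above.
        conn.close()
```

Assuming the class is available as `fluss.FlussConnection`, this could be run with `asyncio.run(list_database_names(fluss.FlussConnection, config))`.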

| Method | Description |
|---|---|
| `await create_database(name, database_descriptor=None, ignore_if_exists=False)` | Create a database |
| `await drop_database(name, ignore_if_not_exists=False, cascade=True)` | Drop a database |
| `await list_databases() -> list[str]` | List all databases |
| `await database_exists(name) -> bool` | Check if a database exists |
| `await get_database_info(name) -> DatabaseInfo` | Get database metadata |
| `await create_table(table_path, table_descriptor, ignore_if_exists=False)` | Create a table |
| `await drop_table(table_path, ignore_if_not_exists=False)` | Drop a table |
| `await get_table_info(table_path) -> TableInfo` | Get table metadata |
| `await list_tables(database_name) -> list[str]` | List tables in a database |
| `await table_exists(table_path) -> bool` | Check if a table exists |
| `await list_offsets(table_path, bucket_ids, offset_spec) -> dict[int, int]` | Get offsets for buckets |
| `await list_partition_offsets(table_path, partition_name, bucket_ids, offset_spec) -> dict[int, int]` | Get offsets for a partition's buckets |
| `await create_partition(table_path, partition_spec, ignore_if_exists=False)` | Create a partition |
| `await drop_partition(table_path, partition_spec, ignore_if_not_exists=False)` | Drop a partition |
| `await list_partition_infos(table_path) -> list[PartitionInfo]` | List partitions |
| `await get_latest_lake_snapshot(table_path) -> LakeSnapshot` | Get latest lake snapshot |
| `await get_server_nodes() -> list[ServerNode]` | Get all alive server nodes |

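
The admin calls above can be combined into an idempotent setup step. The sketch below is one possible arrangement, not a prescribed pattern: `ensure_table` is a hypothetical helper, and `admin` is any object exposing the methods in the table.

```python
async def ensure_table(admin, database, table_path, table_descriptor):
    """Create the database and table if they are missing.

    Returns True when the table did not exist before this call.
    """
    await admin.create_database(database, ignore_if_exists=True)
    existed = await admin.table_exists(table_path)
    # ignore_if_exists=True keeps the call safe if another client
    # creates the table between the check and the create.
    await admin.create_table(table_path, table_descriptor, ignore_if_exists=True)
    return not existed
```

The existence check is only there to report whether the table is new; the `ignore_if_exists` flags are what make repeated calls safe.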

| Property | Description |
|---|---|
| `.id -> int` | Server node ID |
| `.host -> str` | Hostname of the server |
| `.port -> int` | Port number |
| `.server_type -> str` | Server type ("CoordinatorServer" or "TabletServer") |
| `.uid -> str` | Unique identifier (e.g. "cs-0", "ts-1") |


| Method | Description |
|---|---|
| `new_scan() -> TableScan` | Create a scan builder |
| `new_append() -> TableAppend` | Create an append builder for log tables |
| `new_upsert() -> TableUpsert` | Create an upsert builder for PK tables |
| `new_lookup() -> TableLookup` | Create a lookup builder for PK tables |
| `get_table_info() -> TableInfo` | Get table metadata |
| `get_table_path() -> TablePath` | Get table path |
| `has_primary_key() -> bool` | Check if table has a primary key |


| Method | Description |
|---|---|
| `.project(indices) -> TableScan` | Project columns by index |
| `.project_by_name(names) -> TableScan` | Project columns by name |
| `await .create_log_scanner() -> LogScanner` | Create record-based scanner (for `poll()`) |
| `await .create_record_batch_log_scanner() -> LogScanner` | Create batch-based scanner (for `poll_arrow()`, `to_arrow()`, etc.) |

Builder for creating an AppendWriter. Obtain via FlussTable.new_append().

| Method | Description |
|---|---|
| `.create_writer() -> AppendWriter` | Create the append writer |

Builder for creating an UpsertWriter. Obtain via FlussTable.new_upsert().

| Method | Description |
|---|---|
| `.partial_update_by_name(columns) -> TableUpsert` | Configure partial update by column names |
| `.partial_update_by_index(indices) -> TableUpsert` | Configure partial update by column indices |
| `.create_writer() -> UpsertWriter` | Create the upsert writer |

Builder for creating a Lookuper. Obtain via FlussTable.new_lookup().

| Method | Description |
|---|---|
| `.create_lookuper() -> Lookuper` | Create the lookuper |


| Method | Description |
|---|---|
| `.append(row) -> WriteResultHandle` | Append a row (dict, list, or tuple) |
| `.write_arrow(table)` | Write a PyArrow Table |
| `.write_arrow_batch(batch) -> WriteResultHandle` | Write a PyArrow RecordBatch |
| `.write_pandas(df)` | Write a Pandas DataFrame |
| `await .flush()` | Flush all pending writes |


| Method | Description |
|---|---|
| `.upsert(row) -> WriteResultHandle` | Upsert a row (insert or update by PK) |
| `.delete(pk) -> WriteResultHandle` | Delete a row by primary key |
| `await .flush()` | Flush all pending operations |


| Method | Description |
|---|---|
| `await .wait()` | Wait for server acknowledgment of this write |

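
The writer and handle methods above suggest two acknowledgment styles: wait on each handle individually, or submit everything and flush once. The sketch below shows both behind a flag. `append_rows` and `wait_each` are hypothetical names, and `writer` is any object shaped like an AppendWriter.

```python
async def append_rows(writer, rows, wait_each=False):
    """Append rows and return how many were submitted."""
    count = 0
    for row in rows:
        # append() is listed without `await`; it returns a WriteResultHandle.
        handle = writer.append(row)
        if wait_each:
            # Block until the server acknowledges this single write.
            await handle.wait()
        count += 1
    # Flush whatever is still pending before returning.
    await writer.flush()
    return count
```

Waiting on every handle gives per-row confirmation at the cost of one round of waiting per row; flushing once at the end is likely the better fit for bulk loads.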

| Method | Description |
|---|---|
| `await .lookup(pk) -> dict \| None` | Lookup a row by primary key |


| Method | Description |
|---|---|
| `.subscribe(bucket_id, start_offset)` | Subscribe to a bucket |
| `.subscribe_buckets(bucket_offsets)` | Subscribe to multiple buckets (`{bucket_id: offset}`) |
| `.subscribe_partition(partition_id, bucket_id, start_offset)` | Subscribe to a partition bucket |
| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) |
| `.unsubscribe(bucket_id)` | Unsubscribe from a bucket (non-partitioned tables) |
| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket |
| `.poll(timeout_ms) -> ScanRecords` | Poll individual records (record scanner only) |
| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) |
| `.poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) |
| `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) |
| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) |

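
A bounded poll loop over a record-based scanner might look like the following sketch. `read_up_to`, `max_idle_polls`, and the default timeout are hypothetical choices; the `-2` literal mirrors the `fluss.EARLIEST_OFFSET` value listed in the constants table of this reference.

```python
EARLIEST_OFFSET = -2  # same value as fluss.EARLIEST_OFFSET


def read_up_to(scanner, bucket_ids, max_records, timeout_ms=1000, max_idle_polls=3):
    """Subscribe to buckets from the earliest offset and collect rows.

    Stops after max_records rows, or after max_idle_polls consecutive
    empty polls, whichever comes first.
    """
    scanner.subscribe_buckets({b: EARLIEST_OFFSET for b in bucket_ids})
    rows = []
    idle = 0
    while len(rows) < max_records and idle < max_idle_polls:
        scan_records = scanner.poll(timeout_ms)
        if len(scan_records) == 0:
            idle += 1
            continue
        idle = 0
        # Flat iteration: bucket order is arbitrary within one poll result.
        for record in scan_records:
            rows.append(record.row)
            if len(rows) == max_records:
                break
    return rows
```

Note that stopping mid-result discards the remaining records of that poll, so this shape suits sampling or tests more than exactly-once processing.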
Returned by LogScanner.poll(). Records are grouped by bucket.
Note: Flat iteration and integer indexing traverse buckets in an arbitrary order that is consistent within a single ScanRecords instance but may differ between poll() calls. Use per-bucket access (.items(), .records(bucket)) when bucket ordering matters.

    scan_records = scanner.poll(timeout_ms=5000)

    # Sequence access
    scan_records[0]    # first record
    scan_records[-1]   # last record
    scan_records[:5]   # first 5 records

    # Per-bucket access
    for bucket, records in scan_records.items():
        for record in records:
            print(f"bucket={bucket.bucket_id}, offset={record.offset}, row={record.row}")

    # Flat iteration
    for record in scan_records:
        print(record.row)


| Method | Description |
|---|---|
| `.buckets() -> list[TableBucket]` | List of distinct buckets |
| `.records(bucket) -> list[ScanRecord]` | Records for a specific bucket (empty list if bucket not present) |
| `.count() -> int` | Total record count across all buckets |
| `.is_empty() -> bool` | Check if empty |


| Expression | Returns | Description |
|---|---|---|
| `scan_records[0]` | `ScanRecord` | Record by flat index |
| `scan_records[-1]` | `ScanRecord` | Negative indexing |
| `scan_records[1:5]` | `list[ScanRecord]` | Slice |
| `scan_records[bucket]` | `list[ScanRecord]` | Records for a bucket |


| Method / Protocol | Description |
|---|---|
| `.keys()` | Same as `.buckets()` |
| `.values()` | Lazy iterator over record lists, one per bucket |
| `.items()` | Lazy iterator over (bucket, records) pairs |
| `len(scan_records)` | Same as `.count()` |
| `bucket in scan_records` | Membership test |
| `for record in scan_records` | Flat iteration over all records |

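
Per-bucket access is useful for tracking how far each bucket has been read. The sketch below derives a resume offset per bucket from one poll result. `next_offsets` is a hypothetical helper, and it assumes that records within a bucket arrive in ascending offset order, which this reference does not state explicitly.

```python
def next_offsets(scan_records, previous=None):
    """Return {bucket: offset to resume from} after one poll result.

    `previous` carries resume offsets from earlier polls; buckets that
    returned no records in this poll keep their earlier value.
    """
    resume = dict(previous or {})
    for bucket, records in scan_records.items():
        if records:
            # Assumption: the last record in the list has the highest offset.
            resume[bucket] = records[-1].offset + 1
    return resume
```

The returned mapping can be carried across `poll()` calls by passing it back in as `previous`.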

| Property | Description |
|---|---|
| `.offset -> int` | Record offset in the log |
| `.timestamp -> int` | Record timestamp |
| `.change_type -> ChangeType` | Change type (AppendOnly, Insert, UpdateBefore, UpdateAfter, Delete) |
| `.row -> dict` | Row data as `{column_name: value}` |


| Property | Description |
|---|---|
| `.batch -> pa.RecordBatch` | Arrow RecordBatch data |
| `.bucket -> TableBucket` | Bucket this batch belongs to |
| `.base_offset -> int` | First record offset |
| `.last_offset -> int` | Last record offset |


| Method | Description |
|---|---|
| `Schema(schema: pa.Schema, primary_keys=None)` | Create from PyArrow schema |
| `.get_column_names() -> list[str]` | Get column names |
| `.get_column_types() -> list[str]` | Get column type names |
| `.get_columns() -> list[tuple[str, str]]` | Get (name, type) pairs |
| `.get_primary_keys() -> list[str]` | Get primary key columns |


| Method | Description |
|---|---|
| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, log_format=None, kv_format=None, properties=None, custom_properties=None)` | Create table descriptor |
| `.get_schema() -> Schema` | Get the schema |


| Method / Property | Description |
|---|---|
| `TablePath(database, table)` | Create a table path |
| `.database_name -> str` | Database name |
| `.table_name -> str` | Table name |


| Property / Method | Description |
|---|---|
| `.table_id -> int` | Table ID |
| `.table_path -> TablePath` | Table path |
| `.num_buckets -> int` | Number of buckets |
| `.schema_id -> int` | Schema ID |
| `.comment -> str \| None` | Table comment |
| `.created_time -> int` | Creation timestamp |
| `.modified_time -> int` | Last modification timestamp |
| `.get_primary_keys() -> list[str]` | Primary key columns |
| `.get_partition_keys() -> list[str]` | Partition columns |
| `.get_bucket_keys() -> list[str]` | Bucket key columns |
| `.has_primary_key() -> bool` | Whether the table has a primary key |
| `.is_partitioned() -> bool` | Whether the table is partitioned |
| `.get_schema() -> Schema` | Get table schema |
| `.get_column_names() -> list[str]` | Column names |
| `.get_column_count() -> int` | Number of columns |
| `.get_properties() -> dict` | All table properties |
| `.get_custom_properties() -> dict` | Custom properties only |


| Property | Description |
|---|---|
| `.partition_id -> int` | Partition ID |
| `.partition_name -> str` | Partition name |


| Method / Property | Description |
|---|---|
| `DatabaseDescriptor(comment=None, custom_properties=None)` | Create descriptor |
| `.comment -> str \| None` | Database comment |
| `.get_custom_properties() -> dict` | Custom properties |


| Property / Method | Description |
|---|---|
| `.database_name -> str` | Database name |
| `.created_time -> int` | Creation timestamp |
| `.modified_time -> int` | Last modification timestamp |
| `.get_database_descriptor() -> DatabaseDescriptor` | Get descriptor |


| Property / Method | Description |
|---|---|
| `.snapshot_id -> int` | Snapshot ID |
| `.table_buckets_offset -> dict[TableBucket, int]` | All bucket offsets |
| `.get_bucket_offset(bucket) -> int \| None` | Get offset for a bucket |
| `.get_table_buckets() -> list[TableBucket]` | Get all buckets |


| Method / Property | Description |
|---|---|
| `TableBucket(table_id, bucket)` | Create non-partitioned bucket |
| `TableBucket.with_partition(table_id, partition_id, bucket)` | Create partitioned bucket |
| `.table_id -> int` | Table ID |
| `.bucket_id -> int` | Bucket ID |
| `.partition_id -> int \| None` | Partition ID (None if non-partitioned) |


| Property | Description |
|---|---|
| `.message -> str` | Error message |
| `.error_code -> int` | Error code (`ErrorCode.CLIENT_ERROR` for client-side errors, server code otherwise) |

Raised for all Fluss-specific errors (connection failures, table not found, schema mismatches, etc.). Inherits from Exception. See Error Handling for details on matching specific error codes.

| Constant | Value | Description |
|---|---|---|
| `fluss.EARLIEST_OFFSET` | `-2` | Start reading from earliest available offset |


| Method | Description |
|---|---|
| `OffsetSpec.earliest()` | Earliest available offset |
| `OffsetSpec.latest()` | Latest offset |
| `OffsetSpec.timestamp(ts)` | Offset at or after the given timestamp (millis) |

To start reading from the latest offset (only new records), resolve the current offset via `list_offsets` before subscribing:

    offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
    scanner.subscribe(bucket_id=0, start_offset=offsets[0])
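
The same resolve-then-subscribe pattern extends to several buckets and to any offset spec. This is a sketch under that assumption: `subscribe_from_spec` is a hypothetical helper that relies on `list_offsets` returning a `{bucket_id: offset}` dict, which is the shape `subscribe_buckets` accepts.

```python
async def subscribe_from_spec(admin, scanner, table_path, bucket_ids, offset_spec):
    """Resolve offsets for the given spec and subscribe to every bucket."""
    offsets = await admin.list_offsets(table_path, bucket_ids, offset_spec)
    # list_offsets returns {bucket_id: offset}, the mapping that
    # subscribe_buckets expects.
    scanner.subscribe_buckets(offsets)
    return offsets
```

For example, passing `fluss.OffsetSpec.timestamp(ts)` as `offset_spec` would start every listed bucket at the first offset at or after `ts` milliseconds.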

| Value | Short String | Description |
|---|---|---|
| `ChangeType.AppendOnly` (0) | `+A` | Append-only |
| `ChangeType.Insert` (1) | `+I` | Insert |
| `ChangeType.UpdateBefore` (2) | `-U` | Previous value of updated row |
| `ChangeType.UpdateAfter` (3) | `+U` | New value of updated row |
| `ChangeType.Delete` (4) | `-D` | Delete |

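
To show how these change types are typically interpreted, the sketch below folds a changelog into a dict keyed by primary key. It works on plain `(short_string, row)` pairs rather than real `ScanRecord` objects, because this reference does not show how a `ChangeType` value is converted to its short string. `apply_changes` and the sample column names are hypothetical.

```python
def apply_changes(state, changes, key):
    """Fold (short_string, row) pairs into `state`, keyed by row[key]."""
    for kind, row in changes:
        pk = row[key]
        if kind in ("+A", "+I", "+U"):
            state[pk] = row          # new or updated value wins
        elif kind == "-D":
            state.pop(pk, None)      # row removed
        elif kind == "-U":
            pass                     # old value; the matching +U carries the new one
        else:
            raise ValueError(f"unknown change type: {kind!r}")
    return state


changes = [
    ("+I", {"id": 1, "v": "a"}),
    ("-U", {"id": 1, "v": "a"}),
    ("+U", {"id": 1, "v": "b"}),
    ("+I", {"id": 2, "v": "c"}),
    ("-D", {"id": 2, "v": "c"}),
]
state = apply_changes({}, changes, key="id")
print(state)
```

Ignoring `-U` works when every update also emits a `+U` for the same key, which is an assumption here; a consumer that needs the previous value (for example, to retract an aggregate) would handle `-U` explicitly.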