---
sidebar_position: 2
---

# API Reference

Complete API reference for the Fluss Python client.

## Config

| Method / Property | Config Key | Description |
|---|---|---|
| `Config(properties: dict = None)` | — | Create config from a dict of key-value pairs |
| `bootstrap_servers` | `bootstrap.servers` | Get/set coordinator server address |
| `writer_request_max_size` | `writer.request-max-size` | Get/set max request size in bytes |
| `writer_acks` | `writer.acks` | Get/set acknowledgment setting (`"all"` for all replicas) |
| `writer_retries` | `writer.retries` | Get/set number of retries on failure |
| `writer_batch_size` | `writer.batch-size` | Get/set write batch size in bytes |
| `writer_batch_timeout_ms` | `writer.batch-timeout-ms` | Get/set max time in ms to wait for a writer batch to fill up before sending |
| `writer_bucket_no_key_assigner` | `writer.bucket.no-key-assigner` | Get/set bucket assignment strategy (`"sticky"` or `"round_robin"`) |
| `scanner_remote_log_prefetch_num` | `scanner.remote-log.prefetch-num` | Get/set number of remote log segments to prefetch |
| `remote_file_download_thread_num` | `remote-file.download-thread-num` | Get/set number of threads for remote log downloads |
| `scanner_remote_log_read_concurrency` | `scanner.remote-log.read-concurrency` | Get/set streaming read concurrency within a remote log file |
| `scanner_log_max_poll_records` | `scanner.log.max-poll-records` | Get/set max number of records returned in a single `poll()` |
| `scanner_log_fetch_max_bytes` | `scanner.log.fetch.max-bytes` | Get/set maximum bytes per fetch response for `LogScanner` |
| `scanner_log_fetch_min_bytes` | `scanner.log.fetch.min-bytes` | Get/set minimum bytes the server must accumulate before returning a fetch response |
| `scanner_log_fetch_wait_max_time_ms` | `scanner.log.fetch.wait-max-time-ms` | Get/set maximum time (ms) the server may wait to satisfy min-bytes |
| `scanner_log_fetch_max_bytes_for_bucket` | `scanner.log.fetch.max-bytes-for-bucket` | Get/set maximum bytes per fetch response per bucket for `LogScanner` |
| `connect_timeout_ms` | `connect-timeout` | Get/set TCP connect timeout in milliseconds |
| `request_timeout_ms` | `request-timeout` | Get/set max time in ms to wait for an RPC response after the request is sent (does not limit request write/send time) |
| `security_protocol` | `security.protocol` | Get/set security protocol (`"PLAINTEXT"` or `"sasl"`) |
| `security_sasl_mechanism` | `security.sasl.mechanism` | Get/set SASL mechanism (only `"PLAIN"` is supported) |
| `security_sasl_username` | `security.sasl.username` | Get/set SASL username (required when protocol is `"sasl"`) |
| `security_sasl_password` | `security.sasl.password` | Get/set SASL password (required when protocol is `"sasl"`) |

## FlussConnection

| Method | Description |
|---|---|
| `await FlussConnection.create(config) -> FlussConnection` | Connect to a Fluss cluster |
| `await conn.get_admin() -> FlussAdmin` | Get admin interface |
| `await conn.get_table(table_path) -> FlussTable` | Get a table for read/write operations |
| `conn.close()` | Close the connection |

Supports the `with` statement (context manager).

## FlussAdmin

| Method | Description |
|---|---|
| `await create_database(name, database_descriptor=None, ignore_if_exists=False)` | Create a database |
| `await drop_database(name, ignore_if_not_exists=False, cascade=True)` | Drop a database |
| `await list_databases() -> list[str]` | List all databases |
| `await database_exists(name) -> bool` | Check if a database exists |
| `await get_database_info(name) -> DatabaseInfo` | Get database metadata |
| `await create_table(table_path, table_descriptor, ignore_if_exists=False)` | Create a table |
| `await drop_table(table_path, ignore_if_not_exists=False)` | Drop a table |
| `await get_table_info(table_path) -> TableInfo` | Get table metadata |
| `await list_tables(database_name) -> list[str]` | List tables in a database |
| `await table_exists(table_path) -> bool` | Check if a table exists |
| `await list_offsets(table_path, bucket_ids, offset_spec) -> dict[int, int]` | Get offsets for buckets |
| `await list_partition_offsets(table_path, partition_name, bucket_ids, offset_spec) -> dict[int, int]` | Get offsets for a partition's buckets |
| `await create_partition(table_path, partition_spec, ignore_if_exists=False)` | Create a partition |
| `await drop_partition(table_path, partition_spec, ignore_if_not_exists=False)` | Drop a partition |
| `await list_partition_infos(table_path) -> list[PartitionInfo]` | List partitions |
| `await get_latest_lake_snapshot(table_path) -> LakeSnapshot` | Get latest lake snapshot |
| `await get_server_nodes() -> list[ServerNode]` | Get all alive server nodes |

## ServerNode

| Property | Description |
|---|---|
| `.id -> int` | Server node ID |
| `.host -> str` | Hostname of the server |
| `.port -> int` | Port number |
| `.server_type -> str` | Server type (`"CoordinatorServer"` or `"TabletServer"`) |
| `.uid -> str` | Unique identifier (e.g. `"cs-0"`, `"ts-1"`) |

## FlussTable

| Method | Description |
|---|---|
| `new_scan() -> TableScan` | Create a scan builder |
| `new_append() -> TableAppend` | Create an append builder for log tables |
| `new_upsert() -> TableUpsert` | Create an upsert builder for PK tables |
| `new_lookup() -> TableLookup` | Create a lookup builder for PK tables |
| `get_table_info() -> TableInfo` | Get table metadata |
| `get_table_path() -> TablePath` | Get table path |
| `has_primary_key() -> bool` | Check if table has a primary key |

## TableScan

| Method | Description |
|---|---|
| `.project(indices) -> TableScan` | Project columns by index |
| `.project_by_name(names) -> TableScan` | Project columns by name |
| `await .create_log_scanner() -> LogScanner` | Create record-based scanner (for `poll()`) |
| `await .create_record_batch_log_scanner() -> LogScanner` | Create batch-based scanner (for `poll_arrow()`, `to_arrow()`, etc.) |

## TableAppend

Builder for creating an `AppendWriter`. Obtain via `FlussTable.new_append()`.

| Method | Description |
|---|---|
| `.create_writer() -> AppendWriter` | Create the append writer |

## TableUpsert

Builder for creating an `UpsertWriter`. Obtain via `FlussTable.new_upsert()`.

| Method | Description |
|---|---|
| `.partial_update_by_name(columns) -> TableUpsert` | Configure partial update by column names |
| `.partial_update_by_index(indices) -> TableUpsert` | Configure partial update by column indices |
| `.create_writer() -> UpsertWriter` | Create the upsert writer |

## TableLookup

Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`.

| Method | Description |
|---|---|
| `.create_lookuper() -> Lookuper` | Create the lookuper |

## AppendWriter

| Method | Description |
|---|---|
| `.append(row) -> WriteResultHandle` | Append a row (dict, list, or tuple) |
| `.write_arrow(table)` | Write a PyArrow Table |
| `.write_arrow_batch(batch) -> WriteResultHandle` | Write a PyArrow RecordBatch |
| `.write_pandas(df)` | Write a Pandas DataFrame |
| `await .flush()` | Flush all pending writes |
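A sketch of appending rows and flushing, assuming `table` is a `FlussTable` for a log table with `id` and `name` columns:

```python
import asyncio

async def append_rows(table):
    writer = table.new_append().create_writer()

    # Rows may be dicts, lists, or tuples.
    handle = writer.append({"id": 1, "name": "alice"})
    writer.append([2, "bob"])

    await writer.flush()   # flush all pending writes
    await handle.wait()    # or wait on one write's acknowledgment
```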

## UpsertWriter

| Method | Description |
|---|---|
| `.upsert(row) -> WriteResultHandle` | Upsert a row (insert or update by PK) |
| `.delete(pk) -> WriteResultHandle` | Delete a row by primary key |
| `await .flush()` | Flush all pending operations |
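A sketch of upserting and deleting by primary key, assuming `table` is a PK table keyed on `id`:

```python
import asyncio

async def upsert_and_delete(table):
    writer = table.new_upsert().create_writer()
    writer.upsert({"id": 1, "name": "alice"})   # insert
    writer.upsert({"id": 1, "name": "alicia"})  # update of the same PK
    writer.delete({"id": 1})                    # delete by primary key
    await writer.flush()
```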

## WriteResultHandle

| Method | Description |
|---|---|
| `await .wait()` | Wait for server acknowledgment of this write |

## Lookuper

| Method | Description |
|---|---|
| `await .lookup(pk) -> dict \| None` | Lookup a row by primary key |
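A point-lookup sketch, assuming `table` is a PK table keyed on `id`:

```python
import asyncio

async def find_user(table, user_id):
    lookuper = table.new_lookup().create_lookuper()
    row = await lookuper.lookup({"id": user_id})
    # `row` is a {column_name: value} dict, or None if the key is absent.
    return row
```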

## LogScanner

| Method | Description |
|---|---|
| `.subscribe(bucket_id, start_offset)` | Subscribe to a bucket |
| `.subscribe_buckets(bucket_offsets)` | Subscribe to multiple buckets (`{bucket_id: offset}`) |
| `.subscribe_partition(partition_id, bucket_id, start_offset)` | Subscribe to a partition bucket |
| `.subscribe_partition_buckets(partition_bucket_offsets)` | Subscribe to multiple partition+bucket combos (`{(part_id, bucket_id): offset}`) |
| `.unsubscribe(bucket_id)` | Unsubscribe from a bucket (non-partitioned tables) |
| `.unsubscribe_partition(partition_id, bucket_id)` | Unsubscribe from a partition bucket |
| `.poll(timeout_ms) -> ScanRecords` | Poll individual records (record scanner only) |
| `.poll_arrow(timeout_ms) -> pa.Table` | Poll as Arrow Table (batch scanner only) |
| `.poll_record_batch(timeout_ms) -> list[RecordBatch]` | Poll batches with metadata (batch scanner only) |
| `.to_arrow() -> pa.Table` | Read all subscribed data as Arrow Table (batch scanner only) |
| `.to_pandas() -> pd.DataFrame` | Read all subscribed data as DataFrame (batch scanner only) |

## ScanRecords

Returned by `LogScanner.poll()`. Records are grouped by bucket.

Note: Flat iteration and integer indexing traverse buckets in an arbitrary order that is consistent within a single `ScanRecords` instance but may differ between `poll()` calls. Use per-bucket access (`.items()`, `.records(bucket)`) when bucket ordering matters.

```python
scan_records = scanner.poll(timeout_ms=5000)

# Sequence access
scan_records[0]                              # first record
scan_records[-1]                             # last record
scan_records[:5]                             # first 5 records

# Per-bucket access
for bucket, records in scan_records.items():
    for record in records:
        print(f"bucket={bucket.bucket_id}, offset={record.offset}, row={record.row}")

# Flat iteration
for record in scan_records:
    print(record.row)
```

### Methods

| Method | Description |
|---|---|
| `.buckets() -> list[TableBucket]` | List of distinct buckets |
| `.records(bucket) -> list[ScanRecord]` | Records for a specific bucket (empty list if bucket not present) |
| `.count() -> int` | Total record count across all buckets |
| `.is_empty() -> bool` | Check if empty |

### Indexing

| Expression | Returns | Description |
|---|---|---|
| `scan_records[0]` | `ScanRecord` | Record by flat index |
| `scan_records[-1]` | `ScanRecord` | Negative indexing |
| `scan_records[1:5]` | `list[ScanRecord]` | Slice |
| `scan_records[bucket]` | `list[ScanRecord]` | Records for a bucket |

### Mapping Protocol

| Method / Protocol | Description |
|---|---|
| `.keys()` | Same as `.buckets()` |
| `.values()` | Lazy iterator over record lists, one per bucket |
| `.items()` | Lazy iterator over `(bucket, records)` pairs |
| `len(scan_records)` | Same as `.count()` |
| `bucket in scan_records` | Membership test |
| `for record in scan_records` | Flat iteration over all records |

## ScanRecord

| Property | Description |
|---|---|
| `.offset -> int` | Record offset in the log |
| `.timestamp -> int` | Record timestamp |
| `.change_type -> ChangeType` | Change type (`AppendOnly`, `Insert`, `UpdateBefore`, `UpdateAfter`, `Delete`) |
| `.row -> dict` | Row data as `{column_name: value}` |

## RecordBatch

| Property | Description |
|---|---|
| `.batch -> pa.RecordBatch` | Arrow RecordBatch data |
| `.bucket -> TableBucket` | Bucket this batch belongs to |
| `.base_offset -> int` | First record offset |
| `.last_offset -> int` | Last record offset |

## Schema

| Method | Description |
|---|---|
| `Schema(schema: pa.Schema, primary_keys=None)` | Create from PyArrow schema |
| `.get_column_names() -> list[str]` | Get column names |
| `.get_column_types() -> list[str]` | Get column type names |
| `.get_columns() -> list[tuple[str, str]]` | Get (name, type) pairs |
| `.get_primary_keys() -> list[str]` | Get primary key columns |

## TableDescriptor

| Method | Description |
|---|---|
| `TableDescriptor(schema, *, partition_keys=None, bucket_count=None, bucket_keys=None, comment=None, log_format=None, kv_format=None, properties=None, custom_properties=None)` | Create table descriptor |
| `.get_schema() -> Schema` | Get the schema |

## TablePath

| Method / Property | Description |
|---|---|
| `TablePath(database, table)` | Create a table path |
| `.database_name -> str` | Database name |
| `.table_name -> str` | Table name |

## TableInfo

| Property / Method | Description |
|---|---|
| `.table_id -> int` | Table ID |
| `.table_path -> TablePath` | Table path |
| `.num_buckets -> int` | Number of buckets |
| `.schema_id -> int` | Schema ID |
| `.comment -> str \| None` | Table comment |
| `.created_time -> int` | Creation timestamp |
| `.modified_time -> int` | Last modification timestamp |
| `.get_primary_keys() -> list[str]` | Primary key columns |
| `.get_partition_keys() -> list[str]` | Partition columns |
| `.get_bucket_keys() -> list[str]` | Bucket key columns |
| `.has_primary_key() -> bool` | Has primary key? |
| `.is_partitioned() -> bool` | Is partitioned? |
| `.get_schema() -> Schema` | Get table schema |
| `.get_column_names() -> list[str]` | Column names |
| `.get_column_count() -> int` | Number of columns |
| `.get_properties() -> dict` | All table properties |
| `.get_custom_properties() -> dict` | Custom properties only |

## PartitionInfo

| Property | Description |
|---|---|
| `.partition_id -> int` | Partition ID |
| `.partition_name -> str` | Partition name |

## DatabaseDescriptor

| Method / Property | Description |
|---|---|
| `DatabaseDescriptor(comment=None, custom_properties=None)` | Create descriptor |
| `.comment -> str \| None` | Database comment |
| `.get_custom_properties() -> dict` | Custom properties |

## DatabaseInfo

| Property / Method | Description |
|---|---|
| `.database_name -> str` | Database name |
| `.created_time -> int` | Creation timestamp |
| `.modified_time -> int` | Last modification timestamp |
| `.get_database_descriptor() -> DatabaseDescriptor` | Get descriptor |

## LakeSnapshot

| Property / Method | Description |
|---|---|
| `.snapshot_id -> int` | Snapshot ID |
| `.table_buckets_offset -> dict[TableBucket, int]` | All bucket offsets |
| `.get_bucket_offset(bucket) -> int \| None` | Get offset for a bucket |
| `.get_table_buckets() -> list[TableBucket]` | Get all buckets |

## TableBucket

| Method / Property | Description |
|---|---|
| `TableBucket(table_id, bucket)` | Create non-partitioned bucket |
| `TableBucket.with_partition(table_id, partition_id, bucket)` | Create partitioned bucket |
| `.table_id -> int` | Table ID |
| `.bucket_id -> int` | Bucket ID |
| `.partition_id -> int \| None` | Partition ID (None if non-partitioned) |

## FlussError

| Property | Description |
|---|---|
| `.message -> str` | Error message |
| `.error_code -> int` | Error code (`ErrorCode.CLIENT_ERROR` for client-side errors, server code otherwise) |

Raised for all Fluss-specific errors (connection failures, table not found, schema mismatches, etc.). Inherits from `Exception`. See Error Handling for details on matching specific error codes.

## Constants

| Constant | Value | Description |
|---|---|---|
| `fluss.EARLIEST_OFFSET` | `-2` | Start reading from the earliest available offset |

## OffsetSpec

| Method | Description |
|---|---|
| `OffsetSpec.earliest()` | Earliest available offset |
| `OffsetSpec.latest()` | Latest offset |
| `OffsetSpec.timestamp(ts)` | Offset at or after the given timestamp (millis) |

To start reading from the latest offset (only new records), resolve the current offset via `list_offsets` before subscribing:

```python
offsets = await admin.list_offsets(table_path, [0], fluss.OffsetSpec.latest())
scanner.subscribe(bucket_id=0, start_offset=offsets[0])
```

## ChangeType

| Value | Short String | Description |
|---|---|---|
| `ChangeType.AppendOnly` (0) | `+A` | Append-only |
| `ChangeType.Insert` (1) | `+I` | Insert |
| `ChangeType.UpdateBefore` (2) | `-U` | Previous value of updated row |
| `ChangeType.UpdateAfter` (3) | `+U` | New value of updated row |
| `ChangeType.Delete` (4) | `-D` | Delete |