Skip to content

[core] Introduce blob v2#7216

Merged
JingsongLi merged 16 commits intoapache:masterfrom
leaves12138:blob_descriptor_adaptive_deserialization
Feb 26, 2026
Merged

[core] Introduce blob v2#7216
JingsongLi merged 16 commits intoapache:masterfrom
leaves12138:blob_descriptor_adaptive_deserialization

Conversation

@leaves12138
Copy link
Contributor

@leaves12138 leaves12138 commented Feb 5, 2026

Purpose

Blob v2 contains the following new features:

  • Write adaptive. Paimon could find out the descriptor or the blob itself you wrote, user does not need to set 'blob-as-descriptor' to true when write blob descriptor to paimon
  • Descriptor storage. User could set blob.stored-descriptor-fields = 'xxx' to store blob descriptor in normal (parquet/orc/avro rather than blob)file. It just store the descriptor as bytes, does not copy anything from descriptor to paimon table.

Tests

API and Format

Documentation

@leaves12138 leaves12138 changed the title [flink] [spark] Write blob with adaptive match blob-descriptor [WIP] [flink] [spark] Write blob with adaptive match blob-descriptor Feb 5, 2026
@leaves12138 leaves12138 changed the title [WIP] [flink] [spark] Write blob with adaptive match blob-descriptor [flink] [spark] Write blob with adaptive match blob-descriptor Feb 5, 2026
ByteBuffer buffer = ByteBuffer.allocate(totalSize);
buffer.order(ByteOrder.LITTLE_ENDIAN);

buffer.putLong(MAGIC);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in order to be compatible with older versions, we should design Magic Numbers after the version number, and Linux kernel images are designed in this way.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@leaves12138 leaves12138 force-pushed the blob_descriptor_adaptive_deserialization branch from 0084314 to 58ab357 Compare February 11, 2026 05:25
@JingsongLi
Copy link
Contributor

Due to compatibility issues with this PR, I suggest that we modify all relevant content in this PR, including:

  1. Blob-descriptor V2 and write blob with adaptive match blob-descriptor.
  2. Support blob-store-descriptor.
  3. Modify PyPaimon to support *1 and *2

@leaves12138 leaves12138 marked this pull request as draft February 14, 2026 01:59
@leaves12138 leaves12138 force-pushed the blob_descriptor_adaptive_deserialization branch from c28fc68 to 226f6e8 Compare February 14, 2026 03:01
@leaves12138 leaves12138 changed the title [flink] [spark] Write blob with adaptive match blob-descriptor [core] Introduce blob v2 Feb 14, 2026
@leaves12138
Copy link
Contributor Author

Due to compatibility issues with this PR, I suggest that we modify all relevant content in this PR, including:

  1. Blob-descriptor V2 and write blob with adaptive match blob-descriptor.
  2. Support blob-store-descriptor.
  3. Modify PyPaimon to support *1 and *2

OK

@leaves12138 leaves12138 force-pushed the blob_descriptor_adaptive_deserialization branch from a26004b to 428e913 Compare February 24, 2026 06:51
@leaves12138 leaves12138 marked this pull request as ready for review February 24, 2026 07:03

@Immutable
public static final ConfigOption<String> BLOB_STORED_DESCRIPTOR_FIELDS =
key("blob.stored-descriptor-fields")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blob-descriptor-field

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

this(vectorizedColumnBatch, null, rowId);
}

public ColumnarRow(VectorizedColumnBatch vectorizedColumnBatch, UriReader uriReader) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not modify constructors. Just use setFileIO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

this.rowId = 0;
}

public void setUriReader(UriReader uriReader) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setFileIO

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

byte version = buffer.get();
if (version == 1) {
try {
deserialize(bytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove codes for v1, only works for v2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK


/** A factory to create and cache {@link UriReader}. */
public class UriReaderFactory {
public class UriReaderFactory implements Serializable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SparkRow may use this

.with_description("Whether to return blob values as serialized BlobDescriptor bytes when reading.")
)

BLOB_STORED_DESCRIPTOR_FIELDS: ConfigOption[str] = (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe It's better for BLOB_STORED_DESCRIPTOR_FIELDS and blob-descriptor-field to be the same.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
try:
# Kept for API compatibility. Write behavior is adaptive and does not depend on this flag.
_ = blob_as_descriptor
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the param blob_as_descriptor is not needed in this method, removing the param is better?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

public static BlobDescriptor deserialize(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.order(ByteOrder.LITTLE_ENDIAN);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

buffer.order(ByteOrder.LITTLE_ENDIAN);

byte version = buffer.get();
if (version != CURRENT_VERSION) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

V1 compatiblity and add tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

* fields.
*/
public static Pair<RowType, RowType> splitBlob(
RowType rowType, Set<String> blobStoredDescriptorFields) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blobDescriptorFields

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Repaired

boolean asyncFileWrite,
boolean statsDenseStore,
@Nullable BlobConsumer blobConsumer,
Set<String> blobStoredDescriptorFields,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blobDescriptorFields

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@Nullable BlobConsumer blobConsumer) {
RowType blobRowType = BlobType.splitBlob(writeSchema).getRight();
@Nullable BlobConsumer blobConsumer,
Set<String> blobStoredDescriptorFields) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

blobDescriptorFields

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

/** Tests for table with blob. */
public class BlobTableTest extends TableTestBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test to validate orc and avro.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

/** Tests for table with blob. */
public class BlobTableTest extends TableTestBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test to validate orc and avro.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

MAGIC = 0x424C4F4244455343 # "BLOBDESC"

def __init__(self, uri: str, offset: int, length: int, version: int = CURRENT_VERSION):
if version != self.CURRENT_VERSION:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remember to sync to python.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

@leaves12138
Copy link
Contributor Author

Hi @JingsongLi, thanks for the review comments. I have addressed all of them in this PR and synchronized the related Java/Python changes accordingly. Could you please take another look when you have time?

self.assertEqual(result.column('pic1').to_pylist()[0], pic1_data)
self.assertEqual(result.column('pic2').to_pylist()[0], pic2_data)

def test_blob_stored_descriptor_fields_rejects_non_descriptor_input(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all blob_stored_descriptor_fields to blob_descriptor_field

Copy link
Contributor

@JingsongLi JingsongLi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@JingsongLi JingsongLi merged commit 4ca4544 into apache:master Feb 26, 2026
17 of 19 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants