Skip to content

Commit 41836f9

Browse files
committed
Data: Add TCK tests for ReadBuilder in BaseFormatModelTests
1 parent 9528f85 commit 41836f9

1 file changed

Lines changed: 315 additions & 18 deletions

File tree

data/src/test/java/org/apache/iceberg/data/BaseFormatModelTests.java

Lines changed: 315 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH;
2222
import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS;
2323
import static org.assertj.core.api.Assertions.assertThat;
24+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
25+
import static org.assertj.core.api.Assumptions.assumeThat;
2426

2527
import java.io.IOException;
2628
import java.nio.file.Path;
2729
import java.util.Arrays;
30+
import java.util.Comparator;
2831
import java.util.List;
2932
import java.util.stream.Collectors;
3033
import org.apache.iceberg.DataFile;
@@ -38,6 +41,10 @@
3841
import org.apache.iceberg.encryption.EncryptedFiles;
3942
import org.apache.iceberg.encryption.EncryptedOutputFile;
4043
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
44+
import org.apache.iceberg.exceptions.NotFoundException;
45+
import org.apache.iceberg.exceptions.ValidationException;
46+
import org.apache.iceberg.expressions.Expression;
47+
import org.apache.iceberg.expressions.Expressions;
4148
import org.apache.iceberg.formats.FileWriterBuilder;
4249
import org.apache.iceberg.formats.FormatModelRegistry;
4350
import org.apache.iceberg.inmemory.InMemoryFileIO;
@@ -46,6 +53,7 @@
4653
import org.apache.iceberg.io.DeleteSchemaUtil;
4754
import org.apache.iceberg.io.InputFile;
4855
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
56+
import org.apache.iceberg.types.Types;
4957
import org.junit.jupiter.api.AfterEach;
5058
import org.junit.jupiter.api.BeforeEach;
5159
import org.junit.jupiter.api.io.TempDir;
@@ -89,7 +97,12 @@ void before() {
8997

9098
@AfterEach
9199
void after() {
92-
fileIO.deleteFile(encryptedFile.encryptingOutputFile());
100+
try {
101+
fileIO.deleteFile(encryptedFile.encryptingOutputFile());
102+
} catch (NotFoundException ignored) {
103+
// ignore if the file was never created
104+
}
105+
93106
this.encryptedFile = null;
94107
if (fileIO != null) {
95108
fileIO.close();
@@ -146,25 +159,9 @@ void testDataWriterEngineWriteGenericRead(FileFormat fileFormat, DataGenerator d
146159
void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, DataGenerator dataGenerator)
147160
throws IOException {
148161
Schema schema = dataGenerator.schema();
149-
FileWriterBuilder<DataWriter<Record>, Object> writerBuilder =
150-
FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
151-
152-
DataWriter<Record> writer =
153-
writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build();
154162

155163
List<Record> genericRecords = dataGenerator.generateRecords();
156-
157-
try (writer) {
158-
for (Record record : genericRecords) {
159-
writer.write(record);
160-
}
161-
}
162-
163-
DataFile dataFile = writer.toDataFile();
164-
165-
assertThat(dataFile).isNotNull();
166-
assertThat(dataFile.recordCount()).isEqualTo(genericRecords.size());
167-
assertThat(dataFile.format()).isEqualTo(fileFormat);
164+
writeGenericRecords(fileFormat, schema, genericRecords);
168165

169166
// Read back and verify
170167
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
@@ -317,6 +314,306 @@ void testPositionDeleteWriterEngineWriteGenericRead(FileFormat fileFormat) throw
317314
DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), records, readRecords);
318315
}
319316

317+
@ParameterizedTest
318+
@FieldSource("FORMAT_AND_GENERATOR")
319+
/** Write with Generic Record, read with projected engine type T (narrow schema) */
320+
void testReaderBuilderProjection(FileFormat fileFormat, DataGenerator dataGenerator)
321+
throws IOException {
322+
Schema fullSchema = dataGenerator.schema();
323+
324+
List<Types.NestedField> columns = fullSchema.columns();
325+
Schema projectedSchema = new Schema(columns.get(columns.size() - 1));
326+
327+
List<Record> genericRecords = dataGenerator.generateRecords();
328+
writeGenericRecords(fileFormat, fullSchema, genericRecords);
329+
330+
List<Record> projectedGenericRecords = projectRecords(genericRecords, projectedSchema);
331+
List<T> expectedEngineRecords =
332+
convertToEngineRecords(projectedGenericRecords, projectedSchema);
333+
334+
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
335+
List<T> readRecords;
336+
try (CloseableIterable<T> reader =
337+
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
338+
.project(projectedSchema)
339+
.engineProjection(engineSchema(projectedSchema))
340+
.build()) {
341+
readRecords = ImmutableList.copyOf(reader);
342+
}
343+
344+
assertEquals(projectedSchema, expectedEngineRecords, readRecords);
345+
}
346+
347+
@ParameterizedTest
348+
@FieldSource("FORMAT_AND_GENERATOR")
349+
void testReaderBuilderFilter(FileFormat fileFormat, DataGenerator dataGenerator)
350+
throws IOException {
351+
352+
// Avro does not support filter push down
353+
// Skip this test for Avro to avoid false failures.
354+
assumeThat(fileFormat != FileFormat.AVRO).isTrue();
355+
356+
Schema schema = dataGenerator.schema();
357+
358+
List<Record> genericRecords = dataGenerator.generateRecords();
359+
writeGenericRecords(fileFormat, schema, genericRecords);
360+
361+
// Construct a filter condition that is smaller than the minimum value to achieve file-level
362+
// filtering.
363+
Types.NestedField firstField = schema.columns().get(0);
364+
Expression filter = filterFieldExpression(firstField, schema, genericRecords);
365+
366+
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
367+
List<T> readRecords;
368+
try (CloseableIterable<T> reader =
369+
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
370+
.project(schema)
371+
.engineProjection(engineSchema(schema))
372+
.filter(filter)
373+
.build()) {
374+
readRecords = ImmutableList.copyOf(reader);
375+
}
376+
377+
assertThat(readRecords).isEmpty();
378+
}
379+
380+
@ParameterizedTest
381+
@FieldSource("FORMAT_AND_GENERATOR")
382+
/**
383+
* Write with Generic Record, then read using an upper-cased column name in the filter to verify
384+
* caseSensitive behavior.
385+
*/
386+
void testReaderBuilderCaseSensitive(FileFormat fileFormat, DataGenerator dataGenerator)
387+
throws IOException {
388+
389+
// Avro does not support filter push down; caseSensitive has no effect on it.
390+
// Skip this test for Avro to avoid false failures.
391+
assumeThat(fileFormat != FileFormat.AVRO).isTrue();
392+
393+
Schema schema = dataGenerator.schema();
394+
395+
List<Record> genericRecords = dataGenerator.generateRecords();
396+
writeGenericRecords(fileFormat, schema, genericRecords);
397+
398+
// Build a filter using the upper-cased name of the first column.
399+
Types.NestedField firstField = schema.columns().get(0);
400+
Object filterValue = genericRecords.get(0).getField(firstField.name());
401+
Expression upperCaseFilter = Expressions.equal(firstField.name().toUpperCase(), filterValue);
402+
403+
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
404+
405+
// caseSensitive=false: upper-cased column name must be resolved correctly.
406+
List<T> readRecords;
407+
try (CloseableIterable<T> reader =
408+
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
409+
.project(schema)
410+
.engineProjection(engineSchema(schema))
411+
.filter(upperCaseFilter)
412+
.caseSensitive(false)
413+
.build()) {
414+
readRecords = ImmutableList.copyOf(reader);
415+
}
416+
417+
assertThat(readRecords).isNotEmpty();
418+
419+
// caseSensitive=true: upper-cased column name cannot be resolved → must throw.
420+
assertThatThrownBy(
421+
() -> {
422+
try (CloseableIterable<T> reader =
423+
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
424+
.project(schema)
425+
.engineProjection(engineSchema(schema))
426+
.filter(upperCaseFilter)
427+
.caseSensitive(true)
428+
.build()) {
429+
ImmutableList.copyOf(reader);
430+
}
431+
})
432+
.isInstanceOf(ValidationException.class)
433+
.hasMessageContaining("Cannot find field '%s'", firstField.name().toUpperCase());
434+
}
435+
436+
@ParameterizedTest
437+
@FieldSource("FORMAT_AND_GENERATOR")
438+
/**
439+
* Write with Generic Record, then read using split to verify that the split range is respected.
440+
* Reading with a zero-length split at the end of the file should return no records, while reading
441+
* with the full file range should return all records.
442+
*/
443+
void testReaderBuilderSplit(FileFormat fileFormat, DataGenerator dataGenerator)
444+
throws IOException {
445+
Schema schema = dataGenerator.schema();
446+
447+
List<Record> genericRecords = dataGenerator.generateRecords();
448+
writeGenericRecords(fileFormat, schema, genericRecords);
449+
450+
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
451+
long fileLength = inputFile.getLength();
452+
453+
// split(fileLength, 0): empty range at the end of the file → no records should be returned
454+
List<T> emptyReadRecords;
455+
try (CloseableIterable<T> reader =
456+
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
457+
.project(schema)
458+
.engineProjection(engineSchema(schema))
459+
.split(fileLength, 0)
460+
.build()) {
461+
emptyReadRecords = ImmutableList.copyOf(reader);
462+
}
463+
464+
assertThat(emptyReadRecords).isEmpty();
465+
466+
// split(0, fileLength): full file range → all records should be returned
467+
List<T> fullReadRecords;
468+
try (CloseableIterable<T> reader =
469+
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
470+
.project(schema)
471+
.engineProjection(engineSchema(schema))
472+
.split(0, fileLength)
473+
.build()) {
474+
fullReadRecords = ImmutableList.copyOf(reader);
475+
}
476+
477+
assertEquals(schema, convertToEngineRecords(genericRecords, schema), fullReadRecords);
478+
}
479+
480+
@ParameterizedTest
481+
@FieldSource("FORMAT_AND_GENERATOR")
482+
/**
483+
* Verifies the contract of recordsPerBatch: recordsPerBatch is a hint for vectorized readers. The
484+
* total number of records returned must be unaffected regardless of the batch size value.
485+
*/
486+
void testReaderBuilderRecordsPerBatch(FileFormat fileFormat, DataGenerator dataGenerator)
487+
throws IOException {
488+
489+
// Avro does not support batch reading.
490+
assumeThat(fileFormat != FileFormat.AVRO).isTrue();
491+
492+
Schema schema = dataGenerator.schema();
493+
494+
List<Record> genericRecords = dataGenerator.generateRecords();
495+
writeGenericRecords(fileFormat, schema, genericRecords);
496+
497+
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
498+
List<T> expectedEngineRecords = convertToEngineRecords(genericRecords, schema);
499+
500+
List<T> smallBatchRecords;
501+
try (CloseableIterable<T> reader =
502+
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
503+
.project(schema)
504+
.engineProjection(engineSchema(schema))
505+
.recordsPerBatch(1)
506+
.build()) {
507+
smallBatchRecords = ImmutableList.copyOf(reader);
508+
}
509+
510+
assertEquals(schema, expectedEngineRecords, smallBatchRecords);
511+
512+
List<T> largeBatchRecords;
513+
try (CloseableIterable<T> reader =
514+
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
515+
.project(schema)
516+
.engineProjection(engineSchema(schema))
517+
.recordsPerBatch(genericRecords.size() + 1)
518+
.build()) {
519+
largeBatchRecords = ImmutableList.copyOf(reader);
520+
}
521+
522+
assertEquals(schema, expectedEngineRecords, largeBatchRecords);
523+
}
524+
525+
@ParameterizedTest
526+
@FieldSource("FORMAT_AND_GENERATOR")
527+
/** Verifies the contract of reuseContainers */
528+
void testReaderBuilderReuseContainers(FileFormat fileFormat, DataGenerator dataGenerator)
529+
throws IOException {
530+
531+
// Orc does not support batch reading.
532+
assumeThat(fileFormat != FileFormat.ORC).isTrue();
533+
534+
Schema schema = dataGenerator.schema();
535+
536+
List<Record> genericRecords = dataGenerator.generateRecords();
537+
// Need at least 2 records to verify container reuse
538+
assumeThat(genericRecords.size() >= 2).isTrue();
539+
writeGenericRecords(fileFormat, schema, genericRecords);
540+
541+
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
542+
543+
// Without reuseContainers: every record must be a distinct object instance
544+
List<T> noReuseRecords;
545+
try (CloseableIterable<T> reader =
546+
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
547+
.project(schema)
548+
.engineProjection(engineSchema(schema))
549+
.build()) {
550+
noReuseRecords = ImmutableList.copyOf(reader);
551+
}
552+
553+
for (int i = 0; i < noReuseRecords.size() - 1; i++) {
554+
assertThat(noReuseRecords.get(i)).isNotSameAs(noReuseRecords.get(i + 1));
555+
}
556+
557+
// With reuseContainers: all collected elements must be the same object instance
558+
List<T> reuseRecords;
559+
try (CloseableIterable<T> reader =
560+
FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile)
561+
.project(schema)
562+
.engineProjection(engineSchema(schema))
563+
.reuseContainers()
564+
.build()) {
565+
reuseRecords = ImmutableList.copyOf(reader);
566+
}
567+
568+
T first = reuseRecords.get(0);
569+
for (int i = 1; i < reuseRecords.size(); i++) {
570+
assertThat(reuseRecords.get(i)).isSameAs(first);
571+
}
572+
}
573+
574+
private void writeGenericRecords(FileFormat fileFormat, Schema schema, List<Record> records)
575+
throws IOException {
576+
FileWriterBuilder<DataWriter<Record>, Object> writerBuilder =
577+
FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile);
578+
579+
DataWriter<Record> writer =
580+
writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build();
581+
582+
try (writer) {
583+
for (Record record : records) {
584+
writer.write(record);
585+
}
586+
}
587+
588+
DataFile dataFile = writer.toDataFile();
589+
assertThat(dataFile).isNotNull();
590+
assertThat(dataFile.recordCount()).isEqualTo(records.size());
591+
assertThat(dataFile.format()).isEqualTo(fileFormat);
592+
}
593+
594+
private List<Record> projectRecords(List<Record> records, Schema projectedSchema) {
595+
return records.stream()
596+
.map(
597+
record -> {
598+
Record projected = GenericRecord.create(projectedSchema.asStruct());
599+
for (Types.NestedField field : projectedSchema.columns()) {
600+
projected.setField(field.name(), record.getField(field.name()));
601+
}
602+
return projected;
603+
})
604+
.collect(Collectors.toList());
605+
}
606+
607+
private Expression filterFieldExpression(
608+
Types.NestedField firstField, Schema schema, List<Record> records) {
609+
Object minValue =
610+
records.stream()
611+
.map(r -> (Comparable) r.getField(firstField.name()))
612+
.min(Comparator.naturalOrder())
613+
.get();
614+
return Expressions.lessThan(firstField.name(), minValue);
615+
}
616+
320617
private List<T> convertToEngineRecords(List<Record> records, Schema schema) {
321618
return records.stream().map(r -> convertToEngine(r, schema)).collect(Collectors.toList());
322619
}

0 commit comments

Comments
 (0)