 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.KeyValueFileStore;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.fileindex.FileIndexPredicate;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FilteredManifestEntry;
 import org.apache.paimon.stats.SimpleStatsEvolutions;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ListUtils;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nullable;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;

 /** {@link FileStoreScan} for {@link KeyValueFileStore}. */
 public class KeyValueFileStoreScan extends AbstractFileStoreScan {

+    private static final Logger LOG = LoggerFactory.getLogger(KeyValueFileStoreScan.class);
+
     private final SimpleStatsEvolutions fieldKeyStatsConverters;
     private final SimpleStatsEvolutions fieldValueStatsConverters;
     private final BucketSelectConverter bucketSelectConverter;
@@ -203,6 +211,122 @@ private boolean isValueFilterEnabled() {
         }
     }

+    /**
+     * Checks whether limit pushdown is supported for primary key tables.
+     *
+     * <p>Not supported when the merge engine is PARTIAL_UPDATE or AGGREGATE (rows still need to
+     * be merged) or when deletion vectors are enabled (deleted rows can't be counted). For
+     * DEDUPLICATE and FIRST_ROW, the per-bucket checks (no overlapping files, no delete rows)
+     * are done in applyLimitPushdownForBucket.
+     */
+    @Override
+    public boolean supportsLimitPushManifestEntries() {
+        if (mergeEngine == PARTIAL_UPDATE || mergeEngine == AGGREGATE) {
+            return false;
+        }
+
+        return limit != null && limit > 0 && !deletionVectorsEnabled;
+    }
+
+    /**
+     * Applies limit pushdown by grouping files by bucket and accumulating row counts until the
+     * limit is reached. For buckets where the limit can't be pushed down safely (overlapping
+     * files or delete rows), all files are included.
+     */
+    @Override
+    protected Iterator<ManifestEntry> limitPushManifestEntries(Iterator<ManifestEntry> entries) {
+        long startTime = System.nanoTime();
+        List<ManifestEntry> allEntries = ListUtils.toList(entries);
+        Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(allEntries);
+
+        List<ManifestEntry> result = new ArrayList<>();
+        long accumulatedRowCount = 0;
+
+        for (List<ManifestEntry> bucketEntries : buckets.values()) {
+            if (accumulatedRowCount >= limit) {
+                break;
+            }
+
+            long remainingLimit = limit - accumulatedRowCount;
+            List<ManifestEntry> processedBucket =
+                    applyLimitPushdownForBucket(bucketEntries, remainingLimit);
+            if (processedBucket == null) {
+                // Unsafe bucket: keep all files, and don't count their rows towards the limit.
+                result.addAll(bucketEntries);
+            } else {
+                result.addAll(processedBucket);
+                accumulatedRowCount +=
+                        processedBucket.stream().mapToLong(e -> e.file().rowCount()).sum();
+            }
+        }
+
+        long duration = (System.nanoTime() - startTime) / 1_000_000;
+        LOG.info(
+                "Limit pushdown for PK table completed in {} ms. Limit: {}, InputFiles: {}, OutputFiles: {}, "
+                        + "MergeEngine: {}, ScanMode: {}, DeletionVectorsEnabled: {}",
+                duration,
+                limit,
+                allEntries.size(),
+                result.size(),
+                mergeEngine,
+                scanMode,
+                deletionVectorsEnabled);
+        return result.iterator();
+    }
+
+    /**
+     * Applies limit pushdown within a single bucket.
+     *
+     * <p>Returns null if the bucket's files overlap (LSM level 0 or different levels) or contain
+     * delete rows; in both cases the merged row count can't be computed from file metadata alone.
+     * Otherwise accumulates file row counts and stops once the limit is reached.
+     *
+     * @param bucketEntries files in the same bucket
+     * @param limit the limit to apply
+     * @return the files to include, or null if the limit can't be pushed down safely
+     */
+    @Nullable
+    private List<ManifestEntry> applyLimitPushdownForBucket(
+            List<ManifestEntry> bucketEntries, long limit) {
+        if (!noOverlapping(bucketEntries)) {
+            // Overlapping files (an LSM property) need to be merged, so the merged row
+            // count can't be calculated accurately without reading the data.
+            return null;
+        }
+
+        // Without overlapping, and with a merge engine that doesn't require merging
+        // (DEDUPLICATE or FIRST_ROW), row counts can be accumulated safely as long as
+        // the files contain no delete rows.
+        List<ManifestEntry> result = new ArrayList<>();
+        long accumulatedRowCount = 0;
+
+        for (ManifestEntry entry : bucketEntries) {
+            boolean hasDeleteRows =
+                    entry.file().deleteRowCount().map(count -> count > 0L).orElse(false);
+            if (hasDeleteRows) {
+                // Delete rows make the merged row count unknowable without reading the
+                // actual data, so the limit can't be pushed down for this bucket.
+                return null;
+            }
+
+            result.add(entry);
+            accumulatedRowCount += entry.file().rowCount();
+            if (accumulatedRowCount >= limit) {
+                break;
+            }
+        }
+
+        return result;
+    }
+
     @Override
     protected boolean postFilterManifestEntriesEnabled() {
         return valueFilter != null && scanMode == ScanMode.ALL;
@@ -214,15 +338,8 @@ protected List<ManifestEntry> postFilterManifestEntries(List<ManifestEntry> file
         // Why do this: in a primary key table we can't filter values using only per-file
         // stats (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`), but we can
         // filter the files of a whole bucket together
-        return files.stream()
-                .collect(
-                        Collectors.groupingBy(
-                                // we use LinkedHashMap to avoid disorder
-                                file -> Pair.of(file.partition(), file.bucket()),
-                                LinkedHashMap::new,
-                                Collectors.toList()))
-                .values()
-                .stream()
+        Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> buckets = groupByBucket(files);
+        return buckets.values().stream()
                 .map(this::doFilterWholeBucketByStats)
                 .flatMap(Collection::stream)
                 .collect(Collectors.toList());
@@ -316,4 +433,19 @@ private static boolean noOverlapping(List<ManifestEntry> entries) {

         return true;
     }
+
+    /**
+     * Groups manifest entries by (partition, bucket) while preserving encounter order. This is
+     * shared by limitPushManifestEntries and postFilterManifestEntries.
+     */
+    private Map<Pair<BinaryRow, Integer>, List<ManifestEntry>> groupByBucket(
+            List<ManifestEntry> entries) {
+        return entries.stream()
+                .collect(
+                        Collectors.groupingBy(
+                                // use LinkedHashMap to keep a deterministic bucket order
+                                file -> Pair.of(file.partition(), file.bucket()),
+                                LinkedHashMap::new,
+                                Collectors.toList()));
+    }
 }
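
For reference, here is a minimal, self-contained sketch of the accounting the new limit-pushdown path performs. LimitDemo and FileStub are hypothetical stand-ins for ManifestEntry and DataFileMeta, and the per-bucket overlap check on LSM levels is reduced to a delete-row flag; this illustrates the technique under those simplifications and is not Paimon API.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LimitDemo {
    // Simplified stand-in for a data file: bucket key, row count, delete-row flag.
    record FileStub(String bucket, long rowCount, boolean hasDeleteRows) {}

    // Keep whole buckets until the accumulated row count reaches the limit. A bucket
    // containing delete rows is kept wholesale and not counted, mirroring the null
    // return of applyLimitPushdownForBucket.
    static List<FileStub> limitPush(List<FileStub> files, long limit) {
        Map<String, List<FileStub>> buckets = new LinkedHashMap<>();
        for (FileStub f : files) {
            buckets.computeIfAbsent(f.bucket(), k -> new ArrayList<>()).add(f);
        }

        List<FileStub> result = new ArrayList<>();
        long accumulated = 0;
        for (List<FileStub> bucket : buckets.values()) {
            if (accumulated >= limit) {
                break;
            }
            boolean unsafe = bucket.stream().anyMatch(FileStub::hasDeleteRows);
            if (unsafe) {
                result.addAll(bucket); // merged row count unknown, keep everything
                continue;
            }
            long remaining = limit - accumulated;
            long bucketCount = 0;
            for (FileStub f : bucket) {
                result.add(f);
                bucketCount += f.rowCount();
                if (bucketCount >= remaining) {
                    break;
                }
            }
            accumulated += bucketCount;
        }
        return result;
    }

    public static void main(String[] args) {
        List<FileStub> files =
                List.of(
                        new FileStub("b0", 100, false),
                        new FileStub("b0", 100, false),
                        new FileStub("b1", 500, false));
        // Limit 150: the first two b0 files already cover it, so b1 is skipped.
        System.out.println(limitPush(files, 150).size()); // prints 2
    }
}

Note that rows in an unsafe bucket are not counted towards the limit, which is conservative: the scan may keep more buckets than strictly necessary, but it never returns fewer rows than the limit requires.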
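One detail worth calling out in groupByBucket: the three-argument Collectors.groupingBy with a LinkedHashMap::new supplier is what keeps the bucket iteration order deterministic, since the single-argument overload makes no guarantee about the ordering of the returned map. A tiny standalone check, using plain strings in place of the (partition, bucket) pairs:

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupOrderDemo {
    public static void main(String[] args) {
        List<String> entries = List.of("p1/b2", "p0/b1", "p1/b0", "p0/b1");
        // Same three-argument groupingBy shape as groupByBucket: the LinkedHashMap
        // supplier preserves first-encounter order of the keys.
        Map<String, List<String>> buckets =
                entries.stream()
                        .collect(
                                Collectors.groupingBy(
                                        e -> e, LinkedHashMap::new, Collectors.toList()));
        System.out.println(buckets.keySet()); // [p1/b2, p0/b1, p1/b0]
    }
}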