
[SPARK-55450][SS][PYTHON][DOCS] Document admission control in PySpark streaming data sources#54807

Open
jiteshsoni wants to merge 3 commits into apache:master from jiteshsoni:SPARK-55450-admission-control-docs

Conversation

jiteshsoni (Contributor) commented Mar 14, 2026

What changes were proposed in this pull request?

This PR adds comprehensive documentation and examples for admission control and Trigger.AvailableNow support in PySpark custom streaming data sources (SPARK-55304).

Changes include:

  1. New tutorial documentation (python/docs/source/tutorial/sql/streaming_admission_control.rst):

    • Complete step-by-step guide for implementing admission control
  2. Example file (structured_blockchain_admission_control.py):

    • Demonstrates admission control via getDefaultReadLimit() and latestOffset()
    • Shows parallel partitioning (50 batches × 4 partitions = 200 tasks) and processes 1,000 simulated blockchain blocks with controlled batch sizes
    • Implements both continuous processing and Trigger.AvailableNow modes
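The admission-control flow the example demonstrates can be sketched with a standalone mock. This is pure Python with no pyspark dependency; the names `ReadMaxRows`, `getDefaultReadLimit()`, and `latestOffset()` mirror the PR's description of the API and are assumptions here, not verified against the actual interface:

```python
from dataclasses import dataclass


@dataclass
class ReadMaxRows:
    """Mock of a read limit capping rows admitted per micro-batch."""
    max_rows: int


class AdmissionControlReader:
    """Standalone mock of the blockchain reader described above:
    1,000 simulated blocks, admitting 20 per micro-batch."""

    TOTAL_BLOCKS = 1_000

    def initialOffset(self):
        return {"block": 0}

    def getDefaultReadLimit(self):
        # Ask the engine to cap every micro-batch at 20 blocks.
        return ReadMaxRows(20)

    def latestOffset(self, start, limit):
        # Admit at most limit.max_rows blocks past the start offset,
        # never beyond the data that actually exists.
        end = min(start["block"] + limit.max_rows, self.TOTAL_BLOCKS)
        return {"block": end}


reader = AdmissionControlReader()
start = reader.initialOffset()
end = reader.latestOffset(start, reader.getDefaultReadLimit())
print(end)  # {'block': 20}
```

The key design point is that `latestOffset()` receives the engine-supplied limit and clamps the batch end to it, which is what makes each micro-batch process exactly 20 blocks.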

Why are the changes needed?

Users need documentation and practical examples to implement admission control in custom streaming sources (introduced in SPARK-55304).

Does this PR introduce any user-facing change?

No. Documentation and examples only.

How was this patch tested?

Testing approach:

  • Ran both the continuous processing and Trigger.AvailableNow examples
  • Used the Streaming Query Statistics page in the Spark UI to verify micro-batch counts and batch sizes
  • Screenshots attached below showing the actual runs

What I checked:

  1. Micro-batch creation: Confirmed multiple micro-batches were getting created for both modes
  2. Batch sizes: Verified each micro-batch processed exactly 20 blocks (admission control working correctly)
  3. Trigger.AvailableNow behavior: Confirmed it processed all 50 batches and stopped automatically, with no manual intervention needed
  4. Continuous processing behavior: Had to manually kill the query as expected since it runs indefinitely
  5. Edge case - running out of data: Let the continuous processing query run longer to see what happens when there's no new data to process. It kept running but processed empty batches, which is the expected behavior.

Was this patch authored or co-authored using generative AI tooling?

Yes (Claude Sonnet 4.5)

🤖 Generated with Claude Code

HeartSaVioR (Contributor) commented Mar 15, 2026

Thanks for your contribution! Could you make sure you have run the examples yourself? If you already did, please update the PR description (specifically How was this patch tested?) to say so.

Also, we ask contributors to record the model name, not the tool name, when an LLM is used to generate the code. The PR template describes this, including the expected string format. Please update it.

@jiteshsoni jiteshsoni force-pushed the SPARK-55450-admission-control-docs branch from a2988f9 to ec89c64 Compare March 16, 2026 20:45
jiteshsoni (Contributor, Author) commented Mar 16, 2026

✅ Testing Verification - Examples Manually Tested on Databricks

I've successfully tested both examples on Databricks Dogfood Staging. Screenshots attached below showing the streaming query statistics.

Example 1: Continuous Processing

Query Name: blockchain_continuous

  • Trigger: Default trigger with 3-second processing intervals
  • Result: ✅ Admission control successfully limits each batch to 20 blocks
  • Termination: I used a timer to kill the query

Key Observations:

  • Input Rows: Consistently 20 records per batch (admission control working)
  • Batch Duration: Stable around 200-300ms per batch
  • Input Rate histogram shows steady processing across multiple batches
[Screenshot: continuous processing streaming query statistics]

Example 2: Trigger.AvailableNow - Finite Processing

Query Name: blockchain_available_now

  • Batches Completed: 50 batches ✅
  • Total Blocks Processed: 1,000 blocks (50 batches × 20 blocks each)
  • Batch Size: 20 blocks per batch (admission control via getDefaultReadLimit())
  • Trigger: availableNow=True
  • Result: ✅ All 1,000 blocks processed in exactly 50 batches as designed
  • Termination: Automatic completion when all data processed ✅

Key Observations:

  • Input Rows histogram shows 50 batches total (exactly as expected)
  • prepareForTriggerAvailableNow() successfully captured target offset of 1,000 blocks
  • Query automatically terminated after processing all available data
[Screenshot: admission control across 50 batches]
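The `prepareForTriggerAvailableNow()` behavior observed here can be illustrated with a small standalone mock (pure Python, no pyspark; the method name follows the PR's description and is an assumption, not the verified API):

```python
class AvailableNowReader:
    """Mock reader: snapshots the latest available offset at query start,
    then admits fixed-size batches until that snapshot is reached."""

    TOTAL_BLOCKS = 1_000
    BATCH_SIZE = 20

    def __init__(self):
        self._target = None

    def prepareForTriggerAvailableNow(self):
        # Capture the end of currently-available data; the query processes
        # up to this point and then terminates on its own.
        self._target = {"block": self.TOTAL_BLOCKS}

    def latestOffset(self, start):
        # Never admit past the snapshot taken at query start.
        end = min(start["block"] + self.BATCH_SIZE, self._target["block"])
        return {"block": end}


reader = AvailableNowReader()
reader.prepareForTriggerAvailableNow()

batches = 0
start = {"block": 0}
while start["block"] < reader._target["block"]:
    start = reader.latestOffset(start)
    batches += 1

print(batches)  # 50
```

Because the target offset is frozen before the first batch, data arriving after the snapshot is ignored, which is what gives Trigger.AvailableNow its finite, self-terminating behavior.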

Test Environment

Verified Functionality

Admission Control: getDefaultReadLimit() returning ReadMaxRows(20) successfully limits batch size
Parallel Partitioning: 4 partitions per batch (5 blocks each) working correctly
Continuous Processing: Default trigger processes batches continuously until stopped
Trigger.AvailableNow: Finite processing mode completes all 50 batches and terminates automatically
SupportsTriggerAvailableNow: Mixin implementation working correctly
5 Core Functions: initialOffset(), latestOffset(), partitions(), read(), commit() all working
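That five-call micro-batch loop can be sketched as a standalone mock (names mirror the PR's description of the reader interface; this is not the real pyspark API):

```python
class LifecycleReader:
    """Mock source: 1,000 blocks read as 50 batches of 20,
    each batch split into 4 partitions of 5 blocks."""

    TOTAL_BLOCKS = 1_000
    BATCH_SIZE = 20
    NUM_PARTITIONS = 4

    def __init__(self):
        self.committed = []

    def initialOffset(self):
        return {"block": 0}

    def latestOffset(self, start):
        return {"block": min(start["block"] + self.BATCH_SIZE, self.TOTAL_BLOCKS)}

    def partitions(self, start, end):
        # Split [start, end) into 4 equal ranges (5 blocks each here).
        step = (end["block"] - start["block"]) // self.NUM_PARTITIONS
        return [(start["block"] + i * step, start["block"] + (i + 1) * step)
                for i in range(self.NUM_PARTITIONS)]

    def read(self, partition):
        lo, hi = partition
        return list(range(lo, hi))

    def commit(self, end):
        # Record the batch as durably processed.
        self.committed.append(end)


reader = LifecycleReader()
start, rows, tasks = reader.initialOffset(), 0, 0
while start["block"] < LifecycleReader.TOTAL_BLOCKS:
    end = reader.latestOffset(start)
    for part in reader.partitions(start, end):
        rows += len(reader.read(part))  # one task per partition
        tasks += 1
    reader.commit(end)
    start = end

print(len(reader.committed), tasks, rows)  # 50 200 1000
```

Driving the mock to completion reproduces the numbers reported above: 50 committed batches, 200 partition tasks, and 1,000 blocks read.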


Screenshots below show the Streaming Query Statistics from both runs:


This comment was generated with GitHub MCP.

jiteshsoni (Contributor, Author) commented Mar 16, 2026

> Thanks for your contribution! Could you make sure you have run the examples yourself? If you already did, please update the PR description (specifically How was this patch tested?) to say so.
>
> Also, we ask contributors to record the model name, not the tool name, when an LLM is used to generate the code. The PR template describes this, including the expected string format. Please update it.

Thanks for the feedback! I've addressed both items: #54807 (comment) (screenshot attached)

✅ Testing verification: Updated the PR description to confirm the examples were manually tested on Databricks. I've also added a detailed comment below with screenshots showing the streaming query statistics.

…r.AvailableNow in PySpark streaming data sources

This patch adds comprehensive documentation and examples for the new admission control
and Trigger.AvailableNow features in Python streaming data sources (added in SPARK-55304).

Changes:
- New tutorial: streaming_admission_control.rst with step-by-step guide
- Example: structured_blockchain_admission_control.py demonstrating:
  - Admission control via getDefaultReadLimit() and latestOffset()
  - Parallel partitioning (50 batches × 4 partitions = 200 tasks)
  - Trigger.AvailableNow for finite processing
  - SupportsTriggerAvailableNow mixin implementation

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@jiteshsoni jiteshsoni force-pushed the SPARK-55450-admission-control-docs branch 3 times, most recently from 67a457f to dd0f52f Compare March 16, 2026 21:51
@jiteshsoni jiteshsoni changed the title [SPARK-55450][SS][PYTHON][DOCS] Document admission control and Trigger.AvailableNow in PySpark streaming data sources [SPARK-55450][SS][PYTHON][DOCS] Document admission control in PySpark streaming data sources Mar 16, 2026