Skip to content

Conversation

@coderzc
Copy link
Member

@coderzc coderzc commented Dec 24, 2025

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

Motivation

Modifications

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions github-actions bot added PIP doc Your PR contains doc changes, no matter whether the changes are in markdown or code files. labels Dec 24, 2025
@coderzc coderzc changed the title PIP-451: Support label-based topic subscription [feat][broker] PIP-451: Support label-based topic subscription Dec 24, 2025
@coderzc coderzc changed the title [feat][broker] PIP-451: Support label-based topic subscription [feat][broker] PIP-451: Support Label-based Topic Subscription Dec 24, 2025
Copy link
Member

@shibd shibd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I support pushing this feature forward. It effectively simplifies the pattern subscription issue and provides a multi-dimensional approach to topic management.

However, there are some implementation details that require further discussion.


```java
// Map<Namespace, Map<LabelKey, Map<LabelValue, Set<TopicName>>>>
Map<String, Map<String, Map<String, Set<String>>>> labelTopicInvertedIndex;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SystemTopicBasedTopicPoliciesService constructs the cache for a specific namespace based on the assigned namespace bundles.

In production scenarios, the bundles of a single namespace are highly likely to be evenly distributed across all brokers. This implies that every broker will effectively maintain a full copy of the this cache. I am concerned about the potential memory footprint in this scenario

public void onLabelsUpdate(TopicName topicName, @Nullable Map<String, String> allLabels) {
// allLabels can be null if the topic is deleted
boolean wasMatching = matchedTopics.contains(topicName);
boolean matchesNow = allLabels != null && labelMatcher.matches(allLabels);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does the data structure of labelMatcher look like? Could you provide an example?

* Multi-Watcher Orchestration:

When `subscribe()` is called, the Client iterates over the provided namespaces.
For each namespace, it initiates a CommandWatchTopicList request.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If can provide a exmaples for user side, that be better.

* @param namespaces The set of namespaces to look for topics.
* If empty, defaults to the current consumer's namespace.
*/
ConsumerBuilder<T> topicsByLabel(Map<String, String> labels, Set<String> namespaces);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This input parameter cannot represent specific labels for a specific namespace. Perhaps we need to wrap the parameter as:

List {
   String namespace,
   Map<String, String> labels
}


### Topic Labels Observation and Notification Mechanism

### TopicPoliciesService Interface Changes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
### TopicPoliciesService Interface Changes
#### TopicPoliciesService Interface Changes


#### Query topic associated with specific labels:

* CLI: pulsar-admin topics list <namespace> --custome-labels "k1:v1,k2:v2"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering how we should implement this feature, since not every broker maintains a full labelTopicInvertedIndex cache. In the REST API implementation, we would first need to determine which broker is assigned to any bundle of that namespace, and then send the request there.

Is this lookup operation easy to achieve?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc Your PR contains doc changes, no matter whether the changes are in markdown or code files. PIP

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants