-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat][broker] PIP-451: Support Label-based Topic Subscription #25113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
shibd
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I support pushing this feature forward. It effectively simplifies the pattern subscription issue and provides a multi-dimensional approach to topic management.
However, there are some implementation details that require further discussion.
|
|
||
| ```java | ||
| // Map<Namespace, Map<LabelKey, Map<LabelValue, Set<TopicName>>>> | ||
| Map<String, Map<String, Map<String, Set<String>>>> labelTopicInvertedIndex; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SystemTopicBasedTopicPoliciesService constructs the cache for a specific namespace based on the assigned namespace bundles.
In production scenarios, the bundles of a single namespace are highly likely to be evenly distributed across all brokers. This implies that every broker will effectively maintain a full copy of the this cache. I am concerned about the potential memory footprint in this scenario
| public void onLabelsUpdate(TopicName topicName, @Nullable Map<String, String> allLabels) { | ||
| // allLabels can be null if the topic is deleted | ||
| boolean wasMatching = matchedTopics.contains(topicName); | ||
| boolean matchesNow = allLabels != null && labelMatcher.matches(allLabels); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does the data structure of labelMatcher look like? Could you provide an example?
| * Multi-Watcher Orchestration: | ||
|
|
||
| When `subscribe()` is called, the Client iterates over the provided namespaces. | ||
| For each namespace, it initiates a CommandWatchTopicList request. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If can provide a exmaples for user side, that be better.
| * @param namespaces The set of namespaces to look for topics. | ||
| * If empty, defaults to the current consumer's namespace. | ||
| */ | ||
| ConsumerBuilder<T> topicsByLabel(Map<String, String> labels, Set<String> namespaces); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This input parameter cannot represent specific labels for a specific namespace. Perhaps we need to wrap the parameter as:
List {
String namespace,
Map<String, String> labels
}
|
|
||
| ### Topic Labels Observation and Notification Mechanism | ||
|
|
||
| ### TopicPoliciesService Interface Changes |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| ### TopicPoliciesService Interface Changes | |
| #### TopicPoliciesService Interface Changes |
|
|
||
| #### Query topic associated with specific labels: | ||
|
|
||
| * CLI: pulsar-admin topics list <namespace> --custome-labels "k1:v1,k2:v2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering how we should implement this feature, since not every broker maintains a full labelTopicInvertedIndex cache. In the REST API implementation, we would first need to determine which broker is assigned to any bundle of that namespace, and then send the request there.
Is this lookup operation easy to achieve?
Fixes #xyz
Main Issue: #xyz
PIP: #xyz
Motivation
Modifications
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: