pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [pulsar.wiki] branch master updated: Created PIP 35: Improve topic lookup for topics that have high number of partitions (markdown)
Date Thu, 11 Apr 2019 06:17:12 GMT
This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.wiki.git


The following commit(s) were added to refs/heads/master by this push:
     new dd7aedf  Created PIP 35: Improve topic lookup for topics that have high number of
partitions (markdown)
dd7aedf is described below

commit dd7aedf3a304529c0fd2637fdc0022dfec288ea8
Author: Sijie Guo <guosijie@gmail.com>
AuthorDate: Thu Apr 11 14:17:10 2019 +0800

    Created PIP 35: Improve topic lookup for topics that have high number of partitions (markdown)
---
 ...r-topics-that-have-high-number-of-partitions.md | 112 +++++++++++++++++++++
 1 file changed, 112 insertions(+)

diff --git a/PIP-35:-Improve-topic-lookup-for-topics-that-have-high-number-of-partitions.md
b/PIP-35:-Improve-topic-lookup-for-topics-that-have-high-number-of-partitions.md
new file mode 100644
index 0000000..09ba62d
--- /dev/null
+++ b/PIP-35:-Improve-topic-lookup-for-topics-that-have-high-number-of-partitions.md
@@ -0,0 +1,112 @@
+ * **Status**: Proposal
+ * **Author**: Zhengxin Cai
+ * **Pull Request**: 
+ * **Mailing List discussion**: http://mail-archives.apache.org/mod_mbox/pulsar-dev/201904.mbox/%3CCAHQi0xemFAGC6P9e240-8Ho0z4qtUv0uSBLtHfSs5ZeBDJXEug%40mail.gmail.com%3E
+
+## Motivation
+
+As this [github issue](https://github.com/apache/pulsar/issues/1088) brings up, for a partitioned
topic, client has to send LookUp request for each partition. Here we propose a solution to
make lookup easier for partitioned topic.
+
+We could either make current LookUp(getBroker) api smarter and it will check if topic is
a partitioned topic or not. If it’s a normal topic just return normal LookUp response which
will be Pair<InetSocketAddress, InetSocketAddress>, if it’s a partitioned topic then
it will return 
+List<Pair<InetSocketAddress, InetSocketAddress>> contains all brokers that serving
that partitioned topic.
+
+Or we could create a new BatchLookUp api, like getBrokers. It can take a list of topic as
input, where each topic has to be either a non partitioned topic or a partition of a partitioned
topic and it’ll return Map<TopicName, Pair<InetSocketAddress, InetSocketAddress>>
where it’s a map of input topic to corresponding address.
+
+Prefer to go with second approach as it not only support lookup for all partitions of partitioned
topic, it also support bulk lookup for many normal topics in one request.
+
+## Proposal
+
+In TopicLookupBase is where the core lookup logic will happens. 
+Add new lookupTopicBatchAsync,  within the method, after validation, for each topic, we’ll
fist find out the bundle it belong, then invoke NameSpaceService.findBrokerServiceUrl for
each bundle and create a Map<Bundle , List<TopicName>>.
+Then iterate through map and create response for each topicname.
+
+In LookupService, when findBrokers invoke newBatchLookup,  it’ll get a response of a list<lookupresponse>,
where some of response contains redirect, for these response will invoke findBrokers again
to contact correct cluster. Combine the result of 2 invocation to get a final result.
+
+Here’s proposed change to protocal buffer definition and apis:
+
+*PulsarApi.proto*
+
+```
+	Add 
+	Type enum:
+		BATCH_LOOKUP           = 23;
+		BATCH_LOOKUP_RESPONSE  = 24;
+
+	BaseBommand:
+		optional CommandBatchLookupTopic lookupTopic                    = 23;
+		optional CommandBatchLookupTopicResponse lookupTopicResponse    = 24;
+
+	CommandBatchLookupTopic:
+	{
+		repeated string topics           = 1;
+		required uint64 request_id       = 2;
+	}
+
+	CommandTopicResponse:
+	{	
+		repeated string topicname               = 10;
+	}
+```
+
+
+
+- Commands.java
+ 	- Add newBatchLookUp
+
+- ClientCnx.java
+	- Add newBatchLookup/handleBatchLookUpResponse
+
+- ServiceCnx.java
+	- Add handleBatchLookup
+
+- LookupProxyHandler.java   
+	- Add handleBatchLookup
+
+- ProxyConnection.java
+	- Add handleBatchLookup
+
+- PulsarDecoder.java
+	- Handle batchLookup
+
+- Add BatchLookupResult.java
+
+- TopicLookupBase.java
+	- Add lookupTopicBatchAsync
+```
+List<CompletableFuture<ByteBuf>> lookupTopicBatchAsync(list<topics> topics)
{
+	List<CompletableFuture> resultFutures;
+List<CompletableFuture> futures;
+	Map<Bundle.toString, List<Topic> map;
+            topics.foreach((topic) -> {
+CompletableFuture validateFuture;
+            CompletableFuture getNameSpaceBundleFuture;
+CompletableFuture lookupFuture;
+
+getClusterDataIfDifferentCluster.thenAccept(validateFuture.complete)
+	validateFuture.thenAccept(validaitonFailureResponse -> {
+		if(validaitonFailureResponse != null)
+			lookupFuture.complete(validaitonFailureResponse)
+pulsarService.getNamespaceService().getBundleAsync().
+thenAccept(bundle -> map.get(bundle).add(requestId))
+
+getNameSpaceBundleFuture.complete())
+
+futures.add(getNameSpaceBundleFuture)
+}}))
+FutureUtil.waitForAll(futures).then(map.entry.foreach((entry) -> {
+ 	entry.key.findBrokerServiceUrl().thenAccept(lookupResult -> {
+//Response will be address with success or redirect flag, and a list of topicnames in lookup
request that belong to this bundle
+		lookupFuture.complete(newLookupResponse()))
+		resultFutures.add(lookupFuture)
+})
+}))
+
+return resultFutures;
+}
+```
+
+- TopicLookup.java(v2)
+	- Add lookupTopicBatchAsync
+
+- BinaryProto/HttpLookupService.java
+	- Add getBrokers/findBrokers


Mime
View raw message