pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [pulsar] branch branch-2.4 updated: Revert "[Issue 5597][pulsar-client-java] retry when getPartitionedTopicMetadata failed (#5603)" (#5733)
Date Mon, 25 Nov 2019 09:20:29 GMT
This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 811dd45  Revert "[Issue 5597][pulsar-client-java] retry when getPartitionedTopicMetadata failed (#5603)" (#5733)
811dd45 is described below

commit 811dd45e87600d7163c8f74b40d377831f64bbfb
Author: 冉小龙 <rxl@apache.org>
AuthorDate: Mon Nov 25 17:20:18 2019 +0800

    Revert "[Issue 5597][pulsar-client-java] retry when getPartitionedTopicMetadata failed (#5603)" (#5733)
    
    This reverts commit ee11e100d8a05296f1ddf0da6c4e52f63ca02294.
---
 .../stats/client/PulsarBrokerStatsClientTest.java  | 63 ----------------------
 .../org/apache/pulsar/client/impl/HttpClient.java  |  6 +--
 .../pulsar/client/impl/PulsarClientImpl.java       | 32 +----------
 3 files changed, 4 insertions(+), 97 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
index 00d95d1..fca1485 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java
@@ -31,8 +31,6 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
 import org.slf4j.Logger;
@@ -49,7 +47,6 @@ import java.util.concurrent.TimeUnit;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
 
 public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
 
@@ -135,65 +132,5 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase {
         log.info("-- Exiting {} test --", methodName);
     }
 
-    @Test
-    public void testGetPartitionedTopicMetaData() throws Exception {
-        log.info("-- Starting {} test --", methodName);
-
-        final String topicName = "persistent://my-property/my-ns/my-topic1";
-        final String subscriptionName = "my-subscriber-name";
-
-
-
-        try {
-            String url = "http://localhost:51000,localhost:" + BROKER_WEBSERVICE_PORT;
-            if (isTcpLookup) {
-                url = "pulsar://localhost:51000,localhost:" + BROKER_PORT;
-            }
-            PulsarClient client = newPulsarClient(url, 0);
-
-            Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName(subscriptionName)
-                    .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
-            Producer<byte[]> producer = client.newProducer().topic(topicName).create();
-
-            consumer.close();
-            producer.close();
-            client.close();
-        } catch (PulsarClientException pce) {
-            log.error("create producer or consumer error: ", pce);
-            fail();
-        }
-
-        log.info("-- Exiting {} test --", methodName);
-    }
-
-    @Test (timeOut = 4000)
-    public void testGetPartitionedTopicDataTimeout() {
-        log.info("-- Starting {} test --", methodName);
-
-        final String topicName = "persistent://my-property/my-ns/my-topic1";
-
-        String url = "http://localhost:51000,localhost:51001";
-        if (isTcpLookup) {
-            url = "pulsar://localhost:51000,localhost:51001";
-        }
-
-        PulsarClient client;
-        try {
-            client = PulsarClient.builder()
-                    .serviceUrl(url)
-                    .statsInterval(0, TimeUnit.SECONDS)
-                    .operationTimeout(3, TimeUnit.SECONDS)
-                    .build();
-
-            Producer<byte[]> producer = client.newProducer().topic(topicName).create();
-
-            fail();
-        } catch (PulsarClientException pce) {
-            log.error("create producer error: ", pce);
-        }
-
-        log.info("-- Exiting {} test --", methodName);
-    }
-
     private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStatsClientTest.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 845c741..96c62d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.HttpURLConnection;
-import java.net.URI;
 import java.net.URL;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -128,9 +127,8 @@ public class HttpClient implements Closeable {
     public <T> CompletableFuture<T> get(String path, Class<T> clazz) {
         final CompletableFuture<T> future = new CompletableFuture<>();
         try {
-            URI hostUri = serviceNameResolver.resolveHostUri();
-            String requestUrl = new URL(hostUri.toURL(), path).toString();
-            String remoteHostName = hostUri.getHost();
+            String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString();
+            String remoteHostName = serviceNameResolver.resolveHostUri().getHost();
             AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName);
 
             CompletableFuture<Map<String, String>>  authFuture = new CompletableFuture<>();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index abad54b..53468d3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -641,45 +641,17 @@ public class PulsarClientImpl implements PulsarClient {
 
     public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(String topic) {
 
-        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
+        CompletableFuture<PartitionedTopicMetadata> metadataFuture;
 
         try {
             TopicName topicName = TopicName.get(topic);
-            AtomicLong opTimeoutMs = new AtomicLong(conf.getOperationTimeoutMs());
-            Backoff backoff = new BackoffBuilder()
-                    .setInitialTime(100, TimeUnit.MILLISECONDS)
-                    .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
-                    .setMax(0, TimeUnit.MILLISECONDS)
-                    .useUserConfiguredIntervals(conf.getDefaultBackoffIntervalNanos(),
-                            conf.getMaxBackoffIntervalNanos())
-                    .create();
-            getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture);
+            metadataFuture = lookup.getPartitionedTopicMetadata(topicName);
         } catch (IllegalArgumentException e) {
             return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage()));
         }
         return metadataFuture;
     }
 
-    private void getPartitionedTopicMetadata(TopicName topicName,
-                                             Backoff backoff,
-                                             AtomicLong remainingTime,
-                                             CompletableFuture<PartitionedTopicMetadata> future) {
-        lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> {
-            long nextDelay = Math.min(backoff.next(), remainingTime.get());
-            if (nextDelay <= 0) {
-                future.completeExceptionally(new PulsarClientException
-                        .TimeoutException("Could not getPartitionedTopicMetadata within configured timeout."));
-                return null;
-            }
-
-            timer.newTimeout( task -> {
-                remainingTime.addAndGet(-nextDelay);
-                getPartitionedTopicMetadata(topicName, backoff, remainingTime, future);
-            }, nextDelay, TimeUnit.MILLISECONDS);
-            return null;
-        });
-    }
-
     @Override
     public CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
         return getPartitionedTopicMetadata(topic).thenApply(metadata -> {


Mime
View raw message