pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: In PulsarKafkaProducer use flush() from pulsar API (#3549)
Date Mon, 11 Feb 2019 03:45:48 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 894146a  In PulsarKafkaProducer use flush() from pulsar API (#3549)
894146a is described below

commit 894146af259de25390906c0ae34e89451d55306a
Author: Matteo Merli <mmerli@apache.org>
AuthorDate: Sun Feb 10 19:45:36 2019 -0800

    In PulsarKafkaProducer use flush() from pulsar API (#3549)
---
 .../clients/producer/PulsarKafkaProducer.java      | 23 ++++++----------------
 1 file changed, 6 insertions(+), 17 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index b9fbd5e..3fd00c2 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Metric;
@@ -66,9 +67,6 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
     private final Partitioner partitioner;
     private volatile Cluster cluster = Cluster.empty();
 
-    /** Map that contains the last future for each producer */
-    private final ConcurrentMap<String, CompletableFuture<MessageId>> lastSendFuture = new ConcurrentHashMap<>();
-
     public PulsarKafkaProducer(Map<String, Object> configs) {
         this(configs, null, null);
     }
@@ -174,10 +172,7 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
         int messageSize = buildMessage(messageBuilder, record);;
 
         CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
-        CompletableFuture<MessageId> sendFuture = messageBuilder.sendAsync();
-        lastSendFuture.put(record.topic(), sendFuture);
-
-        sendFuture.thenAccept((messageId) -> {
+        messageBuilder.sendAsync().thenAccept((messageId) -> {
             future.complete(getRecordMetadata(record.topic(), messageBuilder, messageId, messageSize));
         }).exceptionally(ex -> {
             future.completeExceptionally(ex);
@@ -197,16 +192,10 @@ public class PulsarKafkaProducer<K, V> implements Producer<K, V> {
 
     @Override
     public void flush() {
-        lastSendFuture.forEach((topic, future) -> {
-            try {
-                future.get();
-            } catch (InterruptedException | ExecutionException e) {
-                throw new RuntimeException(e);
-            }
-
-            // Remove the futures to remove eventually failed operations in order to trigger errors only once
-            lastSendFuture.remove(topic, future);
-        });
+        producers.values().stream()
+                .map(p -> p.flushAsync())
+                .collect(Collectors.toList())
+                .forEach(CompletableFuture::join);
     }
 
     @Override


Mime
View raw message