pulsar-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mme...@apache.org
Subject [pulsar] branch master updated: Adding config auto.offset.reset to PulsarKafkaConsumer (#3273)
Date Sat, 05 Jan 2019 03:05:06 GMT
This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ef82f04  Adding config auto.offset.reset to PulsarKafkaConsumer (#3273)
ef82f04 is described below

commit ef82f045257ae839c0208c75b1b31ba0ad6693aa
Author: Richard Yu <yohan.richard.yu@gmail.com>
AuthorDate: Fri Jan 4 19:05:01 2019 -0800

    Adding config auto.offset.reset to PulsarKafkaConsumer (#3273)
    
    * Adding config auto.offset.reset to PulsarKafkaConsumer
    
    * Adding similar behavior to poll
    
    * Update pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
    
    Co-Authored-By: ConcurrencyPractitioner <yohan.richard.yu@gmail.com>
    
    * Adding some changes
    
    * Fixing indentation
    
    * Modifying position method
    
    * Adding entry to compatibility matrix
    
    * Removing enum
    
    * Fixing indentation and policy
    
    * removing indents
    
    * Fixing indent
    
    * Adding log statements
    
    * Modifying seekToEnd behavior
    
    * Adding check
    
    * updating position
    
    * Reverting accidental deletion
---
 .../clients/consumer/PulsarKafkaConsumer.java      | 41 ++++++++++++++++++++--
 site2/docs/adaptors-kafka.md                       |  1 +
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
index cf1e716..5b5def2 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/consumer/PulsarKafkaConsumer.java
@@ -21,7 +21,9 @@ package org.apache.kafka.clients.consumer;
 import java.util.ArrayList;
 import java.util.Base64;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -53,6 +55,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -80,6 +83,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K, V>,
MessageListene
 
     private final Map<TopicPartition, Long> lastReceivedOffset = new ConcurrentHashMap<>();
     private final Map<TopicPartition, OffsetAndMetadata> lastCommittedOffset = new
ConcurrentHashMap<>();
+    private final Set<TopicPartition> unpolledPartitions = new HashSet<>();
+    private final SubscriptionInitialPosition strategy;
 
     private volatile boolean closed = false;
 
@@ -143,6 +148,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K,
V>, MessageListene
 
         groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
         isAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        strategy = getStrategy(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+        log.info("Offset reset strategy has been assigned value {}", strategy);
 
         String serviceUrl = config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).get(0);
 
@@ -159,6 +166,15 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K,
V>, MessageListene
         }
     }
 
+    private SubscriptionInitialPosition getStrategy(final String strategy) { // Maps the auto.offset.reset config value onto a Pulsar SubscriptionInitialPosition.
+        switch(strategy) { // NOTE(review): a switch on a null String throws NullPointerException; assumes the config always supplies a value — confirm.
+    	    case "earliest": // exact, case-sensitive match is the only way to select Earliest
+    	        return SubscriptionInitialPosition.Earliest;
+    	    default: // every other value (e.g. "latest", "none") maps to Latest — NOTE(review): confirm "none" should not raise instead
+                return SubscriptionInitialPosition.Latest;
+    	}
+    }
+    
     @Override
     public void received(org.apache.pulsar.client.api.Consumer<byte[]> consumer, Message<byte[]>
msg) {
         // Block listener thread if the application is slowing down
@@ -243,7 +259,8 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K,
V>, MessageListene
                     topicPartitions.add(tp);
                 }
             }
-
+            unpolledPartitions.addAll(topicPartitions);
+            
             // Wait for all consumers to be ready
             futures.forEach(CompletableFuture::join);
 
@@ -311,6 +328,10 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K,
V>, MessageListene
                 long offset = MessageIdUtils.getOffset(msgId);
 
                 TopicPartition tp = new TopicPartition(topic, partition);
+                if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp))
{
+                	log.info("When polling offsets, invalid offsets were detected. Resetting
topic partition {}", tp);
+                	resetOffsets(tp);
+                }
 
                 K key = getKey(topic, msg);
                 V value = valueDeserializer.deserialize(topic, msg.getData());
@@ -331,6 +352,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K,
V>, MessageListene
 
                 // Update last offset seen by application
                 lastReceivedOffset.put(tp, offset);
+                unpolledPartitions.remove(tp);
 
                 if (++numberOfRecords < MAX_RECORDS_IN_SINGLE_POLL) {
                     break;
@@ -452,7 +474,7 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K,
V>, MessageListene
         }
         lastCommittedOffset.clear();
         lastReceivedOffset.clear();
-
+        
         for (TopicPartition tp : partitions) {
             org.apache.pulsar.client.api.Consumer<byte[]> c = consumers.get(tp);
             if (c == null) {
@@ -492,9 +514,22 @@ public class PulsarKafkaConsumer<K, V> implements Consumer<K,
V>, MessageListene
     @Override
     public long position(TopicPartition partition) { // Returns the last received offset for partition; resets via resetOffsets when none is tracked and the partition is not awaiting its first poll.
         Long offset = lastReceivedOffset.get(partition);
-        return offset != null ? offset : -1l;
+        if (offset == null && !unpolledPartitions.contains(partition)) {
+            return resetOffsets(partition).getValue(); // NOTE(review): getValue() is read from the SubscriptionInitialPosition enum returned by resetOffsets, not from an offset — looks like an enum code rather than a position; verify
+        }
+        return unpolledPartitions.contains(partition) ? 0 : offset; // NOTE(review): any partition still awaiting its first poll reports 0 regardless of strategy; otherwise offset is auto-unboxed (non-null here)
     }
 
+    private SubscriptionInitialPosition resetOffsets(final TopicPartition partition) { // Seeks partition to its beginning when strategy is Earliest, otherwise to its end, and returns the strategy applied.
+    	log.info("Resetting partition {} and seeking to {} position", partition, strategy);
+        if (strategy == SubscriptionInitialPosition.Earliest) {
+            seekToBeginning(Collections.singleton(partition));
+        } else {
+            seekToEnd(Collections.singleton(partition)); // any non-Earliest strategy seeks to the end
+        } 
+        return strategy; // the strategy that was applied, not the resulting offset
+    }
+    
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
         return lastCommittedOffset.get(partition);
diff --git a/site2/docs/adaptors-kafka.md b/site2/docs/adaptors-kafka.md
index 36aa224..49ef3f3 100644
--- a/site2/docs/adaptors-kafka.md
+++ b/site2/docs/adaptors-kafka.md
@@ -129,6 +129,7 @@ Properties:
 | Config property                         | Supported | Notes                           
                                             |
 |:----------------------------------------|:----------|:------------------------------------------------------------------------------|
 | `acks`                                  | Ignored   | Durability and quorum writes are
configured at the namespace level            |
+| `auto.offset.reset`                     | Yes       | Defaults to `latest` if not set. Only `earliest` is recognized; any other value behaves like `latest`. |
 | `batch.size`                            | Ignored   |                                 
                                             |
 | `block.on.buffer.full`                  | Yes       | If true it will block producer, otherwise
give error                          |
 | `bootstrap.servers`                     | Yes       | Needs to point to a single Pulsar
service URL                                 |


Mime
View raw message