pinot-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xian...@apache.org
Subject [incubator-pinot] 17/23: Refactor: get shard iterator methods
Date Sun, 03 Jan 2021 01:22:23 GMT
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 47b166408d40bcbe56e95f5d864113c2d7ee4bfe
Author: KKcorps <kharekartik@gmail.com>
AuthorDate: Mon Dec 21 14:25:25 2020 +0530

    Refactor: get shard iterator methods
---
 .../plugin/stream/kinesis/KinesisConsumer.java     | 25 ++++++++++++----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index fd48a92..3263f87 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -163,21 +163,24 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   }
 
   private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) {
-    GetShardIteratorResponse getShardIteratorResponse;
-
     if (kinesisStartCheckpoint.getSequenceNumber() != null) {
-      String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber();
-      getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
-              .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER)
-              .startingSequenceNumber(kinesisStartSequenceNumber).build());
+      return getShardIterator(ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisStartCheckpoint.getSequenceNumber());
     } else {
-      getShardIteratorResponse = _kinesisClient.getShardIterator(
-          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
-              .shardIteratorType(ShardIteratorType.LATEST).build());
+      return getShardIterator(ShardIteratorType.LATEST, null);
     }
+  }
 
-    return getShardIteratorResponse.shardIterator();
+  public String getShardIterator(ShardIteratorType shardIteratorType, String sequenceNumber){
+    if(sequenceNumber == null){
+      return _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream)
+              .shardIteratorType(shardIteratorType).build()).shardIterator();
+    }else{
+      return _kinesisClient.getShardIterator(
+          GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId)
+              .shardIteratorType(shardIteratorType)
+              .startingSequenceNumber(sequenceNumber).build()).shardIterator();
+    }
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Mime
View raw message