pinot-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xian...@apache.org
Subject [incubator-pinot] 06/23: Refactor kinesis shard metadata interface and add shardId to the metadata
Date Sun, 03 Jan 2021 02:17:30 GMT
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 86fd01428f4ccd9abac3ceefb81c66e3eaf40d40
Author: KKcorps <kharekartik@gmail.com>
AuthorDate: Fri Dec 11 23:57:29 2020 +0530

    Refactor kinesis shard metadata interface and add shardId to the metadata
---
 .../kinesis/KinesisPartitionGroupMetadataMap.java    | 20 +++++++++++++-------
 .../plugin/stream/kinesis/KinesisShardMetadata.java  |  6 ++++++
 2 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index bc3fef2..87f7235 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -1,8 +1,7 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
@@ -11,7 +10,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
 
 
 public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements
PartitionGroupMetadataMap {
-  private Map<String, PartitionGroupMetadata> _stringPartitionGroupMetadataMap = new
HashMap<>();
+  private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new
ArrayList<>();
 
   public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
     super(awsRegion);
@@ -20,12 +19,19 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler
i
     for(Shard shard : shardList){
       String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
       KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream);
-      shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
-      _stringPartitionGroupMetadataMap.put(shard.shardId(), shardMetadata);
+      shardMetadata.setStartCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
+      _stringPartitionGroupMetadataIndex.add(shardMetadata);
     }
   }
 
-  public Map<String, PartitionGroupMetadata> getPartitionMetadata(){
-      return _stringPartitionGroupMetadataMap;
+  @Override
+  public List<PartitionGroupMetadata> getMetadataList() {
+    return _stringPartitionGroupMetadataIndex;
   }
+
+  @Override
+  public PartitionGroupMetadata getPartitionGroupMetadata(int index) {
+    return _stringPartitionGroupMetadataIndex.get(index);
+  }
+
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index d50d821..4a19285 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -8,6 +8,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata
{
+  String _shardId;
   Checkpoint _startCheckpoint;
   Checkpoint _endCheckpoint;
 
@@ -16,6 +17,11 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements
Pa
         ShardIteratorType.LATEST).streamName(streamName).build());
     _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator());
     _endCheckpoint = null;
+    _shardId = shardId;
+  }
+
+  public String getShardId() {
+    return _shardId;
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Mime
View raw message