This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 86fd01428f4ccd9abac3ceefb81c66e3eaf40d40
Author: KKcorps <kharekartik@gmail.com>
AuthorDate: Fri Dec 11 23:57:29 2020 +0530
Refactor kinesis shard metadata interface and add shardId to the metadata
---
.../kinesis/KinesisPartitionGroupMetadataMap.java | 20 +++++++++++++-------
.../plugin/stream/kinesis/KinesisShardMetadata.java | 6 ++++++
2 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index bc3fef2..87f7235 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -1,8 +1,7 @@
package org.apache.pinot.plugin.stream.kinesis;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
@@ -11,7 +10,7 @@ import software.amazon.awssdk.services.kinesis.model.Shard;
public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap {
- private Map<String, PartitionGroupMetadata> _stringPartitionGroupMetadataMap = new HashMap<>();
+ private final List<PartitionGroupMetadata> _stringPartitionGroupMetadataIndex = new ArrayList<>();
public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){
super(awsRegion);
@@ -20,12 +19,19 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
for(Shard shard : shardList){
String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber();
KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream);
- shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
- _stringPartitionGroupMetadataMap.put(shard.shardId(), shardMetadata);
+ shardMetadata.setStartCheckpoint(new KinesisCheckpoint(endingSequenceNumber));
+ _stringPartitionGroupMetadataIndex.add(shardMetadata);
}
}
- public Map<String, PartitionGroupMetadata> getPartitionMetadata(){
- return _stringPartitionGroupMetadataMap;
+ @Override
+ public List<PartitionGroupMetadata> getMetadataList() {
+ return _stringPartitionGroupMetadataIndex;
}
+
+ @Override
+ public PartitionGroupMetadata getPartitionGroupMetadata(int index) {
+ return _stringPartitionGroupMetadataIndex.get(index);
+ }
+
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index d50d821..4a19285 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -8,6 +8,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
+ String _shardId;
Checkpoint _startCheckpoint;
Checkpoint _endCheckpoint;
@@ -16,6 +17,11 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
ShardIteratorType.LATEST).streamName(streamName).build());
_startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator());
_endCheckpoint = null;
+ _shardId = shardId;
+ }
+
+ public String getShardId() {
+ return _shardId;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org
|