pinot-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xian...@apache.org
Subject [incubator-pinot] 23/23: fixing compilation
Date Sun, 03 Jan 2021 02:17:47 GMT
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 6240808f4fc098a162da42722cfcb22d43850e87
Author: Xiang Fu <fx19880617@gmail.com>
AuthorDate: Sat Jan 2 17:14:31 2021 -0800

    fixing compilation
---
 pinot-distribution/pinot-assembly.xml              |  4 ++
 pinot-distribution/pom.xml                         |  4 ++
 .../pinot-stream-ingestion/pinot-kinesis/pom.xml   | 64 ++++++++++++++++++++--
 .../plugin/stream/kinesis/KinesisCheckpoint.java   |  1 +
 .../pinot/plugin/stream/kinesis/KinesisConfig.java | 23 ++++----
 .../stream/kinesis/KinesisConnectionHandler.java   | 26 +++------
 .../plugin/stream/kinesis/KinesisConsumer.java     | 50 +++++++----------
 .../stream/kinesis/KinesisConsumerFactory.java     |  4 +-
 .../plugin/stream/kinesis/KinesisFetchResult.java  |  3 -
 .../kinesis/KinesisPartitionGroupMetadataMap.java  |  7 +--
 .../plugin/stream/kinesis/KinesisRecordsBatch.java | 18 ++++++
 .../stream/kinesis/KinesisShardMetadata.java       | 13 ++---
 .../plugin/stream/kinesis/KinesisConsumerTest.java | 39 +++++++------
 13 files changed, 152 insertions(+), 104 deletions(-)

diff --git a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml
index 2dfb36e..de7329f 100644
--- a/pinot-distribution/pinot-assembly.xml
+++ b/pinot-distribution/pinot-assembly.xml
@@ -55,6 +55,10 @@
       <source>${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}/target/pinot-kafka-${kafka.version}-${project.version}-shaded.jar</source>
       <destName>plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}/pinot-kafka-${kafka.version}-${project.version}-shaded.jar</destName>
     </file>
+    <file>
+      <source>${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/target/pinot-kinesis-${project.version}-shaded.jar</source>
+      <destName>plugins/pinot-stream-ingestion/pinot-kinesis/pinot-kinesis-${project.version}-shaded.jar</destName>
+    </file>
     <!-- End Include Pinot Stream Ingestion Plugins-->
     <!-- Start Include Pinot Batch Ingestion Plugins-->
     <file>
diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml
index 1a3f106..f29cae0 100644
--- a/pinot-distribution/pom.xml
+++ b/pinot-distribution/pom.xml
@@ -86,6 +86,10 @@
         </exclusion>
         <exclusion>
           <groupId>org.apache.pinot</groupId>
+          <artifactId>pinot-kinesis</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.pinot</groupId>
           <artifactId>pinot-batch-ingestion-standalone</artifactId>
         </exclusion>
         <exclusion>
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 0c9ae0b..4fce169 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -19,19 +19,20 @@
     under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
   <parent>
     <artifactId>pinot-stream-ingestion</artifactId>
     <groupId>org.apache.pinot</groupId>
     <version>0.7.0-SNAPSHOT</version>
     <relativePath>..</relativePath>
   </parent>
-  <modelVersion>4.0.0</modelVersion>
 
   <artifactId>pinot-kinesis</artifactId>
-
+  <name>Pinot Kinesis</name>
+  <url>https://pinot.apache.org/</url>
   <properties>
     <pinot.root>${basedir}/../../..</pinot.root>
     <phase.prop>package</phase.prop>
@@ -43,6 +44,32 @@
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>kinesis</artifactId>
       <version>${aws.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.reactivestreams</groupId>
+          <artifactId>reactive-streams</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-transport</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-common</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
     <dependency>
@@ -52,8 +79,33 @@
     </dependency>
 
     <dependency>
-      <groupId>org.apache.pinot</groupId>
-      <artifactId>pinot-spi</artifactId>
+      <groupId>org.reactivestreams</groupId>
+      <artifactId>reactive-streams</artifactId>
+      <version>1.0.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec</artifactId>
+      <version>4.1.42.Final</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+      <version>4.1.42.Final</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport</artifactId>
+      <version>4.1.42.Final</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-common</artifactId>
+      <version>4.1.42.Final</version>
     </dependency>
   </dependencies>
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
index 54e26d0..f3a7a49 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java
@@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 
+
 public class KinesisCheckpoint implements Checkpoint {
   String _sequenceNumber;
   Boolean _isEndOfPartition = false;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
index 82fc438..529f34f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java
@@ -24,16 +24,14 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisConfig {
-  private final Map<String, String> _props;
-
   public static final String STREAM = "stream";
-  private static final String AWS_REGION = "aws-region";
-  private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
   public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type";
-
-  private static final String DEFAULT_AWS_REGION = "us-central-1";
-  private static final String DEFAULT_MAX_RECORDS = "20";
-  private static final String DEFAULT_SHARD_ITERATOR_TYPE = "LATEST";
+  public static final String AWS_REGION = "aws-region";
+  public static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch";
+  public static final String DEFAULT_AWS_REGION = "us-central-1";
+  public static final String DEFAULT_MAX_RECORDS = "20";
+  public static final String DEFAULT_SHARD_ITERATOR_TYPE = ShardIteratorType.LATEST.toString();
+  private final Map<String, String> _props;
 
   public KinesisConfig(StreamConfig streamConfig) {
     _props = streamConfig.getStreamConfigsMap();
@@ -43,20 +41,19 @@ public class KinesisConfig {
     _props = props;
   }
 
-  public String getStream(){
+  public String getStream() {
     return _props.get(STREAM);
   }
 
-  public String getAwsRegion(){
+  public String getAwsRegion() {
     return _props.getOrDefault(AWS_REGION, DEFAULT_AWS_REGION);
   }
 
-  public Integer maxRecordsToFetch(){
+  public Integer maxRecordsToFetch() {
     return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS));
   }
 
-  public ShardIteratorType getShardIteratorType(){
+  public ShardIteratorType getShardIteratorType() {
     return ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE));
   }
-
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index 0cf4787..4d968f6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -19,28 +19,18 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.List;
-import org.apache.pinot.spi.stream.StreamConfig;
-import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
-import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.kinesis.KinesisClient;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
-import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
-import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
 import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
-import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
 import software.amazon.awssdk.services.kinesis.model.Shard;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
-import software.amazon.awssdk.services.kinesis.model.StreamDescription;
 
 
 public class KinesisConnectionHandler {
+  KinesisClient _kinesisClient;
   private String _stream;
   private String _awsRegion;
-  KinesisClient _kinesisClient;
 
   public KinesisConnectionHandler() {
 
@@ -58,18 +48,18 @@ public class KinesisConnectionHandler {
     return listShardsResponse.shards();
   }
 
-  public void createConnection(){
-    if(_kinesisClient == null) {
-      _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
-          .build();
+  public void createConnection() {
+    if (_kinesisClient == null) {
+      _kinesisClient =
+          KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+              .build();
     }
   }
 
-  public void close(){
-    if(_kinesisClient != null) {
+  public void close() {
+    if (_kinesisClient != null) {
       _kinesisClient.close();
       _kinesisClient = null;
     }
   }
-
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 336468a..fb414f0 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -19,18 +19,13 @@
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
-import org.apache.pinot.spi.stream.v2.FetchResult;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +33,6 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
 import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
 import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
 import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException;
 import software.amazon.awssdk.services.kinesis.model.KinesisException;
 import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException;
@@ -46,13 +40,14 @@ import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
 import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
+
 public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 {
+  private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
   String _stream;
   Integer _maxRecords;
   String _shardId;
   ExecutorService _executorService;
   ShardIteratorType _shardIteratorType;
-  private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class);
 
   public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) {
     super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion());
@@ -67,12 +62,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   @Override
   public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) {
     List<Record> recordList = new ArrayList<>();
-    Future<KinesisFetchResult> kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end, recordList));
+    Future<KinesisFetchResult> kinesisFetchResultFuture =
+        _executorService.submit(() -> getResult(start, end, recordList));
 
     try {
       return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS);
-    } catch(Exception e){
-        return handleException((KinesisCheckpoint) start, recordList);
+    } catch (Exception e) {
+      return handleException((KinesisCheckpoint) start, recordList);
     }
   }
 
@@ -81,7 +77,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
 
     try {
 
-      if(_kinesisClient == null){
+      if (_kinesisClient == null) {
         createConnection();
       }
 
@@ -105,7 +101,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
           recordList.addAll(getRecordsResponse.records());
           nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
 
-          if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
+          if (kinesisEndSequenceNumber != null
+              && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) {
             nextStartSequenceNumber = kinesisEndSequenceNumber;
             break;
           }
@@ -115,14 +112,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
           }
         }
 
-        if(getRecordsResponse.hasChildShards()){
+        if (getRecordsResponse.hasChildShards()) {
           //This statement returns true only when end of current shard has reached.
           isEndOfShard = true;
           break;
         }
 
         shardIterator = getRecordsResponse.nextShardIterator();
-
       }
 
       if (nextStartSequenceNumber == null && recordList.size() > 0) {
@@ -133,28 +129,20 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
       KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList);
 
       return kinesisFetchResult;
-    }catch (ProvisionedThroughputExceededException e) {
-      LOG.warn(
-          "The request rate for the stream is too high"
-      , e);
+    } catch (ProvisionedThroughputExceededException e) {
+      LOG.warn("The request rate for the stream is too high", e);
       return handleException(kinesisStartCheckpoint, recordList);
-    }
-    catch (ExpiredIteratorException e) {
-      LOG.warn(
-          "ShardIterator expired while trying to fetch records",e
-      );
+    } catch (ExpiredIteratorException e) {
+      LOG.warn("ShardIterator expired while trying to fetch records", e);
       return handleException(kinesisStartCheckpoint, recordList);
-    }
-    catch (ResourceNotFoundException | InvalidArgumentException e) {
+    } catch (ResourceNotFoundException | InvalidArgumentException e) {
       // aws errors
       LOG.error("Encountered AWS error while attempting to fetch records", e);
       return handleException(kinesisStartCheckpoint, recordList);
-    }
-    catch (KinesisException e) {
+    } catch (KinesisException e) {
       LOG.warn("Encountered unknown unrecoverable AWS exception", e);
       throw new RuntimeException(e);
-    }
-    catch (Throwable e) {
+    } catch (Throwable e) {
       // non transient errors
       LOG.error("Unknown fetchRecords exception", e);
       throw new RuntimeException(e);
@@ -162,11 +150,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume
   }
 
   private KinesisFetchResult handleException(KinesisCheckpoint start, List<Record> recordList) {
-    if(recordList.size() > 0){
+    if (recordList.size() > 0) {
       String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber();
       KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber);
       return new KinesisFetchResult(kinesisCheckpoint, recordList);
-    }else{
+    } else {
       KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber());
       return new KinesisFetchResult(kinesisCheckpoint, recordList);
     }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
index acac1fb..9bb4d0c 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
-import java.util.Map;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.v2.ConsumerV2;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
@@ -38,7 +37,8 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 {
   @Override
   public PartitionGroupMetadataMap getPartitionGroupsMetadata(
       PartitionGroupMetadataMap currentPartitionGroupsMetadata) {
-    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), currentPartitionGroupsMetadata);
+    return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(),
+        currentPartitionGroupsMetadata);
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
index 39561f3..8da3d2e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java
@@ -18,10 +18,7 @@
  */
 package org.apache.pinot.plugin.stream.kinesis;
 
-import java.util.ArrayList;
 import java.util.List;
-import org.apache.pinot.spi.stream.MessageBatch;
-import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.FetchResult;
 import software.amazon.awssdk.services.kinesis.model.Record;
 
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
index 626c8ea..f96533f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java
@@ -22,12 +22,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap;
-import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
-import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
 import software.amazon.awssdk.services.kinesis.model.Shard;
 
 
@@ -56,7 +52,8 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i
         //Return existing shard metadata
         _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId()));
       } else if (currentMetadataMap.containsKey(shard.parentShardId())) {
-        KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
+        KinesisShardMetadata kinesisShardMetadata =
+            (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId());
         if (isProcessingFinished(kinesisShardMetadata)) {
           //Add child shards for processing since parent has finished
           appendShardMetadata(stream, awsRegion, shard);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
index ed51f8f..04bf4e6 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
 package org.apache.pinot.plugin.stream.kinesis;
 
 import java.util.List;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
index 1d753c3..e24121b 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java
@@ -20,10 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis;
 
 import org.apache.pinot.spi.stream.v2.Checkpoint;
 import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
-import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
-import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
-import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
 
 //TODO: Implement shardId as Array and have unique id
 public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata {
@@ -48,13 +45,13 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa
   }
 
   @Override
-  public KinesisCheckpoint getEndCheckpoint() {
-    return _endCheckpoint;
+  public void setStartCheckpoint(Checkpoint startCheckpoint) {
+    _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
   }
 
   @Override
-  public void setStartCheckpoint(Checkpoint startCheckpoint) {
-    _startCheckpoint = (KinesisCheckpoint) startCheckpoint;
+  public KinesisCheckpoint getEndCheckpoint() {
+    return _endCheckpoint;
   }
 
   @Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
index 6f660f7..f853875 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java
@@ -20,40 +20,43 @@ package org.apache.pinot.plugin.stream.kinesis; /**
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import software.amazon.awssdk.services.kinesis.model.Record;
 import software.amazon.awssdk.services.kinesis.model.Shard;
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 
 
 public class KinesisConsumerTest {
+
+  private static final String STREAM_NAME = "kinesis-test";
+  private static final String AWS_REGION = "us-west-2";
+
   public static void main(String[] args) {
     Map<String, String> props = new HashMap<>();
-    props.put("stream", "kinesis-test");
-    props.put("aws-region", "us-west-2");
-    props.put("max-records-to-fetch", "2000");
-    props.put("shard-iterator-type", "AT-SEQUENCE-NUMBER");
-
+    props.put(KinesisConfig.STREAM, STREAM_NAME);
+    props.put(KinesisConfig.AWS_REGION, AWS_REGION);
+    props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10");
+    props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString());
     KinesisConfig kinesisConfig = new KinesisConfig(props);
-
-    KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler("kinesis-test", "us-west-2");
-
+    KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler(STREAM_NAME, AWS_REGION);
     List<Shard> shardList = kinesisConnectionHandler.getShards();
-
-    for(Shard shard : shardList) {
+    for (Shard shard : shardList) {
       System.out.println("SHARD: " + shard.shardId());
 
-      KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2"));
-
+      KinesisConsumer kinesisConsumer =
+          new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), STREAM_NAME, AWS_REGION));
+      System.out.println(
+          "Kinesis Checkpoint Range: < " + shard.sequenceNumberRange().startingSequenceNumber() + ", " + shard
+              .sequenceNumberRange().endingSequenceNumber() + " >");
       KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber());
-      KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L);
-
+      KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 60 * 1000L);
       KinesisRecordsBatch list = fetchResult.getMessages();
       int n = list.getMessageCount();
 
-      for (int i=0;i<n;i++) {
+      System.out.println("Found " + n + " messages ");
+      for (int i = 0; i < n; i++) {
         System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i));
       }
+      kinesisConsumer.close();
     }
+    kinesisConnectionHandler.close();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Mime
View raw message