From commits-return-28260-archive-asf-public=cust-asf.ponee.io@pinot.apache.org Sun Jan 3 02:17:33 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id DFA651807A1 for ; Sun, 3 Jan 2021 03:17:30 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id 3ABBD65244 for ; Sun, 3 Jan 2021 02:17:30 +0000 (UTC) Received: (qmail 10844 invoked by uid 500); 3 Jan 2021 02:17:29 -0000 Mailing-List: contact commits-help@pinot.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pinot.apache.org Delivered-To: mailing list commits@pinot.apache.org Received: (qmail 10835 invoked by uid 99); 3 Jan 2021 02:17:29 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2021 02:17:29 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 72C2F81F9F; Sun, 3 Jan 2021 02:17:29 +0000 (UTC) Date: Sun, 03 Jan 2021 02:17:47 +0000 To: "commits@pinot.apache.org" Subject: [incubator-pinot] 23/23: fixing compilation MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: xiangfu@apache.org In-Reply-To: <160964024402.15888.3151012730775503781@gitbox.apache.org> References: <160964024402.15888.3151012730775503781@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pinot X-Git-Refname: refs/heads/sharded_consumer_type_support_with_kinesis X-Git-Reftype: branch X-Git-Rev: 6240808f4fc098a162da42722cfcb22d43850e87 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20210103021729.72C2F81F9F@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git commit 6240808f4fc098a162da42722cfcb22d43850e87 Author: Xiang Fu AuthorDate: Sat Jan 2 17:14:31 2021 -0800 fixing compilation --- pinot-distribution/pinot-assembly.xml | 4 ++ pinot-distribution/pom.xml | 4 ++ .../pinot-stream-ingestion/pinot-kinesis/pom.xml | 64 ++++++++++++++++++++-- .../plugin/stream/kinesis/KinesisCheckpoint.java | 1 + .../pinot/plugin/stream/kinesis/KinesisConfig.java | 23 ++++---- .../stream/kinesis/KinesisConnectionHandler.java | 26 +++------ .../plugin/stream/kinesis/KinesisConsumer.java | 50 +++++++---------- .../stream/kinesis/KinesisConsumerFactory.java | 4 +- .../plugin/stream/kinesis/KinesisFetchResult.java | 3 - .../kinesis/KinesisPartitionGroupMetadataMap.java | 7 +-- .../plugin/stream/kinesis/KinesisRecordsBatch.java | 18 ++++++ .../stream/kinesis/KinesisShardMetadata.java | 13 ++--- .../plugin/stream/kinesis/KinesisConsumerTest.java | 39 +++++++------ 13 files changed, 152 insertions(+), 104 deletions(-) diff --git a/pinot-distribution/pinot-assembly.xml b/pinot-distribution/pinot-assembly.xml index 2dfb36e..de7329f 100644 --- a/pinot-distribution/pinot-assembly.xml +++ b/pinot-distribution/pinot-assembly.xml @@ -55,6 +55,10 @@ ${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}/target/pinot-kafka-${kafka.version}-${project.version}-shaded.jar plugins/pinot-stream-ingestion/pinot-kafka-${kafka.version}/pinot-kafka-${kafka.version}-${project.version}-shaded.jar + + ${pinot.root}/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/target/pinot-kinesis-${project.version}-shaded.jar + plugins/pinot-stream-ingestion/pinot-kinesis/pinot-kinesis-${project.version}-shaded.jar + diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml index 1a3f106..f29cae0 100644 --- a/pinot-distribution/pom.xml +++ b/pinot-distribution/pom.xml @@ -86,6 +86,10 @@ org.apache.pinot + pinot-kinesis + + + org.apache.pinot pinot-batch-ingestion-standalone diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml index 0c9ae0b..4fce169 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml @@ -19,19 +19,20 @@ under the License. --> - + 4.0.0 pinot-stream-ingestion org.apache.pinot 0.7.0-SNAPSHOT .. - 4.0.0 pinot-kinesis - + Pinot Kinesis + https://pinot.apache.org/ ${basedir}/../../.. package @@ -43,6 +44,32 @@ software.amazon.awssdk kinesis ${aws.version} + + + com.fasterxml.jackson.core + jackson-core + + + org.reactivestreams + reactive-streams + + + io.netty + netty-codec + + + io.netty + netty-buffer + + + io.netty + netty-transport + + + io.netty + netty-common + + @@ -52,8 +79,33 @@ - org.apache.pinot - pinot-spi + org.reactivestreams + reactive-streams + 1.0.2 + + + + io.netty + netty-codec + 4.1.42.Final + + + + io.netty + netty-buffer + 4.1.42.Final + + + + io.netty + netty-transport + 4.1.42.Final + + + + io.netty + netty-common + 4.1.42.Final diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index 54e26d0..f3a7a49 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -20,6 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis; import org.apache.pinot.spi.stream.v2.Checkpoint; + public class KinesisCheckpoint implements Checkpoint { String _sequenceNumber; Boolean _isEndOfPartition = false; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java index 82fc438..529f34f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java @@ -24,16 +24,14 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisConfig { - private final Map _props; - public static final String STREAM = "stream"; - private static final String AWS_REGION = "aws-region"; - private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch"; public static final String SHARD_ITERATOR_TYPE = "shard-iterator-type"; - - private static final String DEFAULT_AWS_REGION = "us-central-1"; - private static final String DEFAULT_MAX_RECORDS = "20"; - private static final String DEFAULT_SHARD_ITERATOR_TYPE = "LATEST"; + public static final String AWS_REGION = "aws-region"; + public static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch"; + public static final String DEFAULT_AWS_REGION = "us-central-1"; + public static final String DEFAULT_MAX_RECORDS = "20"; + public static final String DEFAULT_SHARD_ITERATOR_TYPE = ShardIteratorType.LATEST.toString(); + private final Map _props; public KinesisConfig(StreamConfig streamConfig) { _props = streamConfig.getStreamConfigsMap(); @@ -43,20 +41,19 @@ public class KinesisConfig { _props = props; } - public String getStream(){ + public String getStream() { return _props.get(STREAM); } - public String getAwsRegion(){ + public String getAwsRegion() { return _props.getOrDefault(AWS_REGION, DEFAULT_AWS_REGION); } - public Integer maxRecordsToFetch(){ + public Integer maxRecordsToFetch() { return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS)); } - public ShardIteratorType getShardIteratorType(){ + public ShardIteratorType getShardIteratorType() { return ShardIteratorType.fromValue(_props.getOrDefault(SHARD_ITERATOR_TYPE, DEFAULT_SHARD_ITERATOR_TYPE)); } - } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java index 0cf4787..4d968f6 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -19,28 +19,18 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.List; -import org.apache.pinot.spi.stream.StreamConfig; -import org.apache.pinot.spi.stream.v2.ConsumerV2; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; -import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; -import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; -import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; -import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; -import software.amazon.awssdk.services.kinesis.model.PutRecordResponse; import software.amazon.awssdk.services.kinesis.model.Shard; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -import software.amazon.awssdk.services.kinesis.model.StreamDescription; public class KinesisConnectionHandler { + KinesisClient _kinesisClient; private String _stream; private String _awsRegion; - KinesisClient _kinesisClient; public KinesisConnectionHandler() { @@ -58,18 +48,18 @@ public class KinesisConnectionHandler { return listShardsResponse.shards(); } - public void createConnection(){ - if(_kinesisClient == null) { - _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()) - .build(); + public void createConnection() { + if (_kinesisClient == null) { + _kinesisClient = + KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()) + .build(); } } - public void close(){ - if(_kinesisClient != null) { + public void close() { + if (_kinesisClient != null) { _kinesisClient.close(); _kinesisClient = null; } } - } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 336468a..fb414f0 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -19,18 +19,13 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.ArrayList; -import java.util.Collections; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.ConsumerV2; -import org.apache.pinot.spi.stream.v2.FetchResult; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +33,6 @@ import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.InvalidArgumentException; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.ProvisionedThroughputExceededException; @@ -46,13 +40,14 @@ import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { + private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); String _stream; Integer _maxRecords; String _shardId; ExecutorService _executorService; ShardIteratorType _shardIteratorType; - private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) { super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion()); @@ -67,12 +62,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume @Override public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { List recordList = new ArrayList<>(); - Future kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end, recordList)); + Future kinesisFetchResultFuture = + _executorService.submit(() -> getResult(start, end, recordList)); try { return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS); - } catch(Exception e){ - return handleException((KinesisCheckpoint) start, recordList); + } catch (Exception e) { + return handleException((KinesisCheckpoint) start, recordList); } } @@ -81,7 +77,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume try { - if(_kinesisClient == null){ + if (_kinesisClient == null) { createConnection(); } @@ -105,7 +101,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume recordList.addAll(getRecordsResponse.records()); nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); - if (kinesisEndSequenceNumber != null && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) { + if (kinesisEndSequenceNumber != null + && kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0) { nextStartSequenceNumber = kinesisEndSequenceNumber; break; } @@ -115,14 +112,13 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } } - if(getRecordsResponse.hasChildShards()){ + if (getRecordsResponse.hasChildShards()) { //This statement returns true only when end of current shard has reached. isEndOfShard = true; break; } shardIterator = getRecordsResponse.nextShardIterator(); - } if (nextStartSequenceNumber == null && recordList.size() > 0) { @@ -133,28 +129,20 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList); return kinesisFetchResult; - }catch (ProvisionedThroughputExceededException e) { - LOG.warn( - "The request rate for the stream is too high" - , e); + } catch (ProvisionedThroughputExceededException e) { + LOG.warn("The request rate for the stream is too high", e); return handleException(kinesisStartCheckpoint, recordList); - } - catch (ExpiredIteratorException e) { - LOG.warn( - "ShardIterator expired while trying to fetch records",e - ); + } catch (ExpiredIteratorException e) { + LOG.warn("ShardIterator expired while trying to fetch records", e); return handleException(kinesisStartCheckpoint, recordList); - } - catch (ResourceNotFoundException | InvalidArgumentException e) { + } catch (ResourceNotFoundException | InvalidArgumentException e) { // aws errors LOG.error("Encountered AWS error while attempting to fetch records", e); return handleException(kinesisStartCheckpoint, recordList); - } - catch (KinesisException e) { + } catch (KinesisException e) { LOG.warn("Encountered unknown unrecoverable AWS exception", e); throw new RuntimeException(e); - } - catch (Throwable e) { + } catch (Throwable e) { // non transient errors LOG.error("Unknown fetchRecords exception", e); throw new RuntimeException(e); @@ -162,11 +150,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } private KinesisFetchResult handleException(KinesisCheckpoint start, List recordList) { - if(recordList.size() > 0){ + if (recordList.size() > 0) { String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber); return new KinesisFetchResult(kinesisCheckpoint, recordList); - }else{ + } else { KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber()); return new KinesisFetchResult(kinesisCheckpoint, recordList); } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index acac1fb..9bb4d0c 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.plugin.stream.kinesis; -import java.util.Map; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.v2.ConsumerV2; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; @@ -38,7 +37,8 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { @Override public PartitionGroupMetadataMap getPartitionGroupsMetadata( PartitionGroupMetadataMap currentPartitionGroupsMetadata) { - return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), currentPartitionGroupsMetadata); + return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), + currentPartitionGroupsMetadata); } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java index 39561f3..8da3d2e 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java @@ -18,10 +18,7 @@ */ package org.apache.pinot.plugin.stream.kinesis; -import java.util.ArrayList; import java.util.List; -import org.apache.pinot.spi.stream.MessageBatch; -import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.FetchResult; import software.amazon.awssdk.services.kinesis.model.Record; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java index 626c8ea..f96533f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java @@ -22,12 +22,8 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap; -import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; -import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.Shard; @@ -56,7 +52,8 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i //Return existing shard metadata _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId())); } else if (currentMetadataMap.containsKey(shard.parentShardId())) { - KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId()); + KinesisShardMetadata kinesisShardMetadata = + (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId()); if (isProcessingFinished(kinesisShardMetadata)) { //Add child shards for processing since parent has finished appendShardMetadata(stream, awsRegion, shard); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java index ed51f8f..04bf4e6 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisRecordsBatch.java @@ -1,3 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.pinot.plugin.stream.kinesis; import java.util.List; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java index 1d753c3..e24121b 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -20,10 +20,7 @@ package org.apache.pinot.plugin.stream.kinesis; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; -import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + //TODO: Implement shardId as Array and have unique id public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata { @@ -48,13 +45,13 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa } @Override - public KinesisCheckpoint getEndCheckpoint() { - return _endCheckpoint; + public void setStartCheckpoint(Checkpoint startCheckpoint) { + _startCheckpoint = (KinesisCheckpoint) startCheckpoint; } @Override - public void setStartCheckpoint(Checkpoint startCheckpoint) { - _startCheckpoint = (KinesisCheckpoint) startCheckpoint; + public KinesisCheckpoint getEndCheckpoint() { + return _endCheckpoint; } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java index 6f660f7..f853875 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java @@ -20,40 +20,43 @@ package org.apache.pinot.plugin.stream.kinesis; /** import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import software.amazon.awssdk.services.kinesis.model.Record; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisConsumerTest { + + private static final String STREAM_NAME = "kinesis-test"; + private static final String AWS_REGION = "us-west-2"; + public static void main(String[] args) { Map props = new HashMap<>(); - props.put("stream", "kinesis-test"); - props.put("aws-region", "us-west-2"); - props.put("max-records-to-fetch", "2000"); - props.put("shard-iterator-type", "AT-SEQUENCE-NUMBER"); - + props.put(KinesisConfig.STREAM, STREAM_NAME); + props.put(KinesisConfig.AWS_REGION, AWS_REGION); + props.put(KinesisConfig.MAX_RECORDS_TO_FETCH, "10"); + props.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AT_SEQUENCE_NUMBER.toString()); KinesisConfig kinesisConfig = new KinesisConfig(props); - - KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler("kinesis-test", "us-west-2"); - + KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler(STREAM_NAME, AWS_REGION); List shardList = kinesisConnectionHandler.getShards(); - - for(Shard shard : shardList) { + for (Shard shard : shardList) { System.out.println("SHARD: " + shard.shardId()); - KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2")); - + KinesisConsumer kinesisConsumer = + new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), STREAM_NAME, AWS_REGION)); + System.out.println( + "Kinesis Checkpoint Range: < " + shard.sequenceNumberRange().startingSequenceNumber() + ", " + shard + .sequenceNumberRange().endingSequenceNumber() + " >"); KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber()); - KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L); - + KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 60 * 1000L); KinesisRecordsBatch list = fetchResult.getMessages(); int n = list.getMessageCount(); - for (int i=0;i