From commits-return-28216-archive-asf-public=cust-asf.ponee.io@pinot.apache.org Sun Jan 3 01:22:12 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 7111E180674 for ; Sun, 3 Jan 2021 02:22:12 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id BD5AE65197 for ; Sun, 3 Jan 2021 01:22:11 +0000 (UTC) Received: (qmail 88461 invoked by uid 500); 3 Jan 2021 01:22:11 -0000 Mailing-List: contact commits-help@pinot.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pinot.apache.org Delivered-To: mailing list commits@pinot.apache.org Received: (qmail 88450 invoked by uid 99); 3 Jan 2021 01:22:11 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2021 01:22:11 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 0A65681F9F; Sun, 3 Jan 2021 01:22:11 +0000 (UTC) Date: Sun, 03 Jan 2021 01:22:10 +0000 To: "commits@pinot.apache.org" Subject: [incubator-pinot] 04/23: Add kinesis code to handle offsets MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: xiangfu@apache.org In-Reply-To: <160963692560.9549.9640350278609407605@gitbox.apache.org> References: <160963692560.9549.9640350278609407605@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pinot X-Git-Refname: refs/heads/sharded_consumer_type_support_with_kinesis X-Git-Reftype: branch X-Git-Rev: 76cfcf1816d3d36974841081346edb8c95bf73fc X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20210103012211.0A65681F9F@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git commit 76cfcf1816d3d36974841081346edb8c95bf73fc Author: KKcorps AuthorDate: Fri Dec 11 13:57:25 2020 +0530 Add kinesis code to handle offsets --- .../plugin/stream/kinesis/KinesisCheckpoint.java | 13 ++++--- .../plugin/stream/kinesis/KinesisConsumer.java | 42 +++++++++++++++++++--- .../stream/kinesis/KinesisConsumerFactory.java | 36 +++++++++++++++++++ .../plugin/stream/kinesis/KinesisFetchResult.java | 11 +++--- .../kinesis/KinesisPartitionGroupMetadataMap.java | 31 ++++++++++++++++ .../stream/kinesis/KinesisShardMetadata.java | 5 ++- 6 files changed, 121 insertions(+), 17 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index a330e78..77f790b 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -1,23 +1,22 @@ package org.apache.pinot.plugin.stream.kinesis; import org.apache.pinot.spi.stream.v2.Checkpoint; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; public class KinesisCheckpoint implements Checkpoint { - String _shardIterator; + String _sequenceNumber; - public KinesisCheckpoint(String shardIterator){ - _shardIterator = shardIterator; + public KinesisCheckpoint(String sequenceNumber){ + _sequenceNumber = sequenceNumber; } - public String getShardIterator() { - return _shardIterator; + public String getSequenceNumber() { + return _sequenceNumber; } @Override public byte[] serialize() { - return _shardIterator.getBytes(); + return _sequenceNumber.getBytes(); } @Override diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 251d831..dc44079 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -1,19 +1,26 @@ package org.apache.pinot.plugin.stream.kinesis; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.ConsumerV2; import org.apache.pinot.spi.stream.v2.FetchResult; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { + String _stream; //TODO: Fetch AWS region from Stream Config. - public KinesisConsumer(String awsRegion) { + public KinesisConsumer(String stream, String awsRegion) { super(awsRegion); + _stream = stream; } @Override @@ -21,18 +28,43 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end; - String kinesisShardIteratorStart = kinesisStartCheckpoint.getShardIterator(); + String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber(); + String kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber(); - GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisShardIteratorStart).build(); + GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().streamName(_stream).shardIteratorType( + ShardIteratorType.AFTER_SEQUENCE_NUMBER).startingSequenceNumber(kinesisStartSequenceNumber).build()); + + String shardIterator = getShardIteratorResponse.shardIterator(); + GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); String kinesisNextShardIterator = getRecordsResponse.nextShardIterator(); + //TODO: Get records in the loop and stop when end sequence number is reached or there is an exception. if(!getRecordsResponse.hasRecords()){ - return new KinesisFetchResult(kinesisNextShardIterator, Collections.emptyList()); + return new KinesisFetchResult(kinesisStartSequenceNumber, Collections.emptyList()); + } + + List recordList = new ArrayList<>(); + recordList.addAll(getRecordsResponse.records()); + + String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); + while(kinesisNextShardIterator != null){ + getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisNextShardIterator).build(); + getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); + if(getRecordsResponse.hasRecords()){ + recordList.addAll(getRecordsResponse.records()); + nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); + } + + if(kinesisEndSequenceNumber.compareTo(recordList.get(recordList.size() - 1).sequenceNumber()) <= 0 ) { + nextStartSequenceNumber = kinesisEndSequenceNumber; + break; + } + kinesisNextShardIterator = getRecordsResponse.nextShardIterator(); } - KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisNextShardIterator, + KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(nextStartSequenceNumber, getRecordsResponse.records()); return kinesisFetchResult; diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java new file mode 100644 index 0000000..6bd1e3a --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -0,0 +1,36 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import java.util.Map; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.v2.ConsumerV2; +import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap; +import org.apache.pinot.spi.stream.v2.SegmentNameGenerator; +import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2; + + +public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { + private StreamConfig _streamConfig; + private final String AWS_REGION = "aws-region"; + + @Override + public void init(StreamConfig streamConfig) { + _streamConfig = streamConfig; + } + + @Override + public PartitionGroupMetadataMap getPartitionGroupsMetadata( + PartitionGroupMetadataMap currentPartitionGroupsMetadata) { + return new KinesisPartitionGroupMetadataMap(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1")); + } + + @Override + public SegmentNameGenerator getSegmentNameGenerator() { + return null; + } + + @Override + public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) { + return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1")); + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java index 5ef4e30..dc8e764 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java @@ -1,16 +1,19 @@ package org.apache.pinot.plugin.stream.kinesis; +import java.util.ArrayList; import java.util.List; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.FetchResult; import software.amazon.awssdk.services.kinesis.model.Record; -public class KinesisFetchResult implements FetchResult { - private String _nextShardIterator; +public class KinesisFetchResult implements FetchResult { + private final String _nextShardIterator; + private final List _recordList; public KinesisFetchResult(String nextShardIterator, List recordList){ _nextShardIterator = nextShardIterator; + _recordList = recordList; } @Override @@ -19,7 +22,7 @@ public class KinesisFetchResult implements FetchResult { } @Override - public byte[] getMessages() { - return new byte[0]; + public List getMessages() { + return _recordList; } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java new file mode 100644 index 0000000..bc3fef2 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java @@ -0,0 +1,31 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; +import software.amazon.awssdk.services.kinesis.model.Shard; + + +public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap { + private Map _stringPartitionGroupMetadataMap = new HashMap<>(); + + public KinesisPartitionGroupMetadataMap(String stream, String awsRegion){ + super(awsRegion); + ListShardsResponse listShardsResponse = _kinesisClient.listShards(ListShardsRequest.builder().streamName(stream).build()); + List shardList = listShardsResponse.shards(); + for(Shard shard : shardList){ + String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); + KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream); + shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber)); + _stringPartitionGroupMetadataMap.put(shard.shardId(), shardMetadata); + } + } + + public Map getPartitionMetadata(){ + return _stringPartitionGroupMetadataMap; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java index 07ede73..d50d821 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -4,6 +4,7 @@ import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata { @@ -11,8 +12,10 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa Checkpoint _endCheckpoint; public KinesisShardMetadata(String shardId, String streamName) { - GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).streamName(streamName).build()); + GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).shardIteratorType( + ShardIteratorType.LATEST).streamName(streamName).build()); _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator()); + _endCheckpoint = null; } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org