From commits-return-28245-archive-asf-public=cust-asf.ponee.io@pinot.apache.org Sun Jan 3 02:17:29 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id DD5C01807A0 for ; Sun, 3 Jan 2021 03:17:28 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id 563DE65223 for ; Sun, 3 Jan 2021 02:17:28 +0000 (UTC) Received: (qmail 9993 invoked by uid 500); 3 Jan 2021 02:17:27 -0000 Mailing-List: contact commits-help@pinot.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pinot.apache.org Delivered-To: mailing list commits@pinot.apache.org Received: (qmail 9889 invoked by uid 99); 3 Jan 2021 02:17:27 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2021 02:17:27 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 3DDF081F9F; Sun, 3 Jan 2021 02:17:27 +0000 (UTC) Date: Sun, 03 Jan 2021 02:17:32 +0000 To: "commits@pinot.apache.org" Subject: [incubator-pinot] 08/23: Move shardId out of checkpoint to partition group metadata MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: xiangfu@apache.org In-Reply-To: <160964024402.15888.3151012730775503781@gitbox.apache.org> References: <160964024402.15888.3151012730775503781@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pinot X-Git-Refname: refs/heads/sharded_consumer_type_support_with_kinesis X-Git-Reftype: branch X-Git-Rev: d8b18cda826b4958d6dadc296ddbe60d4a1596d8 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20210103021727.3DDF081F9F@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git commit d8b18cda826b4958d6dadc296ddbe60d4a1596d8 Author: KKcorps AuthorDate: Sun Dec 20 01:25:13 2020 +0530 Move shardId out of checkpoint to partition group metadata --- .../plugin/stream/kinesis/KinesisCheckpoint.java | 14 ++------------ .../plugin/stream/kinesis/KinesisConsumer.java | 21 +++++++++------------ .../stream/kinesis/KinesisConsumerFactory.java | 2 +- .../kinesis/KinesisPartitionGroupMetadataMap.java | 4 +++- .../plugin/stream/kinesis/KinesisShardMetadata.java | 5 ++--- 5 files changed, 17 insertions(+), 29 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index 8448665..aa80b17 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -4,11 +4,9 @@ import org.apache.pinot.spi.stream.v2.Checkpoint; public class KinesisCheckpoint implements Checkpoint { - String _shardId; String _sequenceNumber; - public KinesisCheckpoint(String shardId, String sequenceNumber){ - _shardId = shardId; + public KinesisCheckpoint(String sequenceNumber){ _sequenceNumber = sequenceNumber; } @@ -16,14 +14,6 @@ public class KinesisCheckpoint implements Checkpoint { return _sequenceNumber; } - public String getShardId() { - return _shardId; - } - - public void setShardId(String shardId) { - _shardId = shardId; - } - @Override public byte[] serialize() { return _sequenceNumber.getBytes(); @@ -32,7 +22,7 @@ public class KinesisCheckpoint implements Checkpoint { @Override public Checkpoint deserialize(byte[] blob) { //TODO: Implement SerDe - return new KinesisCheckpoint("", new String(blob)); + return new KinesisCheckpoint(new String(blob)); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 7bc1006..d896d67 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -7,6 +7,7 @@ import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.ConsumerV2; import org.apache.pinot.spi.stream.v2.FetchResult; +import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; @@ -18,18 +19,14 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { String _stream; Integer _maxRecords; + String _shardId; - //TODO: Fetch AWS region from Stream Config. - public KinesisConsumer(String stream, String awsRegion) { - super(stream, awsRegion); - _stream = stream; - _maxRecords = 20; - } - - public KinesisConsumer(String stream, String awsRegion, StreamConfig streamConfig) { - super(stream, awsRegion); + public KinesisConsumer(String stream, StreamConfig streamConfig, PartitionGroupMetadata partitionGroupMetadata) { + super(stream, streamConfig.getStreamConfigsMap().getOrDefault("aws-region", "global")); _stream = stream; _maxRecords = Integer.parseInt(streamConfig.getStreamConfigsMap().getOrDefault("maxRecords", "20")); + KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata; + _shardId = kinesisShardMetadata.getShardId(); } @Override @@ -73,7 +70,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); } - KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(kinesisStartCheckpoint.getShardId(), nextStartSequenceNumber); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber); KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList); @@ -86,11 +83,11 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume if(kinesisStartCheckpoint.getSequenceNumber() != null) { String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber(); getShardIteratorResponse = _kinesisClient.getShardIterator( - GetShardIteratorRequest.builder().streamName(_stream).shardId(kinesisStartCheckpoint.getShardId()).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) + GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) .startingSequenceNumber(kinesisStartSequenceNumber).build()); } else{ getShardIteratorResponse = _kinesisClient.getShardIterator( - GetShardIteratorRequest.builder().shardId(kinesisStartCheckpoint.getShardId()).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build()); + GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream).shardIteratorType(ShardIteratorType.LATEST).build()); } return getShardIteratorResponse.shardIterator(); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index bdbc348..0608118 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -31,6 +31,6 @@ public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { @Override public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) { - return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, "us-central-1"), _streamConfig); + return new KinesisConsumer(_streamConfig.getTopicName(), _streamConfig, metadata); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java index d15804e..700ec3f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java @@ -16,9 +16,11 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i super(stream, awsRegion); List shardList = getShards(); for(Shard shard : shardList){ + String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber(); String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion); - shardMetadata.setStartCheckpoint(new KinesisCheckpoint(shard.shardId(), endingSequenceNumber)); + shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber)); + shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber)); _stringPartitionGroupMetadataIndex.add(shardMetadata); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java index 693b307..e1d23da 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -15,9 +15,8 @@ public class KinesisShardMetadata extends KinesisConnectionHandler implements Pa public KinesisShardMetadata(String shardId, String streamName, String awsRegion) { super(streamName, awsRegion); - - _startCheckpoint = new KinesisCheckpoint(shardId, null); - _endCheckpoint = new KinesisCheckpoint(shardId, null); + _startCheckpoint = null; + _endCheckpoint = null; _shardId = shardId; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org