From commits-return-28262-archive-asf-public=cust-asf.ponee.io@pinot.apache.org Sun Jan 3 03:50:31 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 98577180670 for ; Sun, 3 Jan 2021 04:50:31 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id CC9BD641BD for ; Sun, 3 Jan 2021 03:50:30 +0000 (UTC) Received: (qmail 63983 invoked by uid 500); 3 Jan 2021 03:50:29 -0000 Mailing-List: contact commits-help@pinot.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pinot.apache.org Delivered-To: mailing list commits@pinot.apache.org Received: (qmail 63974 invoked by uid 99); 3 Jan 2021 03:50:28 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2021 03:50:28 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 6E4FC81F9F; Sun, 3 Jan 2021 03:50:27 +0000 (UTC) Date: Sun, 03 Jan 2021 03:50:26 +0000 To: "commits@pinot.apache.org" Subject: [incubator-pinot] branch sharded_consumer_type_support_with_kinesis updated: Implementation fixes MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <160964582576.28881.17175137581458986501@gitbox.apache.org> From: nehapawar@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pinot X-Git-Refname: refs/heads/sharded_consumer_type_support_with_kinesis X-Git-Reftype: branch X-Git-Oldrev: 6240808f4fc098a162da42722cfcb22d43850e87 X-Git-Newrev: a341b285016b1b478824de2a721e19d38186089f X-Git-Rev: a341b285016b1b478824de2a721e19d38186089f X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push: new a341b28 Implementation fixes a341b28 is described below commit a341b285016b1b478824de2a721e19d38186089f Author: Neha Pawar AuthorDate: Sat Jan 2 19:49:34 2021 -0800 Implementation fixes --- .../pinot-stream-ingestion/pinot-kinesis/pom.xml | 2 +- .../plugin/stream/kinesis/KinesisCheckpoint.java | 47 +++++++---- .../plugin/stream/kinesis/KinesisConsumer.java | 46 +++++------ .../stream/kinesis/KinesisConsumerFactory.java | 39 +++++---- .../plugin/stream/kinesis/KinesisFetchResult.java | 44 ---------- .../kinesis/KinesisPartitionGroupMetadataMap.java | 93 ---------------------- .../stream/kinesis/KinesisShardMetadata.java | 71 ----------------- .../kinesis/KinesisStreamMetadataProvider.java | 53 ++++++++++++ .../plugin/stream/kinesis/KinesisConsumerTest.java | 18 +++-- .../pinot/spi/stream/PartitionGroupMetadata.java | 3 + .../org/apache/pinot/spi/stream/v2/Checkpoint.java | 25 ------ .../org/apache/pinot/spi/stream/v2/ConsumerV2.java | 24 ------ .../apache/pinot/spi/stream/v2/FetchResult.java | 29 ------- .../spi/stream/v2/PartitionGroupMetadata.java | 34 -------- .../spi/stream/v2/PartitionGroupMetadataMap.java | 30 ------- .../pinot/spi/stream/v2/SegmentNameGenerator.java | 25 ------ .../spi/stream/v2/StreamConsumerFactoryV2.java | 37 --------- pinot-tools/pom.xml | 5 ++ pom.xml | 2 +- 19 files changed, 148 insertions(+), 479 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml index 4fce169..38d4f73 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml @@ -109,4 +109,4 @@ - \ No newline at end of file + diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java index f3a7a49..1b8f86e 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -18,38 +18,51 @@ */ package org.apache.pinot.plugin.stream.kinesis; -import org.apache.pinot.spi.stream.v2.Checkpoint; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import java.io.IOException; +import java.util.Map; +import org.apache.pinot.spi.stream.Checkpoint; +import org.apache.pinot.spi.utils.JsonUtils; public class KinesisCheckpoint implements Checkpoint { - String _sequenceNumber; - Boolean _isEndOfPartition = false; + private Map _shardToStartSequenceMap; - public KinesisCheckpoint(String sequenceNumber) { - _sequenceNumber = sequenceNumber; + public KinesisCheckpoint(Map shardToStartSequenceMap) { + _shardToStartSequenceMap = shardToStartSequenceMap; } - public KinesisCheckpoint(String sequenceNumber, Boolean isEndOfPartition) { - _sequenceNumber = sequenceNumber; - _isEndOfPartition = isEndOfPartition; + public KinesisCheckpoint(String checkpointStr) + throws IOException { + _shardToStartSequenceMap = JsonUtils.stringToObject(checkpointStr, new TypeReference>() { + }); } - @Override - public boolean isEndOfPartition() { - return _isEndOfPartition; + public Map getShardToStartSequenceMap() { + return _shardToStartSequenceMap; } - public String getSequenceNumber() { - return _sequenceNumber; + @Override + public String serialize() { + try { + return JsonUtils.objectToString(_shardToStartSequenceMap); + } catch (JsonProcessingException e) { + throw new IllegalStateException(); + } } @Override - public byte[] serialize() { - return _sequenceNumber.getBytes(); + public KinesisCheckpoint deserialize(String blob) { + try { + return new KinesisCheckpoint(blob); + } catch (IOException e) { + throw new IllegalStateException(); + } } @Override - public KinesisCheckpoint deserialize(byte[] blob) { - return new KinesisCheckpoint(new String(blob)); + public int compareTo(Object o) { + return this._shardToStartSequenceMap.values().iterator().next().compareTo(((KinesisCheckpoint) o)._shardToStartSequenceMap.values().iterator().next()); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index fb414f0..8a24208 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -19,14 +19,16 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.pinot.spi.stream.v2.Checkpoint; -import org.apache.pinot.spi.stream.v2.ConsumerV2; -import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.Checkpoint; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException; @@ -41,28 +43,25 @@ import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; -public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { +public class KinesisConsumer extends KinesisConnectionHandler implements PartitionGroupConsumer { private final Logger LOG = LoggerFactory.getLogger(KinesisConsumer.class); String _stream; Integer _maxRecords; - String _shardId; ExecutorService _executorService; ShardIteratorType _shardIteratorType; - public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) { + public KinesisConsumer(KinesisConfig kinesisConfig) { super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion()); _stream = kinesisConfig.getStream(); _maxRecords = kinesisConfig.maxRecordsToFetch(); - KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata; - _shardId = kinesisShardMetadata.getShardId(); _shardIteratorType = kinesisConfig.getShardIteratorType(); _executorService = Executors.newSingleThreadExecutor(); } @Override - public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { + public KinesisRecordsBatch fetchMessages(Checkpoint start, Checkpoint end, int timeout) { List recordList = new ArrayList<>(); - Future kinesisFetchResultFuture = + Future kinesisFetchResultFuture = _executorService.submit(() -> getResult(start, end, recordList)); try { @@ -72,7 +71,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } } - private KinesisFetchResult getResult(Checkpoint start, Checkpoint end, List recordList) { + private KinesisRecordsBatch getResult(Checkpoint start, Checkpoint end, List recordList) { KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; try { @@ -81,13 +80,14 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume createConnection(); } - String shardIterator = getShardIterator(kinesisStartCheckpoint.getSequenceNumber()); + Map.Entry next = kinesisStartCheckpoint.getShardToStartSequenceMap().entrySet().iterator().next(); + String shardIterator = getShardIterator(next.getKey(), next.getValue()); String kinesisEndSequenceNumber = null; if (end != null) { KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end; - kinesisEndSequenceNumber = kinesisEndCheckpoint.getSequenceNumber(); + kinesisEndSequenceNumber = kinesisEndCheckpoint.getShardToStartSequenceMap().values().iterator().next(); } String nextStartSequenceNumber = null; @@ -125,10 +125,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); } - KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber, isEndOfShard); - KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisCheckpoint, recordList); - - return kinesisFetchResult; + return new KinesisRecordsBatch(recordList); } catch (ProvisionedThroughputExceededException e) { LOG.warn("The request rate for the stream is too high", e); return handleException(kinesisStartCheckpoint, recordList); @@ -149,21 +146,22 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } } - private KinesisFetchResult handleException(KinesisCheckpoint start, List recordList) { + private KinesisRecordsBatch handleException(KinesisCheckpoint start, List recordList) { if (recordList.size() > 0) { String nextStartSequenceNumber = recordList.get(recordList.size() - 1).sequenceNumber(); - KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(nextStartSequenceNumber); - return new KinesisFetchResult(kinesisCheckpoint, recordList); + Map newCheckpoint = new HashMap<>(start.getShardToStartSequenceMap()); + newCheckpoint.put(newCheckpoint.keySet().iterator().next(), nextStartSequenceNumber); + return new KinesisRecordsBatch(recordList); } else { - KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(start.getSequenceNumber()); - return new KinesisFetchResult(kinesisCheckpoint, recordList); + return new KinesisRecordsBatch(recordList); + } } - public String getShardIterator(String sequenceNumber) { + public String getShardIterator(String shardId, String sequenceNumber) { GetShardIteratorRequest.Builder requestBuilder = - GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId).shardIteratorType(_shardIteratorType); + GetShardIteratorRequest.builder().streamName(_stream).shardId(shardId).shardIteratorType(_shardIteratorType); if (sequenceNumber != null) { requestBuilder = requestBuilder.startingSequenceNumber(sequenceNumber); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java index 9bb4d0c..aa90812 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerFactory.java @@ -18,36 +18,41 @@ */ package org.apache.pinot.plugin.stream.kinesis; -import org.apache.pinot.spi.stream.StreamConfig; -import org.apache.pinot.spi.stream.v2.ConsumerV2; -import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; -import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap; -import org.apache.pinot.spi.stream.v2.SegmentNameGenerator; -import org.apache.pinot.spi.stream.v2.StreamConsumerFactoryV2; +import java.util.Set; +import org.apache.pinot.spi.stream.PartitionGroupConsumer; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.PartitionLevelConsumer; +import org.apache.pinot.spi.stream.StreamConsumerFactory; +import org.apache.pinot.spi.stream.StreamLevelConsumer; +import org.apache.pinot.spi.stream.StreamMetadataProvider; -public class KinesisConsumerFactory implements StreamConsumerFactoryV2 { - private KinesisConfig _kinesisConfig; +public class KinesisConsumerFactory extends StreamConsumerFactory { @Override - public void init(StreamConfig streamConfig) { - _kinesisConfig = new KinesisConfig(streamConfig); + public PartitionLevelConsumer createPartitionLevelConsumer(String clientId, int partition) { + throw new UnsupportedOperationException(); } @Override - public PartitionGroupMetadataMap getPartitionGroupsMetadata( - PartitionGroupMetadataMap currentPartitionGroupsMetadata) { - return new KinesisPartitionGroupMetadataMap(_kinesisConfig.getStream(), _kinesisConfig.getAwsRegion(), - currentPartitionGroupsMetadata); + public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Set fieldsToRead, + String groupId) { + throw new UnsupportedOperationException(); } @Override - public SegmentNameGenerator getSegmentNameGenerator() { + public StreamMetadataProvider createPartitionMetadataProvider(String clientId, int partition) { return null; } @Override - public ConsumerV2 createConsumer(PartitionGroupMetadata metadata) { - return new KinesisConsumer(_kinesisConfig, metadata); + public StreamMetadataProvider createStreamMetadataProvider(String clientId) { + return new KinesisStreamMetadataProvider(clientId, new KinesisConfig(_streamConfig)); } + + @Override + public PartitionGroupConsumer createPartitionGroupConsumer(String clientId, PartitionGroupMetadata metadata) { + return new KinesisConsumer(new KinesisConfig(_streamConfig)); + } + } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java deleted file mode 100644 index 8da3d2e..0000000 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java +++ /dev/null @@ -1,44 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.stream.kinesis; - -import java.util.List; -import org.apache.pinot.spi.stream.v2.FetchResult; -import software.amazon.awssdk.services.kinesis.model.Record; - - -public class KinesisFetchResult implements FetchResult { - private final KinesisCheckpoint _kinesisCheckpoint; - private final List _recordList; - - public KinesisFetchResult(KinesisCheckpoint kinesisCheckpoint, List recordList) { - _kinesisCheckpoint = kinesisCheckpoint; - _recordList = recordList; - } - - @Override - public KinesisCheckpoint getLastCheckpoint() { - return _kinesisCheckpoint; - } - - @Override - public KinesisRecordsBatch getMessages() { - return new KinesisRecordsBatch(_recordList); - } -} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java deleted file mode 100644 index f96533f..0000000 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.stream.kinesis; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; -import org.apache.pinot.spi.stream.v2.PartitionGroupMetadataMap; -import software.amazon.awssdk.services.kinesis.model.Shard; - - -public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler implements PartitionGroupMetadataMap { - private final List _stringPartitionGroupMetadataIndex = new ArrayList<>(); - - public KinesisPartitionGroupMetadataMap(String stream, String awsRegion, - PartitionGroupMetadataMap currentPartitionGroupMetadataMap) { - //TODO: Handle child shards. Do not consume data from child shard unless parent is finished. - //Return metadata only for shards in current metadata - super(stream, awsRegion); - KinesisPartitionGroupMetadataMap currentPartitionMeta = - (KinesisPartitionGroupMetadataMap) currentPartitionGroupMetadataMap; - List currentMetaList = currentPartitionMeta.getMetadataList(); - - List shardList = getShards(); - - Map currentMetadataMap = new HashMap<>(); - for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) { - KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata; - currentMetadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata); - } - - for (Shard shard : shardList) { - if (currentMetadataMap.containsKey(shard.shardId())) { - //Return existing shard metadata - _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId())); - } else if (currentMetadataMap.containsKey(shard.parentShardId())) { - KinesisShardMetadata kinesisShardMetadata = - (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId()); - if (isProcessingFinished(kinesisShardMetadata)) { - //Add child shards for processing since parent has finished - appendShardMetadata(stream, awsRegion, shard); - } else { - //Do not process this shard unless the parent shard is finished or expired - } - } else { - //This is a new shard with no parents. We can start processing this shard. - appendShardMetadata(stream, awsRegion, shard); - } - } - } - - private boolean isProcessingFinished(KinesisShardMetadata kinesisShardMetadata) { - return kinesisShardMetadata.getEndCheckpoint().getSequenceNumber() != null && kinesisShardMetadata - .getStartCheckpoint().getSequenceNumber().equals(kinesisShardMetadata.getEndCheckpoint().getSequenceNumber()); - } - - private void appendShardMetadata(String stream, String awsRegion, Shard shard) { - String startSequenceNumber = shard.sequenceNumberRange().startingSequenceNumber(); - String endingSequenceNumber = shard.sequenceNumberRange().endingSequenceNumber(); - KinesisShardMetadata shardMetadata = new KinesisShardMetadata(shard.shardId(), stream, awsRegion); - shardMetadata.setStartCheckpoint(new KinesisCheckpoint(startSequenceNumber)); - shardMetadata.setEndCheckpoint(new KinesisCheckpoint(endingSequenceNumber)); - _stringPartitionGroupMetadataIndex.add(shardMetadata); - } - - @Override - public List getMetadataList() { - return _stringPartitionGroupMetadataIndex; - } - - @Override - public PartitionGroupMetadata getPartitionGroupMetadata(int index) { - return _stringPartitionGroupMetadataIndex.get(index); - } -} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java deleted file mode 100644 index e24121b..0000000 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.plugin.stream.kinesis; - -import org.apache.pinot.spi.stream.v2.Checkpoint; -import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; - - -//TODO: Implement shardId as Array and have unique id -public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata { - String _shardId; - KinesisCheckpoint _startCheckpoint; - KinesisCheckpoint _endCheckpoint; - - public KinesisShardMetadata(String shardId, String streamName, String awsRegion) { - super(streamName, awsRegion); - _startCheckpoint = null; - _endCheckpoint = null; - _shardId = shardId; - } - - public String getShardId() { - return _shardId; - } - - @Override - public KinesisCheckpoint getStartCheckpoint() { - return _startCheckpoint; - } - - @Override - public void setStartCheckpoint(Checkpoint startCheckpoint) { - _startCheckpoint = (KinesisCheckpoint) startCheckpoint; - } - - @Override - public KinesisCheckpoint getEndCheckpoint() { - return _endCheckpoint; - } - - @Override - public void setEndCheckpoint(Checkpoint endCheckpoint) { - _endCheckpoint = (KinesisCheckpoint) endCheckpoint; - } - - @Override - public byte[] serialize() { - return new byte[0]; - } - - @Override - public KinesisShardMetadata deserialize(byte[] blob) { - return null; - } -} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java new file mode 100644 index 0000000..ba9d2b6 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java @@ -0,0 +1,53 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeoutException; +import javax.annotation.Nonnull; +import org.apache.pinot.spi.stream.OffsetCriteria; +import org.apache.pinot.spi.stream.PartitionGroupInfo; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.StreamMetadataProvider; +import software.amazon.awssdk.services.kinesis.model.Shard; + + +public class KinesisStreamMetadataProvider implements StreamMetadataProvider { + private final KinesisConfig _kinesisConfig; + private KinesisConnectionHandler _kinesisConnectionHandler; + + public KinesisStreamMetadataProvider(String clientId, KinesisConfig kinesisConfig) { + _kinesisConfig = kinesisConfig; + _kinesisConnectionHandler = new KinesisConnectionHandler(kinesisConfig.getStream(), kinesisConfig.getAwsRegion()); + } + + @Override + public int fetchPartitionCount(long timeoutMillis) { + return 0; + } + + @Override + public long fetchPartitionOffset(@Nonnull OffsetCriteria offsetCriteria, long timeoutMillis) + throws TimeoutException { + return 0; + } + + @Override + public List getPartitionGroupInfoList(String clientId, StreamConfig streamConfig, + List currentPartitionGroupsMetadata, int timeoutMillis) + throws TimeoutException { + List partitionGroupInfos = new ArrayList<>(); + List shards = _kinesisConnectionHandler.getShards(); + for (Shard shard : shards) { + partitionGroupInfos.add(new PartitionGroupInfo(shard.shardId().hashCode(), shard.sequenceNumberRange().startingSequenceNumber())); + } + return partitionGroupInfos; + } + + @Override + public void close() + throws IOException { + + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java index f853875..57baae9 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java @@ -17,9 +17,11 @@ package org.apache.pinot.plugin.stream.kinesis; /** * under the License. */ +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; @@ -29,7 +31,8 @@ public class KinesisConsumerTest { private static final String STREAM_NAME = "kinesis-test"; private static final String AWS_REGION = "us-west-2"; - public static void main(String[] args) { + public static void main(String[] args) + throws IOException { Map props = new HashMap<>(); props.put(KinesisConfig.STREAM, STREAM_NAME); props.put(KinesisConfig.AWS_REGION, AWS_REGION); @@ -42,18 +45,19 @@ public class KinesisConsumerTest { System.out.println("SHARD: " + shard.shardId()); KinesisConsumer kinesisConsumer = - new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), STREAM_NAME, AWS_REGION)); + new KinesisConsumer(kinesisConfig); System.out.println( "Kinesis Checkpoint Range: < " + shard.sequenceNumberRange().startingSequenceNumber() + ", " + shard .sequenceNumberRange().endingSequenceNumber() + " >"); - KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber()); - KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 60 * 1000L); - KinesisRecordsBatch list = fetchResult.getMessages(); - int n = list.getMessageCount(); + Map shardIdToSeqNumMap = new HashMap<>(); + shardIdToSeqNumMap.put(shard.shardId(), shard.sequenceNumberRange().startingSequenceNumber()); + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shardIdToSeqNumMap); + KinesisRecordsBatch kinesisRecordsBatch = kinesisConsumer.fetchMessages(kinesisCheckpoint, null, 60 * 1000); + int n = kinesisRecordsBatch.getMessageCount(); System.out.println("Found " + n + " messages "); for (int i = 0; i < n; i++) { - System.out.println("SEQ-NO: " + list.getMessageOffsetAtIndex(i) + ", DATA: " + list.getMessageAtIndex(i)); + System.out.println("SEQ-NO: " + kinesisRecordsBatch.getMessageOffsetAtIndex(i) + ", DATA: " + kinesisRecordsBatch.getMessageAtIndex(i)); } kinesisConsumer.close(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java index f662d99..7c4e3ef 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.spi.stream; +import java.util.List; + + public class PartitionGroupMetadata { // fixme: Make partitionGroupId string everywhere (LLCSegmentName, StreamMetadataProvider) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java deleted file mode 100644 index 0195684..0000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream.v2; - -public interface Checkpoint { - boolean isEndOfPartition(); - byte[] serialize(); - Checkpoint deserialize(byte[] blob); -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java deleted file mode 100644 index 48b387d..0000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java +++ /dev/null @@ -1,24 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream.v2; - -public interface ConsumerV2 { - FetchResult fetch(Checkpoint start, Checkpoint end, long timeout); -} - diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java deleted file mode 100644 index 2188ac9..0000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream.v2; - -import java.util.List; -import org.apache.pinot.spi.stream.MessageBatch; - - -public interface FetchResult { - Checkpoint getLastCheckpoint(); - MessageBatch getMessages(); -} - diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java deleted file mode 100644 index d7c44d7..0000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream.v2; - -public interface PartitionGroupMetadata { - Checkpoint getStartCheckpoint(); // similar to getStartOffset - - Checkpoint getEndCheckpoint(); // similar to getEndOffset - - void setStartCheckpoint(Checkpoint startCheckpoint); - - void setEndCheckpoint(Checkpoint endCheckpoint); - - byte[] serialize(); - - PartitionGroupMetadata deserialize(byte[] blob); -} - diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java deleted file mode 100644 index ba37767..0000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadataMap.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream.v2; - -import java.util.List; - - -public interface PartitionGroupMetadataMap { - - List getMetadataList(); - - PartitionGroupMetadata getPartitionGroupMetadata(int index); - -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java deleted file mode 100644 index 6e65b25..0000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream.v2; - -public interface SegmentNameGenerator { - // generates a unique name for a partition group based on the metadata - String generateSegmentName(PartitionGroupMetadata metadata); - -} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java deleted file mode 100644 index 9e671aa..0000000 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.spi.stream.v2; - -import java.util.Map; -import org.apache.pinot.spi.stream.StreamConfig; - - -public interface StreamConsumerFactoryV2 { - void init(StreamConfig streamConfig); - - // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state - PartitionGroupMetadataMap getPartitionGroupsMetadata(PartitionGroupMetadataMap currentPartitionGroupsMetadata); - - // creates a name generator which generates segment name for a partition group - SegmentNameGenerator getSegmentNameGenerator(); - - // creates a consumer which consumes from a partition group - ConsumerV2 createConsumer(PartitionGroupMetadata metadata); - -} diff --git a/pinot-tools/pom.xml b/pinot-tools/pom.xml index cf3fe70..e3c21ef 100644 --- a/pinot-tools/pom.xml +++ b/pinot-tools/pom.xml @@ -87,6 +87,11 @@ org.apache.pinot + pinot-kinesis + ${project.version} + + + org.apache.pinot pinot-kafka-${kafka.version} ${project.version} runtime diff --git a/pom.xml b/pom.xml index 79dabf7..237b5c9 100644 --- a/pom.xml +++ b/pom.xml @@ -117,7 +117,7 @@ 1.8.0 0.9.8 0.7 - 2.9.8 + 2.12.0 1.9.21 2.28 2.4.4 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org