From commits-return-28256-archive-asf-public=cust-asf.ponee.io@pinot.apache.org Sun Jan 3 02:17:32 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 25E25180674 for ; Sun, 3 Jan 2021 03:17:30 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id 95ED1652D1 for ; Sun, 3 Jan 2021 02:17:29 +0000 (UTC) Received: (qmail 10597 invoked by uid 500); 3 Jan 2021 02:17:29 -0000 Mailing-List: contact commits-help@pinot.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pinot.apache.org Delivered-To: mailing list commits@pinot.apache.org Received: (qmail 10577 invoked by uid 99); 3 Jan 2021 02:17:28 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2021 02:17:28 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D33D781FB7; Sun, 3 Jan 2021 02:17:28 +0000 (UTC) Date: Sun, 03 Jan 2021 02:17:43 +0000 To: "commits@pinot.apache.org" Subject: [incubator-pinot] 19/23: Add test code for kinesis MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: xiangfu@apache.org In-Reply-To: <160964024402.15888.3151012730775503781@gitbox.apache.org> References: <160964024402.15888.3151012730775503781@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pinot X-Git-Refname: refs/heads/sharded_consumer_type_support_with_kinesis X-Git-Reftype: branch X-Git-Rev: 0af84b13afc3ea4d9ccb95a8e67fd6464e49d1bf X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: 
<20210103021728.D33D781FB7@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git commit 0af84b13afc3ea4d9ccb95a8e67fd6464e49d1bf Author: KKcorps AuthorDate: Tue Dec 22 22:05:02 2020 +0530 Add test code for kinesis --- .../pinot/plugin/stream/kinesis/KinesisConfig.java | 17 +++++-- .../kinesis/KinesisPartitionGroupMetadataMap.java | 16 +++---- .../plugin/stream/kinesis/KinesisConsumerTest.java | 54 ++++++++++++++++++++++ 3 files changed, 74 insertions(+), 13 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java index d2e8715..a81d11f 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConfig.java @@ -18,11 +18,14 @@ */ package org.apache.pinot.plugin.stream.kinesis; +import java.util.Map; import org.apache.pinot.spi.stream.StreamConfig; public class KinesisConfig { - private final StreamConfig _streamConfig; + private final Map _props; + + public static final String STREAM = "stream"; private static final String AWS_REGION = "aws-region"; private static final String MAX_RECORDS_TO_FETCH = "max-records-to-fetch"; @@ -30,18 +33,22 @@ public class KinesisConfig { private static final String DEFAULT_MAX_RECORDS = "20"; public KinesisConfig(StreamConfig streamConfig) { - _streamConfig = streamConfig; + _props = streamConfig.getStreamConfigsMap(); + } + + public KinesisConfig(Map props) { + _props = props; } public String getStream(){ - return _streamConfig.getTopicName(); + return 
_props.get(STREAM); } public String getAwsRegion(){ - return _streamConfig.getStreamConfigsMap().getOrDefault(AWS_REGION, DEFAULT_AWS_REGION); + return _props.getOrDefault(AWS_REGION, DEFAULT_AWS_REGION); } public Integer maxRecordsToFetch(){ - return Integer.parseInt(_streamConfig.getStreamConfigsMap().getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS)); + return Integer.parseInt(_props.getOrDefault(MAX_RECORDS_TO_FETCH, DEFAULT_MAX_RECORDS)); } } diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java index d77579e..626c8ea 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisPartitionGroupMetadataMap.java @@ -35,28 +35,28 @@ public class KinesisPartitionGroupMetadataMap extends KinesisConnectionHandler i private final List _stringPartitionGroupMetadataIndex = new ArrayList<>(); public KinesisPartitionGroupMetadataMap(String stream, String awsRegion, - PartitionGroupMetadataMap partitionGroupMetadataMap) { + PartitionGroupMetadataMap currentPartitionGroupMetadataMap) { //TODO: Handle child shards. Do not consume data from child shard unless parent is finished. 
//Return metadata only for shards in current metadata super(stream, awsRegion); KinesisPartitionGroupMetadataMap currentPartitionMeta = - (KinesisPartitionGroupMetadataMap) partitionGroupMetadataMap; + (KinesisPartitionGroupMetadataMap) currentPartitionGroupMetadataMap; List currentMetaList = currentPartitionMeta.getMetadataList(); List shardList = getShards(); - Map metadataMap = new HashMap<>(); + Map currentMetadataMap = new HashMap<>(); for (PartitionGroupMetadata partitionGroupMetadata : currentMetaList) { KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata; - metadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata); + currentMetadataMap.put(kinesisShardMetadata.getShardId(), kinesisShardMetadata); } for (Shard shard : shardList) { - if (metadataMap.containsKey(shard.shardId())) { + if (currentMetadataMap.containsKey(shard.shardId())) { //Return existing shard metadata - _stringPartitionGroupMetadataIndex.add(metadataMap.get(shard.shardId())); - } else if (metadataMap.containsKey(shard.parentShardId())) { - KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) metadataMap.get(shard.parentShardId()); + _stringPartitionGroupMetadataIndex.add(currentMetadataMap.get(shard.shardId())); + } else if (currentMetadataMap.containsKey(shard.parentShardId())) { + KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) currentMetadataMap.get(shard.parentShardId()); if (isProcessingFinished(kinesisShardMetadata)) { //Add child shards for processing since parent has finished appendShardMetadata(stream, awsRegion, shard); diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java new file mode 100644 index 0000000..f8a0551 --- /dev/null +++ 
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/test/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumerTest.java @@ -0,0 +1,54 @@ +package org.apache.pinot.plugin.stream.kinesis; /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import software.amazon.awssdk.services.kinesis.model.Record; +import software.amazon.awssdk.services.kinesis.model.Shard; + + +public class KinesisConsumerTest { + public static void main(String[] args) { + Map<String, String> props = new HashMap<>(); + props.put("stream", "kinesis-test"); + props.put("aws-region", "us-west-2"); + props.put("maxRecords", "10"); + + KinesisConfig kinesisConfig = new KinesisConfig(props); + + KinesisConnectionHandler kinesisConnectionHandler = new KinesisConnectionHandler("kinesis-test", "us-west-2"); + + List<Shard> shardList = kinesisConnectionHandler.getShards(); + + for(Shard shard : shardList) { + KinesisConsumer kinesisConsumer = new KinesisConsumer(kinesisConfig, new KinesisShardMetadata(shard.shardId(), "kinesis-test", "us-west-2")); + + KinesisCheckpoint kinesisCheckpoint = new KinesisCheckpoint(shard.sequenceNumberRange().startingSequenceNumber()); + KinesisFetchResult fetchResult = kinesisConsumer.fetch(kinesisCheckpoint, null, 6 * 10 * 1000L); + + List<Record> list = fetchResult.getMessages(); + + System.out.println("SHARD: " + shard.shardId()); + for (Record record : list) { + System.out.println("SEQ-NO: " + record.sequenceNumber() + ", DATA: " + record.data().asUtf8String()); + } + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org