From commits-return-28214-archive-asf-public=cust-asf.ponee.io@pinot.apache.org Sun Jan 3 01:22:12 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id F2579180680 for ; Sun, 3 Jan 2021 02:22:11 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id 65B2B6405C for ; Sun, 3 Jan 2021 01:22:11 +0000 (UTC) Received: (qmail 88357 invoked by uid 500); 3 Jan 2021 01:22:10 -0000 Mailing-List: contact commits-help@pinot.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pinot.apache.org Delivered-To: mailing list commits@pinot.apache.org Received: (qmail 88347 invoked by uid 99); 3 Jan 2021 01:22:10 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2021 01:22:10 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id A570A81F9F; Sun, 3 Jan 2021 01:22:10 +0000 (UTC) Date: Sun, 03 Jan 2021 01:22:08 +0000 To: "commits@pinot.apache.org" Subject: [incubator-pinot] 02/23: Add initial implementation of Kinesis consumer MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: xiangfu@apache.org In-Reply-To: <160963692560.9549.9640350278609407605@gitbox.apache.org> References: <160963692560.9549.9640350278609407605@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pinot X-Git-Refname: refs/heads/sharded_consumer_type_support_with_kinesis X-Git-Reftype: branch X-Git-Rev: 60195b016b07b9d0305cc4e4e8e5a6e424f5b76f X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20210103012210.A570A81F9F@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git commit 60195b016b07b9d0305cc4e4e8e5a6e424f5b76f Author: KKcorps AuthorDate: Thu Dec 10 19:08:41 2020 +0530 Add initial implementation of Kinesis consumer --- .../pinot-stream-ingestion/pinot-kinesis/pom.xml | 39 ++++++++++++++++++ .../plugin/stream/kinesis/KinesisCheckpoint.java | 28 +++++++++++++ .../stream/kinesis/KinesisConnectionHandler.java | 25 ++++++++++++ .../plugin/stream/kinesis/KinesisConsumer.java | 40 ++++++++++++++++++ .../plugin/stream/kinesis/KinesisFetchResult.java | 25 ++++++++++++ .../stream/kinesis/KinesisShardMetadata.java | 47 ++++++++++++++++++++++ pinot-plugins/pinot-stream-ingestion/pom.xml | 1 + 7 files changed, 205 insertions(+) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml new file mode 100644 index 0000000..97e5eef --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml @@ -0,0 +1,39 @@ + + + + pinot-stream-ingestion + org.apache.pinot + 0.7.0-SNAPSHOT + .. + + 4.0.0 + + pinot-kinesis + + + ${basedir}/../../.. + package + 2.15.42 + + + + + software.amazon.awssdk + kinesis + ${aws.version} + + + org.apache.pinot + pinot-json + ${project.version} + test + + + org.apache.pinot + pinot-spi + + + + \ No newline at end of file diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java new file mode 100644 index 0000000..a330e78 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisCheckpoint.java @@ -0,0 +1,28 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import org.apache.pinot.spi.stream.v2.Checkpoint; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; + + +public class KinesisCheckpoint implements Checkpoint { + String _shardIterator; + + public KinesisCheckpoint(String shardIterator){ + _shardIterator = shardIterator; + } + + public String getShardIterator() { + return _shardIterator; + } + + @Override + public byte[] serialize() { + return _shardIterator.getBytes(); + } + + @Override + public Checkpoint deserialize(byte[] blob) { + return new KinesisCheckpoint(new String(blob)); + } + +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java new file mode 100644 index 0000000..7ea24c0 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java @@ -0,0 +1,25 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.stream.v2.ConsumerV2; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; + + +public class KinesisConnectionHandler { + String _awsRegion = ""; + KinesisClient _kinesisClient; + + public KinesisConnectionHandler(){ + + } + + public KinesisConnectionHandler(String awsRegion){ + _awsRegion = awsRegion; + _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create()).build(); + } + +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java new file mode 100644 index 0000000..251d831 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -0,0 +1,40 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import java.util.Collections; +import org.apache.pinot.spi.stream.v2.Checkpoint; +import org.apache.pinot.spi.stream.v2.ConsumerV2; +import org.apache.pinot.spi.stream.v2.FetchResult; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.Record; + + +public class KinesisConsumer extends KinesisConnectionHandler implements ConsumerV2 { + + //TODO: Fetch AWS region from Stream Config. + public KinesisConsumer(String awsRegion) { + super(awsRegion); + } + + @Override + public FetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { + KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; + KinesisCheckpoint kinesisEndCheckpoint = (KinesisCheckpoint) end; + + String kinesisShardIteratorStart = kinesisStartCheckpoint.getShardIterator(); + + GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(kinesisShardIteratorStart).build(); + GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); + + String kinesisNextShardIterator = getRecordsResponse.nextShardIterator(); + + if(!getRecordsResponse.hasRecords()){ + return new KinesisFetchResult(kinesisNextShardIterator, Collections.emptyList()); + } + + KinesisFetchResult kinesisFetchResult = new KinesisFetchResult(kinesisNextShardIterator, + getRecordsResponse.records()); + + return kinesisFetchResult; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java new file mode 100644 index 0000000..5ef4e30 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisFetchResult.java @@ -0,0 +1,25 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import java.util.List; +import org.apache.pinot.spi.stream.v2.Checkpoint; +import org.apache.pinot.spi.stream.v2.FetchResult; +import software.amazon.awssdk.services.kinesis.model.Record; + + +public class KinesisFetchResult implements FetchResult { + private String _nextShardIterator; + + public KinesisFetchResult(String nextShardIterator, List recordList){ + _nextShardIterator = nextShardIterator; + } + + @Override + public Checkpoint getLastCheckpoint() { + return new KinesisCheckpoint(_nextShardIterator); + } + + @Override + public byte[] getMessages() { + return new byte[0]; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java new file mode 100644 index 0000000..07ede73 --- /dev/null +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisShardMetadata.java @@ -0,0 +1,47 @@ +package org.apache.pinot.plugin.stream.kinesis; + +import org.apache.pinot.spi.stream.v2.Checkpoint; +import org.apache.pinot.spi.stream.v2.PartitionGroupMetadata; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; + + +public class KinesisShardMetadata extends KinesisConnectionHandler implements PartitionGroupMetadata { + Checkpoint _startCheckpoint; + Checkpoint _endCheckpoint; + + public KinesisShardMetadata(String shardId, String streamName) { + GetShardIteratorResponse getShardIteratorResponse = _kinesisClient.getShardIterator(GetShardIteratorRequest.builder().shardId(shardId).streamName(streamName).build()); + _startCheckpoint = new KinesisCheckpoint(getShardIteratorResponse.shardIterator()); + } + + @Override + public Checkpoint getStartCheckpoint() { + return _startCheckpoint; + } + + @Override + public Checkpoint getEndCheckpoint() { + return _endCheckpoint; + } + + @Override + public void setStartCheckpoint(Checkpoint startCheckpoint) { + _startCheckpoint = startCheckpoint; + } + + @Override + public void setEndCheckpoint(Checkpoint endCheckpoint) { + _endCheckpoint = endCheckpoint; + } + + @Override + public byte[] serialize() { + return new byte[0]; + } + + @Override + public PartitionGroupMetadata deserialize(byte[] blob) { + return null; + } +} diff --git a/pinot-plugins/pinot-stream-ingestion/pom.xml b/pinot-plugins/pinot-stream-ingestion/pom.xml index 3a51626..e7b9a46 100644 --- a/pinot-plugins/pinot-stream-ingestion/pom.xml +++ b/pinot-plugins/pinot-stream-ingestion/pom.xml @@ -42,6 +42,7 @@ pinot-kafka-base pinot-kafka-0.9 pinot-kafka-2.0 + pinot-kinesis --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org