From commits-return-28213-archive-asf-public=cust-asf.ponee.io@pinot.apache.org Sun Jan 3 01:22:11 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 77BB0180670 for ; Sun, 3 Jan 2021 02:22:11 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id B04F944F8F for ; Sun, 3 Jan 2021 01:22:10 +0000 (UTC) Received: (qmail 88304 invoked by uid 500); 3 Jan 2021 01:22:10 -0000 Mailing-List: contact commits-help@pinot.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pinot.apache.org Delivered-To: mailing list commits@pinot.apache.org Received: (qmail 88258 invoked by uid 99); 3 Jan 2021 01:22:10 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2021 01:22:10 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 66A4B81F9F; Sun, 3 Jan 2021 01:22:10 +0000 (UTC) Date: Sun, 03 Jan 2021 01:22:07 +0000 To: "commits@pinot.apache.org" Subject: [incubator-pinot] 01/23: Add interfaces for V2 consumers MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: xiangfu@apache.org In-Reply-To: <160963692560.9549.9640350278609407605@gitbox.apache.org> References: <160963692560.9549.9640350278609407605@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pinot X-Git-Refname: refs/heads/sharded_consumer_type_support_with_kinesis X-Git-Reftype: branch X-Git-Rev: abc65886a3d09535135f16d21ae388ae95665cb0 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20210103012210.66A4B81F9F@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git commit abc65886a3d09535135f16d21ae388ae95665cb0 Author: KKcorps AuthorDate: Thu Dec 10 19:08:15 2020 +0530 Add interfaces for V2 consumers --- .../org/apache/pinot/spi/stream/v2/Checkpoint.java | 6 ++++++ .../org/apache/pinot/spi/stream/v2/ConsumerV2.java | 6 ++++++ .../org/apache/pinot/spi/stream/v2/FetchResult.java | 7 +++++++ .../pinot/spi/stream/v2/PartitionGroupMetadata.java | 16 ++++++++++++++++ .../pinot/spi/stream/v2/SegmentNameGenerator.java | 7 +++++++ .../pinot/spi/stream/v2/StreamConsumerFactoryV2.java | 19 +++++++++++++++++++ 6 files changed, 61 insertions(+) diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java new file mode 100644 index 0000000..0856454 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/Checkpoint.java @@ -0,0 +1,6 @@ +package org.apache.pinot.spi.stream.v2; + +public interface Checkpoint { + byte[] serialize(); + Checkpoint deserialize(byte[] blob); +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java new file mode 100644 index 0000000..afc8d38 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/ConsumerV2.java @@ -0,0 +1,6 @@ +package org.apache.pinot.spi.stream.v2; + +public interface ConsumerV2 { + FetchResult fetch(Checkpoint start, Checkpoint end, long timeout); +} + diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java new file mode 100644 index 0000000..b490835 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/FetchResult.java @@ -0,0 +1,7 @@ +package org.apache.pinot.spi.stream.v2; + +public interface FetchResult { + Checkpoint getLastCheckpoint(); + byte[] getMessages(); +} + diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java new file mode 100644 index 0000000..27c5ce7 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/PartitionGroupMetadata.java @@ -0,0 +1,16 @@ +package org.apache.pinot.spi.stream.v2; + +public interface PartitionGroupMetadata { + Checkpoint getStartCheckpoint(); // similar to getStartOffset + + Checkpoint getEndCheckpoint(); // similar to getEndOffset + + void setStartCheckpoint(Checkpoint startCheckpoint); + + void setEndCheckpoint(Checkpoint endCheckpoint); + + byte[] serialize(); + + PartitionGroupMetadata deserialize(byte[] blob); +} + diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java new file mode 100644 index 0000000..689c686 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/SegmentNameGenerator.java @@ -0,0 +1,7 @@ +package org.apache.pinot.spi.stream.v2; + +public interface SegmentNameGenerator { + // generates a unique name for a partition group based on the metadata + String generateSegmentName(PartitionGroupMetadata metadata); + +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java new file mode 100644 index 0000000..bd3017d --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/v2/StreamConsumerFactoryV2.java @@ -0,0 +1,19 @@ +package org.apache.pinot.spi.stream.v2; + +import java.util.Map; +import org.apache.pinot.spi.stream.StreamConfig; + + +public interface StreamConsumerFactoryV2 { + void init(StreamConfig streamConfig); + + // takes the current state of partition groups (groupings of shards, the state of the consumption) and creates the new state + Map getPartitionGroupsMetadata(Map currentPartitionGroupsMetadata); + + // creates a name generator which generates segment name for a partition group + SegmentNameGenerator getSegmentNameGenerator(); + + // creates a consumer which consumes from a partition group + ConsumerV2 createConsumer(PartitionGroupMetadata metadata); + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org