From commits-return-28253-archive-asf-public=cust-asf.ponee.io@pinot.apache.org Sun Jan 3 02:17:31 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-ec2-va.apache.org (mxout1-ec2-va.apache.org [3.227.148.255]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id ABC061806C7 for ; Sun, 3 Jan 2021 03:17:29 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-ec2-va.apache.org (ASF Mail Server at mxout1-ec2-va.apache.org) with SMTP id E7087455C8 for ; Sun, 3 Jan 2021 02:17:28 +0000 (UTC) Received: (qmail 10471 invoked by uid 500); 3 Jan 2021 02:17:28 -0000 Mailing-List: contact commits-help@pinot.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pinot.apache.org Delivered-To: mailing list commits@pinot.apache.org Received: (qmail 10393 invoked by uid 99); 3 Jan 2021 02:17:28 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2021 02:17:28 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 8859081F9F; Sun, 3 Jan 2021 02:17:28 +0000 (UTC) Date: Sun, 03 Jan 2021 02:17:41 +0000 To: "commits@pinot.apache.org" Subject: [incubator-pinot] 17/23: Refactor: get shard iterator methods MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: xiangfu@apache.org In-Reply-To: <160964024402.15888.3151012730775503781@gitbox.apache.org> References: <160964024402.15888.3151012730775503781@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pinot X-Git-Refname: refs/heads/sharded_consumer_type_support_with_kinesis X-Git-Reftype: branch X-Git-Rev: 6fcdf3ef1ae899631690f2ff191328f21a50f8b4 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20210103021728.8859081F9F@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git commit 6fcdf3ef1ae899631690f2ff191328f21a50f8b4 Author: KKcorps AuthorDate: Mon Dec 21 14:25:25 2020 +0530 Refactor: get shard iterator methods --- .../plugin/stream/kinesis/KinesisConsumer.java | 25 ++++++++++++---------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index fd48a92..3263f87 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -163,21 +163,24 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } private String getShardIterator(KinesisCheckpoint kinesisStartCheckpoint) { - GetShardIteratorResponse getShardIteratorResponse; - if (kinesisStartCheckpoint.getSequenceNumber() != null) { - String kinesisStartSequenceNumber = kinesisStartCheckpoint.getSequenceNumber(); - getShardIteratorResponse = _kinesisClient.getShardIterator( - GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId) - .shardIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER) - .startingSequenceNumber(kinesisStartSequenceNumber).build()); + return getShardIterator(ShardIteratorType.AT_SEQUENCE_NUMBER, kinesisStartCheckpoint.getSequenceNumber()); } else { - getShardIteratorResponse = _kinesisClient.getShardIterator( - GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream) - .shardIteratorType(ShardIteratorType.LATEST).build()); + return getShardIterator(ShardIteratorType.LATEST, null); } + } - return getShardIteratorResponse.shardIterator(); + public String getShardIterator(ShardIteratorType shardIteratorType, String sequenceNumber){ + if(sequenceNumber == null){ + return _kinesisClient.getShardIterator( + GetShardIteratorRequest.builder().shardId(_shardId).streamName(_stream) + .shardIteratorType(shardIteratorType).build()).shardIterator(); + }else{ + return _kinesisClient.getShardIterator( + GetShardIteratorRequest.builder().streamName(_stream).shardId(_shardId) + .shardIteratorType(shardIteratorType) + .startingSequenceNumber(sequenceNumber).build()).shardIterator(); + } } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org