From commits-return-28224-archive-asf-public=cust-asf.ponee.io@pinot.apache.org Sun Jan 3 01:22:14 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id C0E6218079E for ; Sun, 3 Jan 2021 02:22:13 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id 1A9F165270 for ; Sun, 3 Jan 2021 01:22:12 +0000 (UTC) Received: (qmail 88883 invoked by uid 500); 3 Jan 2021 01:22:12 -0000 Mailing-List: contact commits-help@pinot.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@pinot.apache.org Delivered-To: mailing list commits@pinot.apache.org Received: (qmail 88874 invoked by uid 99); 3 Jan 2021 01:22:12 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 Jan 2021 01:22:12 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 46E1281F9F; Sun, 3 Jan 2021 01:22:12 +0000 (UTC) Date: Sun, 03 Jan 2021 01:22:18 +0000 To: "commits@pinot.apache.org" Subject: [incubator-pinot] 12/23: fetch records with timeout MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: xiangfu@apache.org In-Reply-To: <160963692560.9549.9640350278609407605@gitbox.apache.org> References: <160963692560.9549.9640350278609407605@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: incubator-pinot X-Git-Refname: refs/heads/sharded_consumer_type_support_with_kinesis X-Git-Reftype: branch X-Git-Rev: 900450da05347fd6ce958053141b701c7e09d0e9 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20210103012212.46E1281F9F@gitbox.apache.org> This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git commit 900450da05347fd6ce958053141b701c7e09d0e9 Author: KKcorps AuthorDate: Sun Dec 20 11:44:38 2020 +0530 fetch records with timeout --- .../plugin/stream/kinesis/KinesisConsumer.java | 30 ++++++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java index 96241d4..910b9ee 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java @@ -21,6 +21,12 @@ package org.apache.pinot.plugin.stream.kinesis; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.v2.Checkpoint; import org.apache.pinot.spi.stream.v2.ConsumerV2; @@ -39,6 +45,7 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume String _stream; Integer _maxRecords; String _shardId; + ExecutorService _executorService; public KinesisConsumer(KinesisConfig kinesisConfig, PartitionGroupMetadata partitionGroupMetadata) { super(kinesisConfig.getStream(), kinesisConfig.getAwsRegion()); @@ -46,10 +53,27 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume _maxRecords = kinesisConfig.maxRecordsToFetch(); KinesisShardMetadata kinesisShardMetadata = (KinesisShardMetadata) partitionGroupMetadata; _shardId = kinesisShardMetadata.getShardId(); + _executorService = Executors.newSingleThreadExecutor(); } @Override public KinesisFetchResult fetch(Checkpoint start, Checkpoint end, long timeout) { + Future kinesisFetchResultFuture = _executorService.submit(new Callable() { + @Override + public KinesisFetchResult call() + throws Exception { + return getResult(start, end); + } + }); + + try { + return kinesisFetchResultFuture.get(timeout, TimeUnit.MILLISECONDS); + } catch(Exception e){ + return null; + } + } + + private KinesisFetchResult getResult(Checkpoint start, Checkpoint end) { try { KinesisCheckpoint kinesisStartCheckpoint = (KinesisCheckpoint) start; @@ -65,9 +89,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume } String nextStartSequenceNumber = null; - Long startTimestamp = System.currentTimeMillis(); - while (shardIterator != null && !isTimedOut(startTimestamp, timeout)) { + while (shardIterator != null) { GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder().shardIterator(shardIterator).build(); GetRecordsResponse getRecordsResponse = _kinesisClient.getRecords(getRecordsRequest); @@ -119,7 +142,4 @@ public class KinesisConsumer extends KinesisConnectionHandler implements Consume return getShardIteratorResponse.shardIterator(); } - private boolean isTimedOut(Long startTimestamp, Long timeout) { - return (System.currentTimeMillis() - startTimestamp) >= timeout; - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org