pinot-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xian...@apache.org
Subject [incubator-pinot] 16/23: Handle closed connections
Date Sun, 03 Jan 2021 01:22:22 GMT
This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit bb8e08fda6da218f57882c348ca61ddc5a53e451
Author: KKcorps <kharekartik@gmail.com>
AuthorDate: Mon Dec 21 14:21:55 2020 +0530

    Handle closed connections
---
 .../plugin/stream/kinesis/KinesisConnectionHandler.java      | 12 +++++++++---
 .../apache/pinot/plugin/stream/kinesis/KinesisConsumer.java  |  8 ++++++++
 2 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
index ba94b0a..3607787 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConnectionHandler.java
@@ -49,9 +49,7 @@ public class KinesisConnectionHandler {
   public KinesisConnectionHandler(String stream, String awsRegion) {
     _stream = stream;
     _awsRegion = awsRegion;
-    _kinesisClient =
-        KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
-            .build();
+    createConnection();
   }
 
   public List<Shard> getShards() {
@@ -60,9 +58,17 @@ public class KinesisConnectionHandler {
     return listShardsResponse.shards();
   }
 
+  public void createConnection(){
+    if(_kinesisClient == null) {
+      _kinesisClient = KinesisClient.builder().region(Region.of(_awsRegion)).credentialsProvider(DefaultCredentialsProvider.create())
+          .build();
+    }
+  }
+
   public void close(){
     if(_kinesisClient != null) {
       _kinesisClient.close();
+      _kinesisClient = null;
     }
   }
 }
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
index 24810ba..fd48a92 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisConsumer.java
@@ -79,6 +79,10 @@ public class KinesisConsumer extends KinesisConnectionHandler implements
Consume
 
     try {
 
+      if(_kinesisClient == null){
+        createConnection();
+      }
+
       String shardIterator = getShardIterator(kinesisStartCheckpoint);
 
       String kinesisEndSequenceNumber = null;
@@ -176,4 +180,8 @@ public class KinesisConsumer extends KinesisConnectionHandler implements
Consume
     return getShardIteratorResponse.shardIterator();
   }
 
+  @Override
+  public void close() {
+    super.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Mime
View raw message