kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qhz...@apache.org
Subject [2/2] incubator-kylin git commit: increase retry wait time exponentially when failed to fetch Kafka Response, max wait time is about one minute
Date Mon, 06 Jul 2015 08:40:20 GMT
increase retry wait time exponentially when failed to fetch Kafka
Response, max wait time is about one minute


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1105c554
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1105c554
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1105c554

Branch: refs/heads/0.8
Commit: 1105c554397f27e2533fb880d32f143635c6d653
Parents: 876f34d
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Mon Jul 6 16:38:54 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Mon Jul 6 16:38:54 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/streaming/StreamingUtil.java   | 28 +++++++++++++-------
 1 file changed, 19 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1105c554/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 0019a10..4e7234f 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -21,6 +21,8 @@ public final class StreamingUtil {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamingUtil.class);
 
+    private static final int MAX_RETRY_TIMES = 6;
+
     private StreamingUtil() {
     }
 
@@ -33,31 +35,39 @@ public final class StreamingUtil {
         }
     }
 
+    private static void sleep(int retryTimes) {
+        int seconds = (int) Math.pow(2, retryTimes);
+        logger.info("retry times:" + retryTimes + " sleep:" + seconds + " seconds");
+        try {
+            Thread.sleep(seconds * 1000);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private static MessageAndOffset getKafkaMessage(KafkaClusterConfig kafkaClusterConfig,
int partitionId, long offset) {
         final String topic = kafkaClusterConfig.getTopic();
         int retry = 0;
-        while (retry++ < 4) {
+        while (retry < MAX_RETRY_TIMES) {//max sleep time 63 seconds
             final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
             if (leadBroker == null) {
                 logger.warn("unable to find leadBroker with config:" + kafkaClusterConfig
+ " partitionId:" + partitionId);
+                sleep(retry++);
                 continue;
             }
             final FetchResponse response = KafkaRequester.fetchResponse(topic, partitionId,
offset, leadBroker, kafkaClusterConfig);
             if (response.errorCode(topic, partitionId) != 0) {
                 logger.warn("errorCode of FetchResponse is:" + response.errorCode(topic,
partitionId));
+                sleep(retry++);
                 continue;
             }
             final Iterator<MessageAndOffset> iterator = response.messageSet(topic,
partitionId).iterator();
-            if (iterator.hasNext()) {
-                return iterator.next();
-            } else {
-                try {
-                    Thread.sleep((long) (Math.pow(2, retry) * 1000));
-                } catch (InterruptedException e) {
-                    throw new RuntimeException(e);
-                }
+            if (!iterator.hasNext()) {
+                logger.warn("messageSet is empty");
+                sleep(retry++);
                 continue;
             }
+            return iterator.next();
         }
         throw new IllegalStateException(String.format("try to get timestamp of topic: %s,
partitionId: %d, offset: %d, failed to get StreamMessage from kafka", topic, partitionId,
offset));
     }


Mime
View raw message