kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qhz...@apache.org
Subject [1/2] incubator-kylin git commit: KYLIN-874
Date Mon, 06 Jul 2015 08:40:19 GMT
Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 744daaabc -> 1105c5543


KYLIN-874


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/876f34d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/876f34d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/876f34d3

Branch: refs/heads/0.8
Commit: 876f34d329bde676354ca5cbd499b7517da1ae66
Parents: 744daaa
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Mon Jul 6 14:24:08 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Mon Jul 6 16:02:49 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/monitor/StreamingMonitor.java     | 42 ++++++++++++++++----
 .../kylin/job/streaming/BootstrapConfig.java    | 10 +++++
 .../kylin/job/streaming/StreamingBootstrap.java | 21 +++++++---
 .../kylin/job/streaming/StreamingCLI.java       |  3 ++
 .../apache/kylin/streaming/StreamingUtil.java   |  6 +--
 5 files changed, 66 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/876f34d3/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java b/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
index 77c9b34..c7dc1ba 100644
--- a/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
+++ b/job/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
@@ -1,6 +1,7 @@
 package org.apache.kylin.job.monitor;
 
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
@@ -71,28 +72,53 @@ public class StreamingMonitor {
         sendMail(receivers, title, stringBuilder.toString());
     }
 
-    public void checkCube(List<String> receivers, String cubeName) {
-        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        if (cube == null) {
-            logger.info("cube:" + cubeName + " does not exist");
-            return;
+    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
+                continue;
+            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
+                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
+            }
         }
+        return gaps;
+    }
+
+    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        Preconditions.checkNotNull(cube);
         final List<CubeSegment> segments = cube.getSegment(SegmentStatusEnum.READY);
         logger.info("totally " + segments.size() + " cubeSegments");
         Collections.sort(segments);
-        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        return segments;
+    }
+
+    public static final List<Pair<String, String>> findOverlaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
         List<Pair<String, String>> overlaps = Lists.newArrayList();
         for (int i = 0; i < segments.size() - 1; ++i) {
             CubeSegment first = segments.get(i);
             CubeSegment second = segments.get(i + 1);
             if (first.getDateRangeEnd() == second.getDateRangeStart()) {
                 continue;
-            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
-                gaps.add(Pair.newPair(first.getDateRangeEnd(), second.getDateRangeStart()));
             } else {
                 overlaps.add(Pair.newPair(first.getName(), second.getName()));
             }
         }
+        return overlaps;
+    }
+
+    public void checkCube(List<String> receivers, String cubeName) {
+        final CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        if (cube == null) {
+            logger.info("cube:" + cubeName + " does not exist");
+            return;
+        }
+        List<Pair<Long, Long>> gaps = findGaps(cubeName);
+        List<Pair<String, String>> overlaps = Lists.newArrayList();
         StringBuilder content = new StringBuilder();
         if (!gaps.isEmpty()) {
             content.append("all gaps:").append("\n").append(

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/876f34d3/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
index 302f455..e665cf4 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/BootstrapConfig.java
@@ -12,6 +12,8 @@ public class BootstrapConfig {
     private long end = 0L;
     private long margin = 0L;
 
+    private boolean fillGap;
+
 
     public long getMargin() {
         return margin;
@@ -60,4 +62,12 @@ public class BootstrapConfig {
     public void setPartitionId(int partitionId) {
         this.partitionId = partitionId;
     }
+
+    public boolean isFillGap() {
+        return fillGap;
+    }
+
+    public void setFillGap(boolean fillGap) {
+        this.fillGap = fillGap;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/876f34d3/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index d3013cb..b15bee6 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -45,6 +45,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.DaemonThreadFactory;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -53,6 +54,8 @@ import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.job.monitor.MonitorCLI;
+import org.apache.kylin.job.monitor.StreamingMonitor;
 import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.streaming.*;
@@ -122,12 +125,20 @@ public class StreamingBootstrap {
             Preconditions.checkArgument(partitionId >= 0, "partitionId cannot be empty for inverted index streaming");
             startIIStreaming(streamingConfig, partitionId);
         } else if (!StringUtils.isEmpty(streamingConfig.getCubeName())) {
-            if (bootstrapConfig.isOneOff()) {
-                Preconditions.checkArgument(bootstrapConfig.getStart() != 0);
-                Preconditions.checkArgument(bootstrapConfig.getEnd() != 0);
-                startOneOffCubeStreaming(streamingConfig, bootstrapConfig.getStart(), bootstrapConfig.getEnd(), bootstrapConfig.getMargin());
+            if (bootstrapConfig.isFillGap()) {
+                final List<Pair<Long, Long>> gaps = StreamingMonitor.findGaps(streamingConfig.getCubeName());
+                logger.info("all gaps:" + StringUtils.join(gaps, ","));
+                for (Pair<Long, Long> gap : gaps) {
+                    startOneOffCubeStreaming(streamingConfig, gap.getFirst(), gap.getSecond(), bootstrapConfig.getMargin());
+                }
             } else {
-                startCubeStreaming(streamingConfig);
+                if (bootstrapConfig.isOneOff()) {
+                    Preconditions.checkArgument(bootstrapConfig.getStart() != 0);
+                    Preconditions.checkArgument(bootstrapConfig.getEnd() != 0);
+                    startOneOffCubeStreaming(streamingConfig, bootstrapConfig.getStart(), bootstrapConfig.getEnd(), bootstrapConfig.getMargin());
+                } else {
+                    startCubeStreaming(streamingConfig);
+                }
             }
         } else {
             throw new IllegalArgumentException("no cube or ii in kafka config");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/876f34d3/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
index 1eb23f1..c11c7b5 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingCLI.java
@@ -79,6 +79,9 @@ public class StreamingCLI {
                 case "-margin":
                     bootstrapConfig.setMargin(Long.parseLong(args[++i]));
                     break;
+                case "-fillGap":
+                    bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
+                    break;
                 default:
                     logger.warn("ignore this arg:" + argName);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/876f34d3/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
index 49ef227..0019a10 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingUtil.java
@@ -35,8 +35,8 @@ public final class StreamingUtil {
 
     private static MessageAndOffset getKafkaMessage(KafkaClusterConfig kafkaClusterConfig, int partitionId, long offset) {
         final String topic = kafkaClusterConfig.getTopic();
-        int retry = 3;
-        while (retry-- > 0) {
+        int retry = 0;
+        while (retry++ < 4) {
             final Broker leadBroker = getLeadBroker(kafkaClusterConfig, partitionId);
             if (leadBroker == null) {
                 logger.warn("unable to find leadBroker with config:" + kafkaClusterConfig + " partitionId:" + partitionId);
@@ -52,7 +52,7 @@ public final class StreamingUtil {
                 return iterator.next();
             } else {
                 try {
-                    Thread.sleep(1000);
+                    Thread.sleep((long) (Math.pow(2, retry) * 1000));
                 } catch (InterruptedException e) {
                     throw new RuntimeException(e);
                 }


Mime
View raw message