kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [37/43] kylin git commit: KYLIN-1415 Cube parallel merge
Date Fri, 04 Mar 2016 02:03:18 GMT
KYLIN-1415 Cube parallel merge


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/495ad927
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/495ad927
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/495ad927

Branch: refs/heads/helix-rebase
Commit: 495ad9278586d3fc4b7620779e77fcf511a7f2e9
Parents: 3bb345e
Author: shaofengshi <shaofengshi@apache.org>
Authored: Sun Feb 14 22:11:59 2016 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Fri Mar 4 09:52:20 2016 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 50 ++++++++++++--------
 .../apache/kylin/rest/service/CubeService.java  |  2 +-
 2 files changed, 32 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/495ad927/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 4951ce6..16922ac 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -18,13 +18,11 @@
 
 package org.apache.kylin.cube;
 
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.annotation.Nullable;
-
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.JsonSerializer;
@@ -46,9 +44,7 @@ import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.ProjectManager;
-import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.IRealizationConstants;
 import org.apache.kylin.metadata.realization.IRealizationProvider;
@@ -59,11 +55,11 @@ import org.apache.kylin.source.SourceFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * @author yangli9
@@ -460,8 +456,14 @@ public class CubeManager implements IRealizationProvider {
     }
 
     public CubeSegment mergeSegments(CubeInstance cube, final long startDate, final long
endDate, boolean forceMergeEmptySeg) throws IOException {
-        checkNoBuildingSegment(cube);
+        return mergeSegments(cube, startDate, endDate, forceMergeEmptySeg, true);
+    }
+
+    public CubeSegment mergeSegments(CubeInstance cube, final long startDate, final long
endDate, boolean forceMergeEmptySeg, boolean strictCheck) throws IOException {
         checkCubeIsPartitioned(cube);
+        
+        if (strictCheck)
+            checkNoBuildingSegment(cube);
 
         Pair<Long, Long> range = alignMergeRange(cube, startDate, endDate);
         CubeSegment newSegment = newSegment(cube, range.getFirst(), range.getSecond());
@@ -621,13 +623,23 @@ public class CubeManager implements IRealizationProvider {
             return null;
         }
 
-        if (cube.getBuildingSegments().size() > 0) {
-            logger.debug("Cube " + cube.getName() + " has bulding segment, will not trigger
merge at this moment");
+        List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY));
+
+        if (readySegments.size() == 0) {
+            logger.debug("Cube " + cube.getName() + " has no ready segment to merge");
             return null;
         }
+        List<CubeSegment> buildingSegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.NEW));
+        List<CubeSegment> toSkipSegments = Lists.newArrayList();
+        for (CubeSegment building : buildingSegments) {
+            for (CubeSegment ready : readySegments) {
+                if (ready.getDateRangeStart() >= building.getDateRangeStart() &&
ready.getDateRangeEnd() <= building.getDateRangeEnd()) {
+                    toSkipSegments.add(ready);
+                }
+            }
+        }
 
-        List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY));
-
+        readySegments.removeAll(toSkipSegments);
         if (readySegments.size() == 0) {
             logger.debug("Cube " + cube.getName() + " has no ready segment to merge");
             return null;

http://git-wip-us.apache.org/repos/asf/kylin/blob/495ad927/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index b2a278a..abc613b 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -653,7 +653,7 @@ public class CubeService extends BasicService {
                     cube = getCubeManager().getCube(cubeName);
                     CubeSegment newSeg = getCubeManager().autoMergeCubeSegments(cube);
                     if (newSeg != null) {
-                        newSeg = getCubeManager().mergeSegments(cube, newSeg.getDateRangeStart(),
newSeg.getDateRangeEnd(), true);
+                        newSeg = getCubeManager().mergeSegments(cube, newSeg.getDateRangeStart(),
newSeg.getDateRangeEnd(), true, false);
                         logger.debug("Will submit merge job on " + newSeg);
                         DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(newSeg,
"SYSTEM");
                         getExecutableManager().addJob(job);


Mime
View raw message