kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [30/34] kylin git commit: KYLIN-3175, update streaming segment's tsrange after merge
Date Fri, 26 Jan 2018 14:56:38 GMT
KYLIN-3175, update streaming segment's tsrange after merge


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/16236ef3
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/16236ef3
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/16236ef3

Branch: refs/heads/master
Commit: 16236ef30cde20e474740fdee693ce26dbd88155
Parents: 35979ff
Author: Cheng Wang <cheng.wang@kyligence.io>
Authored: Wed Jan 17 15:55:16 2018 +0800
Committer: Li Yang <liyang@apache.org>
Committed: Fri Jan 26 22:54:58 2018 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 33 ++++++++++++--------
 .../kylin/provision/BuildCubeWithStream.java    |  2 +-
 2 files changed, 21 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/16236ef3/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index df1d95e..8bdb5aa 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -276,17 +276,17 @@ public class CubeManager implements IRealizationProvider {
             return updateCube(update);
         }
     }
-    
+
     public CubeInstance updateCubeSegStatus(CubeSegment seg, SegmentStatusEnum status) throws IOException {
         try (AutoLock lock = cubeMapLock.lockForWrite()) {
             CubeInstance cube = seg.getCubeInstance().latestCopyForWrite();
             seg = cube.getSegmentById(seg.getUuid());
-            
+
             CubeUpdate update = new CubeUpdate(cube);
             seg.setStatus(status);
             update.setToUpdateSegs(seg);
             return updateCube(update);
-        }        
+        }
     }
 
     private CubeInstance updateCubeWithRetry(CubeUpdate update, int retry) throws IOException {
@@ -523,7 +523,7 @@ public class CubeManager implements IRealizationProvider {
             return segAssist.optimizeSegments(cube, cuboidsRecommend);
         }
     }
-    
+
     public CubeSegment mergeSegments(CubeInstance cube, TSRange tsRange, SegmentRange segRange, boolean force)
             throws IOException {
         try (AutoLock lock = cubeMapLock.lockForWrite()) {
@@ -547,9 +547,9 @@ public class CubeManager implements IRealizationProvider {
             CubeSegment... optimizedSegments) throws IOException {
         try (AutoLock lock = cubeMapLock.lockForWrite()) {
             segAssist.promoteCheckpointOptimizeSegments(cube, recommendCuboids, optimizedSegments);
-        }        
+        }
     }
-    
+
     public List<CubeSegment> calculateHoles(String cubeName) {
         return segAssist.calculateHoles(cubeName);
     }
@@ -683,8 +683,13 @@ public class CubeManager implements IRealizationProvider {
                 tsRange = null;
                 Preconditions.checkArgument(segRange != null);
             } else {
-                if(tsRange == null) {
-                    tsRange = new TSRange((Long)segRange.start.v, (Long)segRange.end.v);
+                /**In case of non-streaming segment,
+                 * tsRange is the same as segRange,
+                 * either could fulfill the merge job,
+                 * so it needs to convert segRange to tsRange if tsRange is null.
+                 **/
+                if (tsRange == null) {
+                    tsRange = new TSRange((Long) segRange.start.v, (Long) segRange.end.v);
                 }
                 segRange = null;
             }
@@ -832,10 +837,11 @@ public class CubeManager implements IRealizationProvider {
             updateCube(update);
         }
 
-        public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments) throws IOException {
+        public void promoteNewlyOptimizeSegments(CubeInstance cube, CubeSegment... optimizedSegments)
+                throws IOException {
             CubeInstance cubeCopy = cube.latestCopyForWrite();
             CubeSegment[] segCopy = cube.regetSegments(optimizedSegments);
-            
+
             for (CubeSegment seg : segCopy) {
                 seg.setStatus(SegmentStatusEnum.READY_PENDING);
             }
@@ -849,12 +855,12 @@ public class CubeManager implements IRealizationProvider {
                 CubeSegment... optimizedSegments) throws IOException {
             CubeInstance cubeCopy = cube.latestCopyForWrite();
             CubeSegment[] optSegCopy = cubeCopy.regetSegments(optimizedSegments);
-            
+
             if (cubeCopy.getSegments().size() != optSegCopy.length * 2) {
                 throw new IllegalStateException("For cube " + cubeCopy
                         + ", every READY segment should be optimized and all segments should be READY before optimizing");
             }
-            
+
             CubeSegment[] originalSegments = new CubeSegment[optSegCopy.length];
             int i = 0;
             for (CubeSegment seg : optSegCopy) {
@@ -865,7 +871,8 @@ public class CubeManager implements IRealizationProvider {
                             "For cube " + cubeCopy + ", segment " + seg + " missing StorageLocationIdentifier");
 
                 if (StringUtils.isBlank(seg.getLastBuildJobID()))
-                    throw new IllegalStateException("For cube " + cubeCopy + ", segment " + seg + " missing LastBuildJobID");
+                    throw new IllegalStateException(
+                            "For cube " + cubeCopy + ", segment " + seg + " missing LastBuildJobID");
 
                 seg.setStatus(SegmentStatusEnum.READY);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/16236ef3/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
index f7b8275..181e8b9 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithStream.java
@@ -370,7 +370,7 @@ public class BuildCubeWithStream {
         } catch (Throwable e) {
             logger.error("error", e);
             exitCode = 1;
-        } finally{
+        } finally {
             if (buildCubeWithStream != null) {
                 buildCubeWithStream.after();
                 buildCubeWithStream.cleanup();


Mime
View raw message