kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [3/3] incubator-kylin git commit: KYLIN-805 Drop useless HTables in the last step of cube merge
Date Mon, 01 Jun 2015 05:32:32 GMT
KYLIN-805 Drop useless HTables in the last step of cube merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b470ead3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b470ead3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b470ead3

Branch: refs/heads/0.8.0
Commit: b470ead3fa70e07f130d115a96061e7f5f18af17
Parents: fa6ea77
Author: shaofengshi <shaofengshi@apache.org>
Authored: Mon Jun 1 13:30:25 2015 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Mon Jun 1 13:32:06 2015 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/cube/CubeManager.java | 44 ++++++---
 .../kylin/job/constant/ExecutableConstants.java |  1 +
 .../apache/kylin/job/cube/CubingJobBuilder.java | 12 +++
 .../kylin/job/cube/DropOldHTableStep.java       | 96 ++++++++++++++++++++
 .../job/cube/UpdateCubeInfoAfterMergeStep.java  |  2 +
 .../kylin/job/streaming/CubeStreamBuilder.java  |  2 +-
 6 files changed, 141 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b470ead3/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index b986c36..3821616 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -266,16 +266,19 @@ public class CubeManager implements IRealizationProvider {
         CubeSegment mergeSegment = newSegment(cube, startDate, endDate);
 
         validateNewSegments(cube, mergeSegment);
-        cube.getSegments().add(appendSegment);
-        cube.getSegments().add(mergeSegment);
-        Collections.sort(cube.getSegments());
-        updateCube(cube, false);
+        saveCubeSegmentChange(cube, Lists.newArrayList(appendSegment, mergeSegment), null);
 
         return new Pair<CubeSegment, CubeSegment>(appendSegment, mergeSegment);
     }
 
+
     public CubeSegment appendSegments(CubeInstance cube, long endDate) throws IOException
{
-        checkNoBuildingSegment(cube);
+        return appendSegments(cube, endDate, true);
+    }
+
+    public CubeSegment appendSegments(CubeInstance cube, long endDate, boolean checkNoBuilding)
throws IOException {
+        if (checkNoBuilding)
+            checkNoBuildingSegment(cube);
 
         CubeSegment newSegment;
         if (cube.getDescriptor().getModel().getPartitionDesc().isPartitioned()) {
@@ -286,9 +289,8 @@ public class CubeManager implements IRealizationProvider {
         }
 
         validateNewSegments(cube, newSegment);
-        cube.getSegments().add(newSegment);
-        Collections.sort(cube.getSegments());
-        updateCube(cube, false);
+        saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
+
 
         return newSegment;
     }
@@ -297,9 +299,7 @@ public class CubeManager implements IRealizationProvider {
         checkNoBuildingSegment(cube);
 
         CubeSegment newSegment = newSegment(cube, startDate, endDate);
-        cube.getSegments().add(newSegment);
-        Collections.sort(cube.getSegments());
-        updateCube(cube, false);
+        saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
 
         return newSegment;
     }
@@ -312,11 +312,23 @@ public class CubeManager implements IRealizationProvider {
         CubeSegment newSegment = newSegment(cube, range.getFirst(), range.getSecond());
 
         validateNewSegments(cube, newSegment);
-        cube.getSegments().add(newSegment);
+        saveCubeSegmentChange(cube, Lists.newArrayList(newSegment), null);
+
+        return newSegment;
+    }
+
+    protected void saveCubeSegmentChange(CubeInstance cube, List<CubeSegment> toAdd,
List<CubeSegment> toRemove) throws IOException {
+        cube = this.reloadCubeLocal(cube.getName());
+
+        if (toAdd != null && toAdd.size() > 0)
+            cube.getSegments().addAll(toAdd);
+
+        if (toRemove != null && toRemove.size() > 0)
+            cube.getSegments().removeAll(toRemove);
+
         Collections.sort(cube.getSegments());
         updateCube(cube, false);
 
-        return newSegment;
     }
 
     private Pair<Long, Long> alignMergeRange(CubeInstance cube, long startDate, long
endDate) {
@@ -381,12 +393,14 @@ public class CubeManager implements IRealizationProvider {
      *
      * @param cubeName
      */
-    public void reloadCubeLocal(String cubeName) {
+    public CubeInstance reloadCubeLocal(String cubeName) {
         try {
-            reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName));
+            return reloadCubeLocalAt(CubeInstance.concatResourcePath(cubeName));
         } catch (IOException e) {
             logger.error(e.getLocalizedMessage(), e);
         }
+
+        return null;
     }
 
     public void removeCubeLocal(String cubeName) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b470ead3/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 1741692..49decd1 100644
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -57,6 +57,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
+    public static final String STEP_NAME_DROP_OLD_HBASE_TABLE = "Drop Old HTables";
     
     public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
     public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data
to HFile";

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b470ead3/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
index 1c34441..c13415e 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
@@ -131,6 +131,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds,
convertCuboidToHfileStep.getId(), jobId));
 
+        result.addTask(createDropUnusedHTableStep(mergeSegment, mergingHTables));
+
         return result;
     }
 
@@ -170,6 +172,8 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         // update cube info
         result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(),
jobId));
 
+        result.addTask(createDropUnusedHTableStep(seg, mergingHTables));
+
         return result;
     }
 
@@ -575,4 +579,12 @@ public final class CubingJobBuilder extends AbstractJobBuilder {
         return result;
     }
 
+
+    private DropOldHTableStep createDropUnusedHTableStep(CubeSegment seg, List<String>
oldHtables) {
+        DropOldHTableStep result = new DropOldHTableStep();
+        result.setName(ExecutableConstants.STEP_NAME_DROP_OLD_HBASE_TABLE);
+        result.setOldHTables(oldHtables);
+        return result;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b470ead3/job/src/main/java/org/apache/kylin/job/cube/DropOldHTableStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/DropOldHTableStep.java b/job/src/main/java/org/apache/kylin/job/cube/DropOldHTableStep.java
new file mode 100644
index 0000000..48b9bca
--- /dev/null
+++ b/job/src/main/java/org/apache/kylin/job/cube/DropOldHTableStep.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.job.cube;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Drops the HTables of segments replaced by a cube merge; tables that are already gone are skipped. */
+public class DropOldHTableStep extends AbstractExecutable {
+
+    private static final String OLD_HTABLES = "oldHTables";
+    private static final Logger logger = LoggerFactory.getLogger(DropOldHTableStep.class);
+
+    public DropOldHTableStep() {
+        super();
+    }
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
+        Configuration conf = HBaseConfiguration.create();
+        HBaseAdmin admin = null;
+        try {
+            admin = new HBaseAdmin(conf);
+            for (String table : getOldHTables()) {
+                // Skip tables already gone, e.g. dropped by an earlier attempt of this step.
+                if (!admin.tableExists(table)) {
+                    logger.debug("HTable " + table + " does not exist, skip dropping");
+                    continue;
+                }
+                // deleteTable needs a disabled table, but disableTable throws if it is already disabled.
+                if (admin.isTableEnabled(table)) {
+                    admin.disableTable(table);
+                }
+                admin.deleteTable(table);
+                logger.debug("Dropped htable: " + table);
+            }
+        } catch (IOException e) {
+            logger.error("Failed to drop old htables;", e);
+            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+        } finally {
+            if (admin != null) {
+                try {
+                    admin.close();
+                } catch (IOException e) {
+                    logger.warn("Failed to close HBaseAdmin", e);
+                }
+            }
+        }
+        return new ExecuteResult(ExecuteResult.State.SUCCEED);
+    }
+
+    public void setOldHTables(List<String> ids) {
+        setParam(OLD_HTABLES, StringUtils.join(ids, ","));
+    }
+
+    private List<String> getOldHTables() {
+        final String ids = getParam(OLD_HTABLES);
+        return ids == null ? Collections.<String> emptyList() : Lists.newArrayList(StringUtils.split(ids, ","));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b470ead3/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
index ea52c36..1836e17 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
@@ -36,6 +36,7 @@ import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
 import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
 
 /**
  */
@@ -84,6 +85,7 @@ public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
         mergedSegment.setInputRecordsSize(sourceSize);
         mergedSegment.setLastBuildJobID(getCubingJobId());
         mergedSegment.setLastBuildTime(System.currentTimeMillis());
+        mergedSegment.setStatus(SegmentStatusEnum.READY);
         
         try {
             cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b470ead3/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
index be44e18..07e19b8 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamBuilder.java
@@ -105,7 +105,7 @@ public class CubeStreamBuilder extends StreamBuilder {
 
         final CubeInstance cubeInstance = cubeManager.getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName),
System.currentTimeMillis());
+        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName),
System.currentTimeMillis(), false);
         final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(),
parsedStreamMessages);
 
         final Configuration conf = HadoopUtil.getCurrentConfiguration();


Mime
View raw message