kylin-commits mailing list archives

From: liy...@apache.org
Subject: [1/3] incubator-kylin git commit: KYLIN-876 Refactor CubingJobBuilder into engine.mr package; Stop support of build+merge job
Date: Fri, 10 Jul 2015 08:39:04 GMT
Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 cbf3550ce -> 3011f6fd9


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
deleted file mode 100644
index be52040..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cube;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Date;
-
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-
-/**
- */
-public class CubingJob extends DefaultChainedExecutable {
-
-    // Keys of the Output.extraInfo map; values are passed across job steps
-    public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
-    public static final String SOURCE_SIZE_BYTES = "sourceSizeBytes";
-    public static final String CUBE_SIZE_BYTES = "byteSizeBytes";
-    public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
-    
-    private static final String CUBE_INSTANCE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-
-    public CubingJob() {
-        super();
-    }
-
-    void setCubeName(String name) {
-        setParam(CUBE_INSTANCE_NAME, name);
-    }
-
-    public String getCubeName() {
-        return getParam(CUBE_INSTANCE_NAME);
-    }
-
-    void setSegmentId(String segmentId) {
-        setParam(SEGMENT_ID, segmentId);
-    }
-
-    public String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    @Override
-    protected Pair<String, String> formatNotifications(ExecutableContext context, ExecutableState state) {
-        CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(getCubeName());
-        final Output output = jobService.getOutput(getId());
-        String logMsg;
-        state = output.getState();
-        if (state != ExecutableState.ERROR &&
-                !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString().toLowerCase())) {
-            logger.info("state:" + state + " no need to notify users");
-            return null;
-        }
-        switch (state) {
-            case ERROR:
-                logMsg = output.getVerboseMsg();
-                break;
-            case DISCARDED:
-                logMsg = "job has been discarded";
-                break;
-            case SUCCEED:
-                logMsg = "job has succeeded";
-                break;
-            default:
-                return null;
-        }
-        if (logMsg == null) {
-            logMsg = "no error message";
-        }
-        String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
-        content = content.replaceAll("\\$\\{job_name\\}", getName());
-        content = content.replaceAll("\\$\\{result\\}", state.toString());
-        content = content.replaceAll("\\$\\{cube_name\\}", getCubeName());
-        content = content.replaceAll("\\$\\{start_time\\}", new Date(getStartTime()).toString());
-        content = content.replaceAll("\\$\\{duration\\}", getDuration() / 60000 + "mins");
-        content = content.replaceAll("\\$\\{mr_waiting\\}", getMapReduceWaitTime() / 60000 + "mins");
-        content = content.replaceAll("\\$\\{last_update_time\\}", new Date(getLastModified()).toString());
-        content = content.replaceAll("\\$\\{submitter\\}", getSubmitter());
-        content = content.replaceAll("\\$\\{error_log\\}", logMsg);
-
-        try {
-            InetAddress inetAddress = InetAddress.getLocalHost();
-            content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName());
-        } catch (UnknownHostException e) {
-            logger.warn(e.getLocalizedMessage(), e);
-        }
-
-        String title = "["+ state.toString() + "] - [Kylin Cube Build Job]-" + getCubeName();
-        return Pair.of(title, content);
-    }
-
-    @Override
-    protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
-        long time = 0L;
-        for (AbstractExecutable task: getTasks()) {
-            final ExecutableState status = task.getStatus();
-            if (status != ExecutableState.SUCCEED) {
-                break;
-            }
-            if (task instanceof MapReduceExecutable) {
-                time += ((MapReduceExecutable) task).getMapReduceWaitTime();
-            }
-        }
-        setMapReduceWaitTime(time);
-        super.onExecuteFinished(result, executableContext);
-    }
-
-    public long getMapReduceWaitTime() {
-        return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
-    }
-
-    public void setMapReduceWaitTime(long t) {
-        addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
-    }
-    
-    public long findSourceRecordCount() {
-        return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0"));
-    }
-    
-    public long findSourceSizeBytes() {
-        return Long.parseLong(findExtraInfo(SOURCE_SIZE_BYTES, "0"));
-    }
-    
-    public long findCubeSizeBytes() {
-        return Long.parseLong(findExtraInfo(CUBE_SIZE_BYTES, "0"));
-    }
-    
-    private String findExtraInfo(String key, String dft) {
-        for (AbstractExecutable child : getTasks()) {
-            Output output = executableManager.getOutput(child.getId());
-            String value = output.getExtra().get(key);
-            if (value != null)
-                return value;
-        }
-        return dft;
-    }
-}
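
A minimal usage sketch (not part of the commit above): CubingJob rolls per-step counters up through the Output.extraInfo map, so a caller can read source and cube sizes once the chain finishes. The ExecutableManager package and its getJob(...) lookup are assumptions about the surrounding 0.8 job framework; the find*/get* helpers are the ones defined in the deleted file.

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.job.cube.CubingJob;
    import org.apache.kylin.job.manager.ExecutableManager;

    public class CubingJobStats {
        public static void main(String[] args) {
            String jobId = args[0]; // id of a finished CubingJob (hypothetical input)
            ExecutableManager mgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
            CubingJob job = (CubingJob) mgr.getJob(jobId);
            // Each helper scans the child task outputs for the extra-info keys
            // declared at the top of CubingJob.
            System.out.println("source records : " + job.findSourceRecordCount());
            System.out.println("source bytes   : " + job.findSourceSizeBytes());
            System.out.println("cube bytes     : " + job.findCubeSizeBytes());
            System.out.println("MR wait ms     : " + job.getMapReduceWaitTime());
        }
    }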

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
deleted file mode 100644
index 21d1f8d..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.job.AbstractJobBuilder;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.cubev2.InMemCuboidJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeCuboidFromHBaseJob;
-import org.apache.kylin.job.hadoop.cubev2.MergeStatisticsStep;
-import org.apache.kylin.job.hadoop.cubev2.SaveStatisticsStep;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public final class CubingJobBuilder extends AbstractJobBuilder {
-
-
-    public CubingJobBuilder(JobEngineConfig engineConfig) {
-        super(engineConfig);
-    }
-
-    private boolean inMemoryCubing() {
-        return engineConfig.getConfig().isCubingInMem();
-    }
-
-    public CubingJob buildJob(CubeSegment seg) {
-        checkPreconditions(seg);
-        final CubingJob result = initialJob(seg, "BUILD");
-        final String jobId = result.getId();
-        final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
-
-        // cubing
-        addCubingSteps(seg, cuboidRootPath, result);
-
-        if (this.inMemoryCubing()) {
-            result.addTask(createUpdateCubeInfoAfterBuildStep(seg, jobId));
-        } else {
-            // convert htable
-            addHTableSteps(seg, cuboidRootPath, result);
-            // update cube info
-            result.addTask(createUpdateCubeInfoAfterBuildStep(seg, jobId));
-        }
-
-        final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
-        final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable));
-
-        return result;
-    }
-
-    public CubingJob buildAndMergeJob(CubeSegment appendSegment, CubeSegment mergeSegment) {
-        checkPreconditions(appendSegment, mergeSegment);
-
-        CubingJob result = initialJob(mergeSegment, "BUILD");
-        final String jobId = result.getId();
-        final String appendRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/append_cuboid/";
-        final String mergedRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/cuboid/";
-
-        // cubing the incremental segment
-        addCubingSteps(appendSegment, appendRootPath, result);
-
-        // update the append segment info
-        result.addTask(createUpdateCubeInfoAfterBuildStep(appendSegment, jobId));
-
-        List<CubeSegment> mergingSegments = mergeSegment.getCubeInstance().getMergingSegments(mergeSegment);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
-        List<String> mergingSegmentIds = Lists.newArrayList();
-        List<String> mergingCuboidPaths = Lists.newArrayList();
-        List<String> mergingHTables = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-            if (merging.equals(appendSegment))
-                mergingCuboidPaths.add(appendRootPath + "*");
-            else
-                mergingCuboidPaths.add(getPathToMerge(merging));
-        }
-
-        if (this.inMemoryCubing()) {
-            // merge from HTable
-            addMergeFromHBaseSteps(mergeSegment, mergingSegmentIds, mergingHTables, mergedRootPath, result);
-            // bulk load step
-            result.addTask(createBulkLoadStep(mergeSegment, result.getId()));
-        } else {
-            // merge cuboid
-            addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
-            // convert htable
-            addHTableSteps(mergeSegment, mergedRootPath, result);
-        }
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, jobId));
-
-        result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null));
-
-        return result;
-    }
-
-    public CubingJob mergeJob(CubeSegment seg) {
-        checkPreconditions(seg);
-
-        CubingJob result = initialJob(seg, "MERGE");
-        final String jobId = result.getId();
-        final String mergedCuboidPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
-
-        List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
-        Preconditions.checkState(mergingSegments.size() > 1, "there should be at least 2 segments to merge");
-        List<String> mergingSegmentIds = Lists.newArrayList();
-        List<String> mergingCuboidPaths = Lists.newArrayList();
-        List<String> mergingHTables = Lists.newArrayList();
-        for (CubeSegment merging : mergingSegments) {
-            mergingSegmentIds.add(merging.getUuid());
-            mergingCuboidPaths.add(getPathToMerge(merging));
-            mergingHTables.add(merging.getStorageLocationIdentifier());
-        }
-
-        if (this.inMemoryCubing()) {
-            // merge from HTable
-            addMergeFromHBaseSteps(seg, mergingSegmentIds, mergingHTables, mergedCuboidPath, result);
-            // bulk load step
-            result.addTask(createBulkLoadStep(seg, result.getId()));
-        } else {
-            // merge cuboid
-            addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
-            // convert htable
-            addHTableSteps(seg, mergedCuboidPath, result);
-        }
-
-        // update cube info
-        result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, jobId));
-
-        result.addTask(createGarbageCollectionStep(seg, mergingHTables, null));
-
-        return result;
-    }
-
-    void addMergeSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingCuboidPaths, String mergedCuboidPath, CubingJob result) {
-
-        result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
-
-        String formattedPath = StringUtils.join(mergingCuboidPaths, ",");
-        result.addTask(createMergeCuboidDataStep(seg, formattedPath, mergedCuboidPath));
-    }
-
-
-    void addMergeFromHBaseSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingHTables, String mergedCuboidPath, CubingJob result) {
-
-        result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
-
-        String mergedStatisticsFolder = getStatisticsPath(seg, result.getId());
-        result.addTask(createMergeStatisticsStep(seg, mergingSegmentIds, mergedStatisticsFolder));
-
-        // create htable step
-        result.addTask(createCreateHTableStep(seg));
-
-        String formattedTables = StringUtils.join(mergingHTables, ",");
-        String hFilePath = getHFilePath(seg, result.getId());
-        result.addTask(createMergeCuboidDataFromHBaseStep(seg, formattedTables, hFilePath));
-    }
-
-    void addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
-        final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
-        final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
-
-        final String jobId = result.getId();
-        final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
-        final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId);
-        final String intermediateHiveTableLocation = getIntermediateHiveTableLocation(intermediateTableDesc, jobId);
-        final String factDistinctColumnsPath = getFactDistinctColumnsPath(seg, jobId);
-        final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
-
-        final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId);
-        result.addTask(intermediateHiveTableStep);
-        result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId));
-        
-        if (!inMemoryCubing()) {
-            result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-            // base cuboid step
-            result.addTask(createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath));
-
-            // n dim cuboid steps
-            for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
-                int dimNum = totalRowkeyColumnsCount - i;
-                result.addTask(createNDimensionCuboidStep(seg, cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
-            }
-        } else {
-            // save statistics step
-            result.addTask(createSaveStatisticsStep(seg, getStatisticsPath(seg, jobId)));
-            result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-            result.addTask(createCreateHTableStep(seg));
-            result.addTask(createInMemCubingStep(seg, intermediateHiveTableLocation, intermediateHiveTableName, cuboidOutputTempPath, result.getId()));
-            // bulk load step
-            result.addTask(createBulkLoadStep(seg, result.getId()));
-        }
-    }
-
-    void addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
-        final String jobId = result.getId();
-        final String cuboidPath = cuboidRootPath + "*";
-
-        result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath));
-        // create htable step
-        result.addTask(createCreateHTableStep(seg));
-        // generate hfiles step
-        result.addTask(createConvertCuboidToHfileStep(seg, cuboidPath, jobId));
-        // bulk load step
-        result.addTask(createBulkLoadStep(seg, jobId));
-    }
-
-    private CubingJob initialJob(CubeSegment seg, String type) {
-        CubingJob result = new CubingJob();
-        SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-        format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
-        result.setSubmitter(submitter);
-        result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
-        return result;
-    }
-
-    private void checkPreconditions(CubeSegment... segments) {
-        for (CubeSegment seg : segments) {
-            Preconditions.checkNotNull(seg, "segment cannot be null");
-        }
-        Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
-    }
-
-    private String getPathToMerge(CubeSegment seg) {
-        return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/cuboid/*";
-    }
-
-    private String getRowkeyDistributionOutputPath(CubeSegment seg) {
-        return engineConfig.getHdfsWorkingDirectory() + "/" + seg.getCubeInstance().getName() + "/rowkey_stats";
-    }
-
-    private void appendMapReduceParameters(StringBuilder builder, CubeSegment seg) {
-        try {
-            String jobConf = engineConfig.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
-            if (jobConf != null && jobConf.length() > 0) {
-                builder.append(" -conf ").append(jobConf);
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
-        String[] paths = new String[groupRowkeyColumnsCount + 1];
-        for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
-            int dimNum = totalRowkeyColumnCount - i;
-            if (dimNum == totalRowkeyColumnCount) {
-                paths[i] = cuboidRootPath + "base_cuboid";
-            } else {
-                paths[i] = cuboidRootPath + dimNum + "d_cuboid";
-            }
-        }
-        return paths;
-    }
-
-    private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) {
-        return getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
-    }
-
-
-    private String getStatisticsPath(CubeSegment seg, String jobUuid) {
-        return getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/statistics";
-    }
-
-
-    private String getHFilePath(CubeSegment seg, String jobId) {
-        return getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/";
-    }
-
-    private MapReduceExecutable createFactDistinctColumnsStep(CubeSegment seg, String intermediateHiveTableName, String jobId) {
-        MapReduceExecutable result = new MapReduceExecutable();
-        result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        result.setMapReduceJobClass(FactDistinctColumnsJob.class);
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(seg, jobId));
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(inMemoryCubing()));
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(seg, jobId));
-        appendExecCmdParameters(cmd, "statisticssamplingpercent", String.valueOf(engineConfig.getConfig().getCubingInMemSamplingPercent()));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
-        appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName);
-
-        result.setMapReduceParams(cmd.toString());
-        return result;
-    }
-
-    private HadoopShellExecutable createBuildDictionaryStep(CubeSegment seg, String factDistinctColumnsPath) {
-        // build dictionary job
-        HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
-        buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-
-        buildDictionaryStep.setJobParams(cmd.toString());
-        buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
-        return buildDictionaryStep;
-    }
-
-    private MapReduceExecutable createInMemCubingStep(CubeSegment seg, String intermediateHiveTableLocation, String intermediateHiveTableName, String[] cuboidOutputTempPath, String jobId) {
-        // in-mem cubing job
-        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-
-        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
-
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation);
-        appendExecCmdParameters(cmd, "statisticsoutput", getStatisticsPath(seg, jobId));
-        // appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
-        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "level", "0");
-        appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName);
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
-        baseCuboidStep.setMapReduceParams(cmd.toString());
-        baseCuboidStep.setMapReduceJobClass(InMemCuboidJob.class);
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
-        return baseCuboidStep;
-    }
-
-    private MapReduceExecutable createBaseCuboidStep(CubeSegment seg, String intermediateHiveTableLocation, String[] cuboidOutputTempPath) {
-        // base cuboid job
-        MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
-        StringBuilder cmd = new StringBuilder();
-        appendMapReduceParameters(cmd, seg);
-
-        baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
-
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation);
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "level", "0");
-
-        baseCuboidStep.setMapReduceParams(cmd.toString());
-        baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
-        baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
-        return baseCuboidStep;
-    }
-
-    private MapReduceExecutable createNDimensionCuboidStep(CubeSegment seg, String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) {
-        // ND cuboid job
-        MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
-
-        ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
-        appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getCubeInstance().getName() + "_Step");
-        appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
-
-        ndCuboidStep.setMapReduceParams(cmd.toString());
-        ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
-        return ndCuboidStep;
-    }
-
-    private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath) {
-        MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
-        rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(seg));
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
-
-        rowkeyDistributionStep.setMapReduceParams(cmd.toString());
-        rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
-        return rowkeyDistributionStep;
-    }
-
-    private HadoopShellExecutable createCreateHTableStep(CubeSegment seg) {
-        HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
-        createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg) + "/part-r-00000");
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "statisticsenabled", String.valueOf(inMemoryCubing()));
-
-        createHtableStep.setJobParams(cmd.toString());
-        createHtableStep.setJobClass(CreateHTableJob.class);
-
-        return createHtableStep;
-    }
-
-    private MapReduceExecutable createConvertCuboidToHfileStep(CubeSegment seg, String inputPath, String jobId) {
-        MapReduceExecutable createHFilesStep = new MapReduceExecutable();
-        createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
-
-        createHFilesStep.setMapReduceParams(cmd.toString());
-        createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
-        createHFilesStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-
-        return createHFilesStep;
-    }
-
-    private HadoopShellExecutable createBulkLoadStep(CubeSegment seg, String jobId) {
-        HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
-        bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
-        StringBuilder cmd = new StringBuilder();
-        appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-
-        bulkLoadStep.setJobParams(cmd.toString());
-        bulkLoadStep.setJobClass(BulkLoadJob.class);
-
-        return bulkLoadStep;
-
-    }
-
-    private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(CubeSegment seg, String jobId) {
-        final UpdateCubeInfoAfterBuildStep updateCubeInfoStep = new UpdateCubeInfoAfterBuildStep();
-        updateCubeInfoStep.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        updateCubeInfoStep.setCubeName(seg.getCubeInstance().getName());
-        updateCubeInfoStep.setSegmentId(seg.getUuid());
-        updateCubeInfoStep.setCubingJobId(jobId);
-        return updateCubeInfoStep;
-    }
-
-    private MergeDictionaryStep createMergeDictionaryStep(CubeSegment seg, List<String> mergingSegmentIds) {
-        MergeDictionaryStep result = new MergeDictionaryStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        return result;
-    }
-
-
-    private MergeStatisticsStep createMergeStatisticsStep(CubeSegment seg, List<String> mergingSegmentIds, String mergedStatisticsFolder) {
-        MergeStatisticsStep result = new MergeStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_MERGE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setMergedStatisticsPath(mergedStatisticsFolder);
-        return result;
-    }
-
-    private SaveStatisticsStep createSaveStatisticsStep(CubeSegment seg, String statisticsPath) {
-        SaveStatisticsStep result = new SaveStatisticsStep();
-        result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setStatisticsPath(statisticsPath);
-        return result;
-    }
-
-
-    private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", outputPath);
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
-        return mergeCuboidDataStep;
-    }
-
-
-    private MapReduceExecutable createMergeCuboidDataFromHBaseStep(CubeSegment seg, String inputPath, String hFilePath) {
-        MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
-        mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
-        StringBuilder cmd = new StringBuilder();
-
-        appendMapReduceParameters(cmd, seg);
-        appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-        appendExecCmdParameters(cmd, "segmentname", seg.getName());
-        appendExecCmdParameters(cmd, "input", inputPath);
-        appendExecCmdParameters(cmd, "output", hFilePath);
-        appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-        appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
-        mergeCuboidDataStep.setMapReduceParams(cmd.toString());
-        mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromHBaseJob.class);
-        mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
-        return mergeCuboidDataStep;
-    }
-
-    private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(CubeSegment seg, List<String> mergingSegmentIds, String jobId) {
-        UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
-        result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
-        result.setCubeName(seg.getCubeInstance().getName());
-        result.setSegmentId(seg.getUuid());
-        result.setMergingSegmentIds(mergingSegmentIds);
-        result.setCubingJobId(jobId);
-        return result;
-    }
-
-
-    private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable) {
-        GarbageCollectionStep result = new GarbageCollectionStep();
-        result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
-        result.setOldHTables(oldHtables);
-        result.setOldHiveTable(hiveIntermediateTable);
-        return result;
-    }
-
-}
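
How a driver used this builder (hedged sketch): the REST layer would construct it with a JobEngineConfig, ask for a build or merge chain, and hand the result to the job engine. buildJob/mergeJob/buildAndMergeJob are the entry points shown above; setSubmitter(...) on the builder and ExecutableManager.addJob(...) are assumptions about the rest of the framework.

    // Sketch under the stated assumptions; `segment` is an existing CubeSegment.
    JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
    CubingJobBuilder builder = new CubingJobBuilder(engineConfig);
    builder.setSubmitter("ADMIN");                        // recorded on the job, used in notification mails
    CubingJob buildJob = builder.buildJob(segment);       // full build of one segment
    // CubingJob mergeJob = builder.mergeJob(mergedSeg);  // merge-only variant
    ExecutableManager.getInstance(engineConfig.getConfig()).addJob(buildJob);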

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
deleted file mode 100644
index 828df7e..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cube;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Drop resources that are no longer needed, including the intermediate Hive table (after a cube build) and HBase tables (after a cube merge)
- */
-public class GarbageCollectionStep extends AbstractExecutable {
-
-    private static final String OLD_HTABLES = "oldHTables";
-
-    private static final String OLD_HIVE_TABLE = "oldHiveTable";
-
-    private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
-    public GarbageCollectionStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-
-        StringBuffer output = new StringBuffer();
-
-        final String hiveTable = this.getOldHiveTable();
-        if (StringUtils.isNotEmpty(hiveTable)) {
-            final String dropHiveCMD = "hive -e \"DROP TABLE IF EXISTS " + hiveTable + ";\"";
-            ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
-            try {
-                context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
-                output.append("Hive table " + hiveTable + " is dropped. \n");
-            } catch (IOException e) {
-                logger.error("job:" + getId() + " execute finished with exception", e);
-                output.append(shellCmdOutput.getOutput()).append("\n").append(e.getLocalizedMessage());
-                return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
-            }
-        }
-
-
-        List<String> oldTables = getOldHTables();
-        if (oldTables != null && oldTables.size() > 0) {
-            String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
-            Configuration conf = HBaseConfiguration.create();
-            HBaseAdmin admin = null;
-            try {
-                admin = new HBaseAdmin(conf);
-                for (String table : oldTables) {
-                    if (admin.tableExists(table)) {
-                        HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
-                        String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
-                        if (metadataUrlPrefix.equalsIgnoreCase(host)) {
-                            if (admin.isTableEnabled(table)) {
-                                admin.disableTable(table);
-                            }
-                            admin.deleteTable(table);
-                            logger.debug("Dropped htable: " + table);
-                            output.append("HBase table " + table + " is dropped. \n");
-                        } else {
-                            logger.debug("Skip htable: " + table);
-                            output.append("Skip htable: " + table + ". \n");
-                        }
-                    }
-                }
-
-            } catch (IOException e) {
-                output.append("Got error when drop HBase table, exiting... \n");
-                // This should not block the merge job; Orphans should be cleaned up in StorageCleanupJob
-                return new ExecuteResult(ExecuteResult.State.ERROR, output.append(e.getLocalizedMessage()).toString());
-            } finally {
-                if (admin != null)
-                    try {
-                        admin.close();
-                    } catch (IOException e) {
-                        logger.error(e.getLocalizedMessage());
-                    }
-            }
-        }
-
-
-        return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
-    }
-
-    public void setOldHTables(List<String> ids) {
-        setParam(OLD_HTABLES, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getOldHTables() {
-        final String ids = getParam(OLD_HTABLES);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setOldHiveTable(String hiveTable) {
-        setParam(OLD_HIVE_TABLE, hiveTable);
-    }
-
-    private String getOldHiveTable() {
-        return getParam(OLD_HIVE_TABLE);
-    }
-
-}
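
One detail worth calling out: the step drops an HTable only when its IRealizationConstants.HTableTag value matches this instance's metadata URL prefix, so multiple Kylin deployments can share an HBase cluster without deleting each other's tables. A hedged wiring sketch, mirroring createGarbageCollectionStep in the builder above (table names are placeholders; assumes java.util.Arrays is imported and `job` is the CubingJob being assembled):

    GarbageCollectionStep gc = new GarbageCollectionStep();
    gc.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
    gc.setOldHTables(Arrays.asList("KYLIN_SEG_A", "KYLIN_SEG_B")); // htables of merged-away segments
    gc.setOldHiveTable("kylin_intermediate_example");              // intermediate table left by the build
    job.addTask(gc);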

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
deleted file mode 100644
index 625029b..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cube;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeBuilder;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.io.IOException;
-import java.util.*;
-
-public class MergeDictionaryStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-
-    public MergeDictionaryStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        KylinConfig conf = context.getConfig();
-        final CubeManager mgr = CubeManager.getInstance(conf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
-        final List<CubeSegment> mergingSegments = getMergingSegments(cube);
-        
-        Collections.sort(mergingSegments);
-        
-        try {
-            checkLookupSnapshotsMustIncremental(mergingSegments);
-            
-            makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
-            makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
-            CubeBuilder cubeBuilder = new CubeBuilder(cube);
-            cubeBuilder.setToUpdateSegs(newSegment);
-            mgr.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to merge dictionary or lookup snapshots", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-    
-    private List<CubeSegment> getMergingSegments(CubeInstance cube) {
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
-        for (String id : mergingSegmentIds) {
-            result.add(cube.getSegmentById(id));
-        }
-        return result;
-    }
-
-    private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) {
-
-        // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows
-    }
-
-    /**
-     * Create dictionaries for the new segment as well. Dictionaries on the
-     * fact table are created by merging the underlying dictionaries of the
-     * merging segments; dictionaries on lookup tables are simply copied from
-     * any one of the merging segments, which is guaranteed to be consistent
-     * (checked in CubeSegmentValidator).
-     *
-     * @param cube
-     * @param newSeg
-     * @throws IOException
-     */
-    private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
-        HashSet<TblColRef> colsNeedMergingDict = new HashSet<TblColRef>();
-        HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
-        DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
-
-        CubeDesc cubeDesc = cube.getDescriptor();
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
-                    String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                    if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
-                        colsNeedMergingDict.add(col);
-                    } else {
-                        colsNeedCopyDict.add(col);
-                    }
-                }
-            }
-        }
-
-        for (TblColRef col : colsNeedMergingDict) {
-            logger.info("Merging fact table dictionary on : " + col);
-            List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>();
-            for (CubeSegment segment : mergingSegments) {
-                logger.info("Including fact table dictionary of segment : " + segment);
-                if (segment.getDictResPath(col) != null) {
-                    DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
-                    dictInfos.add(dictInfo);
-                }
-            }
-            mergeDictionaries(dictMgr, newSeg, dictInfos, col);
-        }
-
-        for (TblColRef col : colsNeedCopyDict) {
-            String path = mergingSegments.get(0).getDictResPath(col);
-            newSeg.putDictResPath(col, path);
-        }
-    }
-
-    private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException {
-        DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts);
-        if (dictInfo != null)
-            cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
-
-        return dictInfo;
-    }
-
-    /**
-     * Make snapshots for the new segment by copying from one of the underlying
-     * merging segments; this is guaranteed to be consistent (checked in
-     * CubeSegmentValidator).
-     *
-     * @param cube
-     * @param newSeg
-     */
-    private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) {
-        CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
-        for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
-            newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
-        }
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id: splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-}
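
Both this step and GarbageCollectionStep persist list-valued settings as a single comma-joined job parameter. A standalone illustration of that round trip (runnable with commons-lang on the classpath; the ids are made up). Note the encoding assumes no element contains a comma, which holds for segment UUIDs.

    import java.util.Arrays;
    import java.util.List;
    import org.apache.commons.lang.StringUtils;

    public class ParamRoundTrip {
        public static void main(String[] args) {
            List<String> ids = Arrays.asList("uuid-a", "uuid-b", "uuid-c"); // hypothetical segment ids
            String packed = StringUtils.join(ids, ",");         // what setMergingSegmentIds stores
            String[] unpacked = StringUtils.split(packed, ","); // what getMergingSegmentIds reads back
            System.out.println(packed + " -> " + Arrays.toString(unpacked));
        }
    }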

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
deleted file mode 100644
index 8ad1f7a..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterBuildStep.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-
-import org.apache.kylin.cube.CubeBuilder;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- */
-public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
-
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String CUBE_NAME = "cubeName";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
-    public UpdateCubeInfoAfterBuildStep() {
-        super();
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-        final CubeSegment segment = cube.getSegmentById(getSegmentId());
-        
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
-        long sourceCount = cubingJob.findSourceRecordCount();
-        long sourceSizeBytes = cubingJob.findSourceSizeBytes();
-        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
-        boolean segmentReady = cubeSizeBytes > 0; // in the build+merge scenario the HFile conversion has not happened yet, so the cube size is 0
-
-        segment.setLastBuildJobID(getCubingJobId());
-        segment.setLastBuildTime(System.currentTimeMillis());
-        segment.setSizeKB(cubeSizeBytes / 1024);
-        segment.setInputRecords(sourceCount);
-        segment.setInputRecordsSize(sourceSizeBytes);
-
-        try {
-            if (segmentReady) {
-                cubeManager.promoteNewlyBuiltSegments(cube, segment);
-            } else {
-                CubeBuilder cubeBuilder = new CubeBuilder(cube);
-                cubeBuilder.setToUpdateSegs(segment);
-                cubeManager.updateCube(cubeBuilder);
-            }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update cube after build", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-}
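
For reference, the core decision this removed step performed, paraphrased from the deleted code above: a segment is promoted only once its HFile size is known; in the (now dropped) build+merge flow the HFile conversion had not yet run at this point, so the step only persisted the updated segment metadata.

    // Paraphrase of the removed logic, using the names from the deleted code above
    if (cubeSizeBytes > 0) {
        // HFile already converted: the segment is complete, promote it
        cubeManager.promoteNewlyBuiltSegments(cube, segment);
    } else {
        // build+merge case: only persist the updated segment metadata for now
        CubeBuilder cubeBuilder = new CubeBuilder(cube);
        cubeBuilder.setToUpdateSegs(segment);
        cubeManager.updateCube(cubeBuilder);
    }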

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
deleted file mode 100644
index 47e5a31..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/UpdateCubeInfoAfterMergeStep.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
-    private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-    public UpdateCubeInfoAfterMergeStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-
-        CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
-        if (mergedSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
-        }
-        
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
-        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
-
-        // collect source statistics
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        if (mergingSegmentIds.isEmpty()) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
-        }
-        long sourceCount = 0L;
-        long sourceSize = 0L;
-        for (String id : mergingSegmentIds) {
-            CubeSegment segment = cube.getSegmentById(id);
-            sourceCount += segment.getInputRecords();
-            sourceSize += segment.getInputRecordsSize();
-        }
-
-        // update segment info
-        mergedSegment.setSizeKB(cubeSizeBytes / 1024);
-        mergedSegment.setInputRecords(sourceCount);
-        mergedSegment.setInputRecordsSize(sourceSize);
-        mergedSegment.setLastBuildJobID(getCubingJobId());
-        mergedSegment.setLastBuildTime(System.currentTimeMillis());
-
-        try {
-            cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED);
-        } catch (IOException e) {
-            logger.error("fail to update cube after merge", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-}
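
Likewise for the removed merge step: the merged segment's input statistics are the sums over the segments being merged, as paraphrased below from the deleted code above.

    // Paraphrase of the removed aggregation, names as in the deleted code above
    long sourceCount = 0L;
    long sourceSize = 0L;
    for (String id : mergingSegmentIds) {
        CubeSegment segment = cube.getSegmentById(id);
        sourceCount += segment.getInputRecords();
        sourceSize += segment.getInputRecordsSize();
    }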

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
index 8c752c2..422c6d7 100644
--- a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -35,6 +35,10 @@ public class JobEngineConfig {
     public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
     public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
 
+    public boolean isInMemCubing() {
+        return config.isCubingInMem();
+    }
+    
     private static File getJobConfig(String fileName) {
         String path = System.getProperty(KylinConfig.KYLIN_CONF);
         if (StringUtils.isNotEmpty(path)) {
@@ -96,17 +100,6 @@ public class JobEngineConfig {
         return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
     }
 
-    private void inputStreamToFile(InputStream ins, File file) throws IOException {
-        OutputStream os = new FileOutputStream(file);
-        int bytesRead = 0;
-        byte[] buffer = new byte[8192];
-        while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
-            os.write(buffer, 0, bytesRead);
-        }
-        os.close();
-        ins.close();
-    }
-
     // there should be no setters
     private final KylinConfig config;
 
@@ -121,8 +114,7 @@ public class JobEngineConfig {
     public String getHdfsWorkingDirectory() {
         return config.getHdfsWorkingDirectory();
     }
-
-
+    
     /**
      * @return the maxConcurrentJobLimit
      */
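
The new isInMemCubing() accessor simply delegates to KylinConfig.isCubingInMem(). A hypothetical caller (illustrative only, not part of this commit) might use it to choose between cubing algorithms:

    // Hypothetical usage sketch, assuming a KylinConfig is available in the environment
    JobEngineConfig engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
    if (engineConfig.isInMemCubing()) {
        // select the in-memory cubing steps
    } else {
        // select the classic layered MapReduce cubing steps
    }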

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
index 26a67f4..1ae101d 100644
--- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
@@ -23,17 +23,15 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.TimeZone;
 
-import com.google.common.base.Preconditions;
-
+import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.invertedindex.IISegment;
-import org.apache.kylin.job.AbstractJobBuilder;
+import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.job.common.HadoopShellExecutable;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.hadoop.dict.CreateInvertedIndexDictionaryJob;
-import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
 import org.apache.kylin.job.hadoop.invertedindex.IIBulkLoadJob;
 import org.apache.kylin.job.hadoop.invertedindex.IICreateHFileJob;
 import org.apache.kylin.job.hadoop.invertedindex.IICreateHTableJob;
@@ -41,26 +39,30 @@ import org.apache.kylin.job.hadoop.invertedindex.IIDistinctColumnsJob;
 import org.apache.kylin.job.hadoop.invertedindex.InvertedIndexJob;
 import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
 
+import com.google.common.base.Preconditions;
+
 /**
  */
-public final class IIJobBuilder extends AbstractJobBuilder {
+public final class IIJobBuilder {
 
+    final JobEngineConfig engineConfig;
+    
     public IIJobBuilder(JobEngineConfig engineConfig) {
-        super(engineConfig);
+        this.engineConfig = engineConfig;
     }
 
-    public IIJob buildJob(IISegment seg) {
+    public IIJob buildJob(IISegment seg, String submitter) {
         checkPreconditions(seg);
 
-        IIJob result = initialJob(seg, "BUILD");
+        IIJob result = initialJob(seg, "BUILD", submitter);
         final String jobId = result.getId();
         final IIJoinedFlatTableDesc intermediateTableDesc = new IIJoinedFlatTableDesc(seg.getIIDesc());
-        final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId);
+        final String intermediateHiveTableName = intermediateTableDesc.getTableName(jobId);
         final String factDistinctColumnsPath = getIIDistinctColumnsPath(seg, jobId);
         final String iiRootPath = getJobWorkingDir(jobId) + "/" + seg.getIIInstance().getName() + "/";
         final String iiPath = iiRootPath + "*";
 
-        final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId);
+        final AbstractExecutable intermediateHiveTableStep = createFlatHiveTableStep(intermediateTableDesc, jobId);
         result.addTask(intermediateHiveTableStep);
 
         result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId, factDistinctColumnsPath));
@@ -81,14 +83,18 @@ public final class IIJobBuilder extends AbstractJobBuilder {
         return result;
     }
 
-    private IIJob initialJob(IISegment seg, String type) {
+    private AbstractExecutable createFlatHiveTableStep(IIJoinedFlatTableDesc intermediateTableDesc, String jobId) {
+        return JobBuilderSupport.createFlatHiveTableStep(engineConfig, intermediateTableDesc, jobId);
+    }
+
+    private IIJob initialJob(IISegment seg, String type, String submitter) {
         IIJob result = new IIJob();
         SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
         format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
         result.setIIName(seg.getIIInstance().getName());
         result.setSegmentId(seg.getUuid());
         result.setName(seg.getIIInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
-        result.setSubmitter(this.submitter);
+        result.setSubmitter(submitter);
         return result;
     }
 
@@ -210,4 +216,11 @@ public final class IIJobBuilder extends AbstractJobBuilder {
 
     }
 
+    private StringBuilder appendExecCmdParameters(StringBuilder buf, String paraName, String paraValue) {
+        return buf.append(" -").append(paraName).append(" ").append(paraValue);
+    }
+
+    private String getJobWorkingDir(String uuid) {
+        return engineConfig.getHdfsWorkingDirectory() + "/" + "kylin-" + uuid;
+    }
 }
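
IIJobBuilder no longer inherits from the removed AbstractJobBuilder: it now holds the JobEngineConfig itself, and the submitter moves from builder state to a buildJob() argument. Usage, as updated in BuildIIWithEngineTest further below:

    // The submitter is now passed per call instead of being held by the builder
    IIJobBuilder iiJobBuilder = new IIJobBuilder(jobEngineConfig);
    IIJob job = iiJobBuilder.buildJob(segment, "TEST");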

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
index 00c2eef..3de467e 100644
--- a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput.java
@@ -29,4 +29,10 @@ public class HBaseMROutput implements IMROutput {
         return null;
     }
 
+    @Override
+    public IMRJobFlowParticipant createMergeFlowParticipant() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 4d0dc35..066cdd6 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -18,7 +18,19 @@
 
 package org.apache.kylin.job;
 
-import com.google.common.collect.Lists;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.lang.reflect.Method;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kylin.common.KylinConfig;
@@ -30,10 +42,11 @@ import org.apache.kylin.cube.CubeBuilder;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.cube.CubingJob;
-import org.apache.kylin.job.cube.CubingJobBuilder;
+import org.apache.kylin.engine.BuildEngineFactory;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
 import org.apache.kylin.job.manager.ExecutableManager;
@@ -42,23 +55,12 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.File;
-import java.lang.reflect.Method;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.TimeZone;
-import java.util.concurrent.*;
-
-import static org.junit.Assert.assertEquals;
+import com.google.common.collect.Lists;
 
 public class BuildCubeWithEngineTest {
 
-    private JobEngineConfig jobEngineConfig;
-
     private CubeManager cubeManager;
-
     private DefaultScheduler scheduler;
-
     protected ExecutableManager jobService;
 
     private static final Log logger = LogFactory.getLog(BuildCubeWithEngineTest.class);
@@ -102,7 +104,6 @@ public class BuildCubeWithEngineTest {
             throw new RuntimeException("scheduler has not been started");
         }
         cubeManager = CubeManager.getInstance(kylinConfig);
-        jobEngineConfig = new JobEngineConfig(kylinConfig);
         for (String jobId : jobService.getAllJobIds()) {
             if (jobService.getJob(jobId) instanceof CubingJob) {
                 jobService.deleteJob(jobId);
@@ -274,8 +275,7 @@ public class BuildCubeWithEngineTest {
 
     private String buildSegment(String cubeName, long startDate, long endDate) throws Exception {
         CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate);
-        CubingJobBuilder cubingJobBuilder = new CubingJobBuilder(jobEngineConfig);
-        CubingJob job = cubingJobBuilder.buildJob(segment);
+        DefaultChainedExecutable job = BuildEngineFactory.createBatchBuildJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
         return job.getId();
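
The test no longer needs its own JobEngineConfig; BuildEngineFactory.createBatchBuildJob(segment, submitter) is the single entry point and presumably resolves its configuration internally. The new flow of buildSegment(), condensed from the diff above:

    // Condensed build flow, names from the diff above
    CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate);
    DefaultChainedExecutable job = BuildEngineFactory.createBatchBuildJob(segment, "TEST");
    jobService.addJob(job);
    waitForJob(job.getId());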

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
index 02bb6b8..2382540 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithEngineTest.java
@@ -217,7 +217,7 @@ public class BuildIIWithEngineTest {
         iiInstance.getSegments().add(segment);
         iiManager.updateII(iiInstance);
         IIJobBuilder iiJobBuilder = new IIJobBuilder(jobEngineConfig);
-        IIJob job = iiJobBuilder.buildJob(segment);
+        IIJob job = iiJobBuilder.buildJob(segment, "TEST");
         jobService.addJob(job);
         waitForJob(job.getId());
         return job.getId();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
index 68b81dd..70deada 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/BasicService.java
@@ -42,9 +42,9 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.invertedindex.IIDescManager;
 import org.apache.kylin.invertedindex.IIManager;
-import org.apache.kylin.job.cube.CubingJob;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
index 43ecfd5..92ea678 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/CubeService.java
@@ -18,6 +18,16 @@
 
 package org.apache.kylin.rest.service;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.WeakHashMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.kylin.common.KylinConfig;
@@ -29,10 +39,9 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidCLI;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.BuildEngineFactory;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.cube.CubingJob;
-import org.apache.kylin.job.cube.CubingJobBuilder;
-import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
@@ -62,9 +71,6 @@ import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.stereotype.Component;
 
-import java.io.IOException;
-import java.util.*;
-
 /**
  * Stateless & lightweight service facade of cube management functions.
  *
@@ -586,9 +592,7 @@ public class CubeService extends BasicService {
                         if (newSeg != null) {
                             newSeg = getCubeManager().mergeSegments(cube, newSeg.getDateRangeStart(), newSeg.getDateRangeEnd(), true);
                             logger.debug("Will submit merge job on " + newSeg);
-                            CubingJobBuilder builder = new CubingJobBuilder(new JobEngineConfig(getConfig()));
-                            builder.setSubmitter("SYSTEM");
-                            CubingJob job = builder.mergeJob(newSeg);
+                            DefaultChainedExecutable job = BuildEngineFactory.createBatchMergeJob(newSeg, "SYSTEM");
                             getExecutableManager().addJob(job);
                         } else {
                             logger.debug("Not ready for merge on cube " + cubeName);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/87c32b0f/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 08c17ef..c799cb7 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -18,27 +18,28 @@
 
 package org.apache.kylin.rest.service;
 
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.kylin.common.util.Pair;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.kylin.cube.CubeBuilder;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.engine.BuildEngineFactory;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.common.HadoopShellExecutable;
 import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.cube.CubingJob;
-import org.apache.kylin.job.cube.CubingJobBuilder;
-import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.Output;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
@@ -49,8 +50,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.stereotype.Component;
 
-import java.io.IOException;
-import java.util.*;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * @author ysong1
@@ -126,31 +130,19 @@ public class JobService extends BasicService {
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN + " or hasPermission(#cube, 'ADMINISTRATION') or hasPermission(#cube, 'OPERATION') or hasPermission(#cube, 'MANAGEMENT')")
     public JobInstance submitJob(CubeInstance cube, long startDate, long endDate, CubeBuildTypeEnum buildType, boolean forceMergeEmptySeg, String submitter) throws IOException, JobException {
 
-        final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.allOf(ExecutableState.class));
-        for (CubingJob job : cubingJobs) {
-            if (job.getStatus() == ExecutableState.READY || job.getStatus() == ExecutableState.RUNNING || job.getStatus() == ExecutableState.ERROR) {
-                throw new JobException("The cube " + cube.getName() + " has running job(" + job.getId() + ") please discard it and try again.");
-            }
-        }
+        checkNoRunningJob(cube);
 
-        CubingJob job;
-        CubingJobBuilder builder = new CubingJobBuilder(new JobEngineConfig(getConfig()));
-        builder.setSubmitter(submitter);
+        DefaultChainedExecutable job;
 
         if (buildType == CubeBuildTypeEnum.BUILD) {
-            if (cube.getDescriptor().hasHolisticCountDistinctMeasures() && cube.getSegments().size() > 0) {
-                Pair<CubeSegment, CubeSegment> segs = getCubeManager().appendAndMergeSegments(cube, endDate);
-                job = builder.buildAndMergeJob(segs.getFirst(), segs.getSecond());
-            } else {
-                CubeSegment newSeg = getCubeManager().appendSegments(cube, endDate);
-                job = builder.buildJob(newSeg);
-            }
+            CubeSegment newSeg = getCubeManager().appendSegments(cube, endDate);
+            job = BuildEngineFactory.createBatchBuildJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
             CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, forceMergeEmptySeg);
-            job = builder.mergeJob(newSeg);
+            job = BuildEngineFactory.createBatchMergeJob(newSeg, submitter);
         } else if (buildType == CubeBuildTypeEnum.REFRESH) {
             CubeSegment refreshSeg = getCubeManager().refreshSegment(cube, startDate, endDate);
-            job = builder.buildJob(refreshSeg);
+            job = BuildEngineFactory.createBatchBuildJob(refreshSeg, submitter);
         } else {
             throw new JobException("invalid build type:" + buildType);
         }
@@ -163,6 +155,15 @@ public class JobService extends BasicService {
         return jobInstance;
     }
 
+    private void checkNoRunningJob(CubeInstance cube) throws JobException {
+        final List<CubingJob> cubingJobs = listAllCubingJobs(cube.getName(), null, EnumSet.allOf(ExecutableState.class));
+        for (CubingJob job : cubingJobs) {
+            if (job.getStatus() == ExecutableState.READY || job.getStatus() == ExecutableState.RUNNING || job.getStatus() == ExecutableState.ERROR) {
+                throw new JobException("The cube " + cube.getName() + " has a running job (" + job.getId() + "); please discard it and try again.");
+            }
+        }
+    }
+
     public JobInstance getJobInstance(String uuid) throws IOException, JobException {
         return getSingleJobInstance(getExecutableManager().getJob(uuid));
     }
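
With the holistic count-distinct build+merge path gone, submitJob() now maps each CubeBuildTypeEnum straight onto a factory call; BUILD and REFRESH both produce a batch build job. Condensed from the diff above:

    // Condensed dispatch, names from the diff above
    DefaultChainedExecutable job;
    if (buildType == CubeBuildTypeEnum.BUILD) {
        job = BuildEngineFactory.createBatchBuildJob(getCubeManager().appendSegments(cube, endDate), submitter);
    } else if (buildType == CubeBuildTypeEnum.MERGE) {
        job = BuildEngineFactory.createBatchMergeJob(getCubeManager().mergeSegments(cube, startDate, endDate, forceMergeEmptySeg), submitter);
    } else if (buildType == CubeBuildTypeEnum.REFRESH) {
        job = BuildEngineFactory.createBatchBuildJob(getCubeManager().refreshSegment(cube, startDate, endDate), submitter);
    } else {
        throw new JobException("invalid build type:" + buildType);
    }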

