kylin-commits mailing list archives

From liy...@apache.org
Subject [13/23] incubator-kylin git commit: KYLIN-875 half way
Date Thu, 23 Jul 2015 11:21:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
deleted file mode 100644
index ff6ffe5..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.inmemcubing.DoggedCubeBuilder;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Maps;
-
-/**
- */
-public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArrayWritable, ByteArrayWritable> {
-
-    private static final Log logger = LogFactory.getLog(InMemCuboidMapper.class);
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment cubeSegment;
-    private IMRTableInputFormat flatTableInputFormat;
-
-    private int counter;
-    private BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
-    private Future<?> future;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        Configuration conf = context.getConfiguration();
-
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
-        cube = CubeManager.getInstance(config).getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
-
-        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
-
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            // dictionary
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (cubeDesc.getRowkey().isUseDictionary(col)) {
-                    Dictionary<?> dict = cubeSegment.getDictionary(col);
-                    if (dict == null) {
-                        logger.warn("Dictionary for " + col + " was not found.");
-                    }
-
-                    dictionaryMap.put(col, dict);
-                }
-            }
-        }
-
-        DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
-        ExecutorService executorService = Executors.newSingleThreadExecutor();
-        future = executorService.submit(cubeBuilder.buildAsRunnable(queue, new MapContextGTRecordWriter(context, cubeDesc, cubeSegment)));
-
-    }
-
-    @Override
-    public void map(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
-        // put each row to the queue
-        String[] row = flatTableInputFormat.parseMapperInput(record);
-        List<String> rowAsList = Arrays.asList(row);
-        
-        while (!future.isDone()) {
-            if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
-                counter++;
-                if (counter % BatchConstants.COUNTER_MAX == 0) {
-                    logger.info("Handled " + counter + " records!");
-                }
-                break;
-            }
-        }
-    }
-
-    @Override
-    protected void cleanup(Context context) throws IOException, InterruptedException {
-        logger.info("Handled " + counter + " records in total!");
-
-        while (!future.isDone()) {
-            if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS)) {
-                break;
-            }
-        }
-        
-        try {
-            future.get();
-        } catch (Exception e) {
-            throw new IOException("Failed to build cube in mapper " + context.getTaskAttemptID().getTaskID().getId(), e);
-        }
-        queue.clear();
-    }
-
-}
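
A note on the pattern in the deleted mapper above: it drives DoggedCubeBuilder as a producer/consumer pair. Rows go into a bounded BlockingQueue, the builder runs on a single-thread executor, every offer uses a one-second timeout with a re-check of the builder's Future so a failed build cannot leave the mapper blocked forever, and an empty list is the end-of-input marker. A minimal, self-contained sketch of that handshake (the consumer below is a stand-in, not the Kylin builder):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class QueueHandshakeSketch {
        public static void main(String[] args) throws Exception {
            final BlockingQueue<List<String>> queue = new ArrayBlockingQueue<List<String>>(10000);
            ExecutorService executor = Executors.newSingleThreadExecutor();

            // Consumer: drain rows until the empty-list end marker arrives.
            Future<?> future = executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        List<String> row;
                        while (!(row = queue.take()).isEmpty()) {
                            // feed the row into the cube builder ...
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });

            // map(): offer with a timeout, re-checking the consumer every second,
            // so a dead builder cannot leave the producer blocked forever.
            List<String> row = Arrays.asList("dim1", "dim2");
            while (!future.isDone()) {
                if (queue.offer(row, 1, TimeUnit.SECONDS)) {
                    break;
                }
            }

            // cleanup(): signal end of input, then surface any builder failure.
            while (!future.isDone()) {
                if (queue.offer(new ArrayList<String>(0), 1, TimeUnit.SECONDS)) {
                    break;
                }
            }
            future.get();
            executor.shutdown();
        }
    }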

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
deleted file mode 100644
index 45da2c8..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.KylinReducer;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.measure.MeasureAggregators;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class InMemCuboidReducer extends KylinReducer<ByteArrayWritable, ByteArrayWritable, Object, Object> {
-
-    private static final Logger logger = LoggerFactory.getLogger(InMemCuboidReducer.class);
-
-    private IMRStorageOutputFormat storageOutputFormat;
-    private MeasureCodec codec;
-    private MeasureAggregators aggs;
-
-    private int counter;
-    private Object[] input;
-    private Object[] result;
-
-    @Override
-    protected void setup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-        
-        String cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME);
-        boolean isMerge = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_IS_MERGE));
-
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        CubeDesc cubeDesc = cube.getDescriptor();
-        CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        if (isMerge)
-            storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
-        else
-            storageOutputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getStorageOutputFormat();
-
-        List<MeasureDesc> measuresDescs = Lists.newArrayList();
-        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                for (MeasureDesc measure : colDesc.getMeasures()) {
-                    measuresDescs.add(measure);
-                }
-            }
-        }
-
-        codec = new MeasureCodec(measuresDescs);
-        aggs = new MeasureAggregators(measuresDescs);
-
-        input = new Object[measuresDescs.size()];
-        result = new Object[measuresDescs.size()];
-    }
-
-    @Override
-    public void reduce(ByteArrayWritable key, Iterable<ByteArrayWritable> values, Context context) throws IOException, InterruptedException {
-
-        aggs.reset();
-
-        for (ByteArrayWritable value : values) {
-            codec.decode(value.asBuffer(), input);
-            aggs.aggregate(input);
-        }
-        aggs.collectStates(result);
-        
-        storageOutputFormat.doReducerOutput(key, result, context);
-        
-        counter++;
-        if (counter % BatchConstants.COUNTER_MAX == 0) {
-            logger.info("Handled " + counter + " records!");
-        }
-    }
-
-
-}
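
The reducer body above is a reset/aggregate/collect cycle over every serialized value of one key. A toy version of that cycle, with a per-measure SUM as a stand-in for Kylin's MeasureAggregators (the MeasureCodec decode step is elided):

    import java.util.Arrays;

    public class AggregateSketch {
        // Toy stand-in for MeasureAggregators: one running SUM per measure.
        static class SumAggregators {
            private final long[] states;
            SumAggregators(int measureCount) { states = new long[measureCount]; }
            void reset() { Arrays.fill(states, 0L); }
            void aggregate(long[] input) {
                for (int i = 0; i < states.length; i++) states[i] += input[i];
            }
            void collectStates(long[] result) {
                System.arraycopy(states, 0, result, 0, states.length);
            }
        }

        public static void main(String[] args) {
            // Three already-decoded values for one reducer key, two measures each.
            long[][] valuesForOneKey = { { 1, 10 }, { 2, 20 }, { 3, 30 } };
            SumAggregators aggs = new SumAggregators(2);
            long[] result = new long[2];

            aggs.reset();                  // once per reduce() call
            for (long[] input : valuesForOneKey) {
                aggs.aggregate(input);     // fold each value in
            }
            aggs.collectStates(result);    // result == {6, 60}
            System.out.println(Arrays.toString(result));
        }
    }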

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
deleted file mode 100644
index f2a5fcf..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ /dev/null
@@ -1,96 +0,0 @@
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapreduce.MapContext;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.job.inmemcubing.ICuboidWriter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.gridtable.GTRecord;
-
-/**
- */
-public class MapContextGTRecordWriter implements ICuboidWriter {
-
-    private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
-    protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
-    private Long lastCuboidId;
-    protected CubeSegment cubeSegment;
-    protected CubeDesc cubeDesc;
-
-    private int bytesLength;
-    private int dimensions;
-    private int measureCount;
-    private byte[] keyBuf;
-    private int[] measureColumnsIndex;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private ByteArrayWritable outputKey = new ByteArrayWritable();
-    private ByteArrayWritable outputValue = new ByteArrayWritable();
-    private long cuboidRowCount = 0;
-
-    public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
-        this.mapContext = mapContext;
-        this.cubeDesc = cubeDesc;
-        this.cubeSegment = cubeSegment;
-        this.measureCount = cubeDesc.getMeasures().size();
-
-    }
-
-    @Override
-    public void write(long cuboidId, GTRecord record) throws IOException {
-
-        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
-            // output another cuboid
-            initVariables(cuboidId);
-            if (lastCuboidId != null) {
-                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
-                cuboidRowCount = 0;
-            }
-        }
-
-        cuboidRowCount++;
-        int offset = RowConstants.ROWKEY_CUBOIDID_LEN;
-        for (int x = 0; x < dimensions; x++) {
-            System.arraycopy(record.get(x).array(), record.get(x).offset(), keyBuf, offset, record.get(x).length());
-            offset += record.get(x).length();
-        }
-
-        //output measures
-        valueBuf.clear();
-        record.exportColumns(measureColumnsIndex, valueBuf);
-
-        outputKey.set(keyBuf, 0, offset);
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
-        try {
-            mapContext.write(outputKey, outputValue);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void initVariables(Long cuboidId) {
-        bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
-        for (TblColRef column : cuboid.getColumns()) {
-            bytesLength += cubeSegment.getColumnLength(column);
-        }
-
-        keyBuf = new byte[bytesLength];
-        dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
-        measureColumnsIndex = new int[measureCount];
-        for (int i = 0; i < measureCount; i++) {
-            measureColumnsIndex[i] = dimensions + i;
-        }
-
-        System.arraycopy(Bytes.toBytes(cuboidId), 0, keyBuf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
-    }
-}
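
initVariables() above counts a cuboid's dimensions by popcount on the cuboid id: each set bit marks one dimension present in the cuboid. A quick sketch of that computation, plus the equivalent Long.bitCount form:

    import java.util.BitSet;

    public class CuboidBitsSketch {
        public static void main(String[] args) {
            // Each set bit in the cuboid id marks one dimension present.
            long cuboidId = 0b101101L;

            int viaBitSet = BitSet.valueOf(new long[] { cuboidId }).cardinality();
            int viaBitCount = Long.bitCount(cuboidId);

            System.out.println(viaBitSet + " == " + viaBitCount); // prints "4 == 4"
        }
    }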

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
deleted file mode 100644
index 3d3b1f4..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageJob.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageOutputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.cube.CuboidJob;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-
-/**
- * @author shaoshi
- */
-public class MergeCuboidFromStorageJob extends CuboidJob {
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        try {
-            options.addOption(OPTION_JOB_NAME);
-            options.addOption(OPTION_JOB_FLOW_ID);
-            options.addOption(OPTION_CUBE_NAME);
-            options.addOption(OPTION_SEGMENT_NAME);
-            parseOptions(options, args);
-
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
-            String segmentName = getOptionValue(OPTION_SEGMENT_NAME).toUpperCase();
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            CubeManager cubeMgr = CubeManager.getInstance(config);
-            CubeInstance cube = cubeMgr.getCube(cubeName);
-            CubeSegment cubeSeg = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-
-            Configuration conf = this.getConf();
-            HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
-
-            // start job
-            String jobName = getOptionValue(OPTION_JOB_NAME);
-            System.out.println("Starting: " + jobName);
-            job = Job.getInstance(conf, jobName);
-
-            setJobClasspath(job);
-            
-            // add metadata to distributed cache
-            attachKylinPropsAndMetadata(cube, job.getConfiguration());
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-            job.getConfiguration().set(BatchConstants.CFG_IS_MERGE, "true");
-
-            // configure mapper input
-            IMRStorageInputFormat storageInputFormat = MRUtil.getBatchMergeInputSide2(cubeSeg).getStorageInputFormat();
-            storageInputFormat.configureInput(MergeCuboidFromStorageMapper.class, ByteArrayWritable.class, ByteArrayWritable.class, job);
-
-            // configure reducer output
-            IMRStorageOutputFormat storageOutputFormat = MRUtil.getBatchMergeOutputSide2(cubeSeg).getStorageOutputFormat();
-            storageOutputFormat.configureOutput(InMemCuboidReducer.class, getOptionValue(OPTION_JOB_FLOW_ID), job);
-            
-            return waitForCompletion(job);
-        } catch (Exception e) {
-            logger.error("error in MergeCuboidFromStorageJob", e);
-            printUsage(options);
-            throw e;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
deleted file mode 100644
index 1162f14..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.SplittedBytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.common.RowKeySplitter;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.KylinMapper;
-import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
-import org.apache.kylin.engine.mr.MRUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * @author shaoshi
- */
-public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
-
-    private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
-    
-    private KylinConfig config;
-    private String cubeName;
-    private String segmentName;
-    private CubeManager cubeManager;
-    private CubeInstance cube;
-    private CubeDesc cubeDesc;
-    private CubeSegment mergedCubeSegment;
-    private CubeSegment sourceCubeSegment; // Must be unique during a mapper's life cycle
-    private IMRStorageInputFormat storageInputFormat;
-
-    private ByteArrayWritable outputKey = new ByteArrayWritable();
-    private byte[] newKeyBuf;
-    private RowKeySplitter rowKeySplitter;
-
-    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
-
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private MeasureCodec codec;
-    private ByteArrayWritable outputValue = new ByteArrayWritable();
-
-    private Boolean checkNeedMerging(TblColRef col) throws IOException {
-        Boolean ret = dictsNeedMerging.get(col);
-        if (ret != null)
-            return ret;
-        else {
-            ret = cubeDesc.getRowkey().isUseDictionary(col);
-            if (ret) {
-                String dictTable = (String) DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
-            }
-            dictsNeedMerging.put(col, ret);
-            return ret;
-        }
-    }
-
-    @Override
-    protected void setup(Context context) throws IOException, InterruptedException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-        config = AbstractHadoopJob.loadKylinPropsAndMetadata();
-
-        cubeName = context.getConfiguration().get(BatchConstants.CFG_CUBE_NAME).toUpperCase();
-        segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
-
-        cubeManager = CubeManager.getInstance(config);
-        cube = cubeManager.getCube(cubeName);
-        cubeDesc = cube.getDescriptor();
-        mergedCubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
-        storageInputFormat = MRUtil.getBatchMergeInputSide2(mergedCubeSegment).getStorageInputFormat();
-
-        newKeyBuf = new byte[256]; // size will auto-grow
-
-        sourceCubeSegment = storageInputFormat.findSourceSegment(context, cube);
-        logger.info(sourceCubeSegment.toString());
-
-        this.rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
-
-        List<MeasureDesc> measuresDescs = Lists.newArrayList();
-        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                for (MeasureDesc measure : colDesc.getMeasures()) {
-                    measuresDescs.add(measure);
-                }
-            }
-        }
-        codec = new MeasureCodec(measuresDescs);
-    }
-
-    @Override
-    public void map(Object inKey, Object inValue, Context context) throws IOException, InterruptedException {
-        Pair<ByteArrayWritable, Object[]> pair = storageInputFormat.parseMapperInput(inKey, inValue);
-        ByteArrayWritable key = pair.getFirst();
-        Object[] value = pair.getSecond();
-        
-        Preconditions.checkState(key.offset() == 0);
-        
-        long cuboidID = rowKeySplitter.split(key.array(), key.length());
-        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidID);
-
-        SplittedBytes[] splittedByteses = rowKeySplitter.getSplitBuffers();
-        int bufOffset = 0;
-        BytesUtil.writeLong(cuboidID, newKeyBuf, bufOffset, RowConstants.ROWKEY_CUBOIDID_LEN);
-        bufOffset += RowConstants.ROWKEY_CUBOIDID_LEN;
-
-        for (int i = 0; i < cuboid.getColumns().size(); ++i) {
-            TblColRef col = cuboid.getColumns().get(i);
-
-            if (this.checkNeedMerging(col)) {
-                // if dictionary on fact table column, needs rewrite
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
-                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
-
-                while (sourceDict.getSizeOfValue() > newKeyBuf.length - bufOffset || mergedDict.getSizeOfValue() > newKeyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
-                }
-
-                int idInSourceDict = BytesUtil.readUnsigned(splittedByteses[i + 1].value, 0, splittedByteses[i + 1].length);
-
-                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBuf, bufOffset);
-                int idInMergedDict;
-                if (size < 0) {
-                    idInMergedDict = mergedDict.nullId();
-                } else {
-                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBuf, bufOffset, size);
-                }
-                BytesUtil.writeUnsigned(idInMergedDict, newKeyBuf, bufOffset, mergedDict.getSizeOfId());
-
-                bufOffset += mergedDict.getSizeOfId();
-            } else {
-                // keep as it is
-                while (splittedByteses[i + 1].length > newKeyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBuf;
-                    newKeyBuf = new byte[2 * newKeyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBuf, 0, oldBuf.length);
-                }
-
-                System.arraycopy(splittedByteses[i + 1].value, 0, newKeyBuf, bufOffset, splittedByteses[i + 1].length);
-                bufOffset += splittedByteses[i + 1].length;
-            }
-        }
-        byte[] newKey = Arrays.copyOf(newKeyBuf, bufOffset);
-        outputKey.set(newKey, 0, newKey.length);
-
-        valueBuf.clear();
-        codec.encode(value, valueBuf);
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
-        
-        context.write(outputKey, outputValue);
-    }
-
-}
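
Both branches of map() above grow newKeyBuf by doubling while preserving what was already written (the "size will auto-grow" comment). The same pattern extracted into a standalone helper (ensureRoom is a hypothetical name, not a Kylin API):

    import java.util.Arrays;

    public class GrowBufferSketch {
        // Double the buffer until at least "needed" bytes remain past "offset",
        // keeping everything already written, as the merge mapper does inline.
        static byte[] ensureRoom(byte[] buf, int offset, int needed) {
            while (needed > buf.length - offset) {
                buf = Arrays.copyOf(buf, buf.length * 2);
            }
            return buf;
        }

        public static void main(String[] args) {
            byte[] keyBuf = new byte[256]; // size will auto-grow
            keyBuf = ensureRoom(keyBuf, 250, 20);
            System.out.println(keyBuf.length); // prints 512
        }
    }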

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
deleted file mode 100644
index d99cb03..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.DictionaryInfo;
-import org.apache.kylin.dict.DictionaryManager;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import java.io.IOException;
-import java.util.*;
-
-public class MergeDictionaryStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-
-    public MergeDictionaryStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        KylinConfig conf = context.getConfig();
-        final CubeManager mgr = CubeManager.getInstance(conf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
-        final List<CubeSegment> mergingSegments = getMergingSegments(cube);
-        
-        Collections.sort(mergingSegments);
-        
-        try {
-            checkLookupSnapshotsMustIncremental(mergingSegments);
-            
-            makeDictForNewSegment(conf, cube, newSegment, mergingSegments);
-            makeSnapshotForNewSegment(cube, newSegment, mergingSegments);
-
-            CubeUpdate cubeBuilder = new CubeUpdate(cube);
-            cubeBuilder.setToUpdateSegs(newSegment);
-            mgr.updateCube(cubeBuilder);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to merge dictionary or lookup snapshots", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-    
-    private List<CubeSegment> getMergingSegments(CubeInstance cube) {
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        List<CubeSegment> result = Lists.newArrayListWithCapacity(mergingSegmentIds.size());
-        for (String id : mergingSegmentIds) {
-            result.add(cube.getSegmentById(id));
-        }
-        return result;
-    }
-
-    private void checkLookupSnapshotsMustIncremental(List<CubeSegment> mergingSegments) {
-
-        // FIXME check each newer snapshot has only NEW rows but no MODIFIED rows
-    }
-
-    /**
-     * For the new segment, we need to create dictionaries for it, too. For
-     * dictionaries on the fact table, create them by merging the underlying
-     * dictionaries. For dictionaries on lookup tables, just copy them from
-     * any one of the merging segments; consistency is guaranteed (checked
-     * in CubeSegmentValidator).
-     *
-     * @param cube
-     * @param newSeg
-     * @throws IOException
-     */
-    private void makeDictForNewSegment(KylinConfig conf, CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) throws IOException {
-        HashSet<TblColRef> colsNeedMergingDict = new HashSet<TblColRef>();
-        HashSet<TblColRef> colsNeedCopyDict = new HashSet<TblColRef>();
-        DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
-
-        CubeDesc cubeDesc = cube.getDescriptor();
-        for (DimensionDesc dim : cubeDesc.getDimensions()) {
-            for (TblColRef col : dim.getColumnRefs()) {
-                if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
-                    String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
-                    if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
-                        colsNeedMergingDict.add(col);
-                    } else {
-                        colsNeedCopyDict.add(col);
-                    }
-                }
-            }
-        }
-
-        for (TblColRef col : colsNeedMergingDict) {
-            logger.info("Merging fact table dictionary on : " + col);
-            List<DictionaryInfo> dictInfos = new ArrayList<DictionaryInfo>();
-            for (CubeSegment segment : mergingSegments) {
-                logger.info("Including fact table dictionary of segment : " + segment);
-                if (segment.getDictResPath(col) != null) {
-                    DictionaryInfo dictInfo = dictMgr.getDictionaryInfo(segment.getDictResPath(col));
-                    dictInfos.add(dictInfo);
-                }
-            }
-            mergeDictionaries(dictMgr, newSeg, dictInfos, col);
-        }
-
-        for (TblColRef col : colsNeedCopyDict) {
-            String path = mergingSegments.get(0).getDictResPath(col);
-            newSeg.putDictResPath(col, path);
-        }
-    }
-
-    private DictionaryInfo mergeDictionaries(DictionaryManager dictMgr, CubeSegment cubeSeg, List<DictionaryInfo> dicts, TblColRef col) throws IOException {
-        DictionaryInfo dictInfo = dictMgr.mergeDictionary(dicts);
-        if (dictInfo != null)
-            cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
-
-        return dictInfo;
-    }
-
-    /**
-     * Make snapshots for the new segment by copying from one of the underlying
-     * merging segments. Consistency is guaranteed (checked in
-     * CubeSegmentValidator).
-     *
-     * @param cube
-     * @param newSeg
-     */
-    private void makeSnapshotForNewSegment(CubeInstance cube, CubeSegment newSeg, List<CubeSegment> mergingSegments) {
-        CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 1);
-        for (Map.Entry<String, String> entry : lastSeg.getSnapshots().entrySet()) {
-            newSeg.putSnapshotResPath(entry.getKey(), entry.getValue());
-        }
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id: splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-}
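
The merging segment ids above travel through the executable's parameter map as one comma-joined string and are split back on read. A minimal round trip using the same commons-lang calls:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.commons.lang.StringUtils;

    public class SegmentIdsParamSketch {
        public static void main(String[] args) {
            List<String> ids = Arrays.asList("seg-1", "seg-2", "seg-3");

            // setMergingSegmentIds(): persist as a single string parameter.
            String param = StringUtils.join(ids, ",");

            // getMergingSegmentIds(): split back into individual ids.
            String[] split = StringUtils.split(param, ",");
            System.out.println(Arrays.asList(split)); // [seg-1, seg-2, seg-3]
        }
    }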

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
deleted file mode 100644
index 8bd6ea2..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-public class MergeStatisticsStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-    private static final String MERGED_STATISTICS_PATH = "mergedStatisticsPath";
-    protected Map<Long, HyperLogLogPlusCounter> cuboidHLLMap = Maps.newHashMap();
-
-    public MergeStatisticsStep() {
-        super();
-    }
-
-    @Override
-    @SuppressWarnings("deprecation")
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        KylinConfig kylinConf = context.getConfig();
-        final CubeManager mgr = CubeManager.getInstance(kylinConf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
-
-        Configuration conf = new Configuration();
-        ResourceStore rs = ResourceStore.getStore(kylinConf);
-        try {
-
-            int averageSamplingPercentage = 0;
-            for (String segmentId : this.getMergingSegmentIds()) {
-                String fileKey = CubeSegment.getStatisticsResourcePath(getCubeName(), segmentId);
-                InputStream is = rs.getResource(fileKey);
-                File tempFile = null;
-                FileOutputStream tempFileStream = null;
-                try {
-                    tempFile = File.createTempFile(segmentId, ".seq");
-                    tempFileStream = new FileOutputStream(tempFile);
-                    org.apache.commons.io.IOUtils.copy(is, tempFileStream);
-                } finally {
-                    IOUtils.closeStream(is);
-                    IOUtils.closeStream(tempFileStream);
-                }
-
-                FileSystem fs = HadoopUtil.getFileSystem("file:///" + tempFile.getAbsolutePath());
-                SequenceFile.Reader reader = null;
-                try {
-                    reader = new SequenceFile.Reader(fs, new Path(tempFile.getAbsolutePath()), conf);
-                    LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-                    BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-                    while (reader.next(key, value)) {
-                        if (key.get() == 0L) {
-                            // sampling percentage
-                            averageSamplingPercentage += Bytes.toInt(value.getBytes());
-                        } else {
-                            HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter(14);
-                            ByteArray byteArray = new ByteArray(value.getBytes());
-                            hll.readRegisters(byteArray.asBuffer());
-
-                            if (cuboidHLLMap.get(key.get()) != null) {
-                                cuboidHLLMap.get(key.get()).merge(hll);
-                            } else {
-                                cuboidHLLMap.put(key.get(), hll);
-                            }
-                        }
-                    }
-                } catch (Exception e) {
-                    logger.error("fail to read statistics sequence file", e);
-                    throw e;
-                } finally {
-                    IOUtils.closeStream(reader);
-                }
-            }
-            averageSamplingPercentage = averageSamplingPercentage / this.getMergingSegmentIds().size();
-            FactDistinctColumnsReducer.writeCuboidStatistics(conf, new Path(getMergedStatisticsPath()), cuboidHLLMap, averageSamplingPercentage);
-            Path statisticsFilePath = new Path(getMergedStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
-            FileSystem fs = statisticsFilePath.getFileSystem(conf);
-            FSDataInputStream is = fs.open(statisticsFilePath);
-            try {
-                // put the statistics to metadata store
-                String statisticsFileName = newSegment.getStatisticsResourcePath();
-                rs.putResource(statisticsFileName, is, System.currentTimeMillis());
-            } finally {
-                IOUtils.closeStream(is);
-            }
-
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to merge cuboid statistics", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids != null) {
-            final String[] splitted = StringUtils.split(ids, ",");
-            ArrayList<String> result = Lists.newArrayListWithExpectedSize(splitted.length);
-            for (String id : splitted) {
-                result.add(id);
-            }
-            return result;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    public void setMergedStatisticsPath(String path) {
-        setParam(MERGED_STATISTICS_PATH, path);
-    }
-
-    private String getMergedStatisticsPath() {
-        return getParam(MERGED_STATISTICS_PATH);
-    }
-}
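
The statistics merge above accumulates one HyperLogLogPlusCounter per cuboid with a get-then-merge-or-put; on Java 8+ the same step collapses to Map.merge. A sketch with a toy mergeable counter standing in for the HLL class:

    import java.util.HashMap;
    import java.util.Map;

    public class HllMergeSketch {
        // Toy mergeable counter standing in for HyperLogLogPlusCounter;
        // merge() folds another counter into this one and returns this.
        static class Counter {
            long state;
            Counter(long state) { this.state = state; }
            Counter merge(Counter other) { state = Math.max(state, other.state); return this; }
        }

        public static void main(String[] args) {
            Map<Long, Counter> cuboidMap = new HashMap<Long, Counter>();

            long cuboidId = 7L;
            Counter incoming = new Counter(42);

            // Same effect as: if present, merge into the existing counter; else put.
            cuboidMap.merge(cuboidId, incoming, Counter::merge);

            System.out.println(cuboidMap.get(cuboidId).state); // prints 42
        }
    }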

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
deleted file mode 100644
index 14eef1a..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/SaveStatisticsStep.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- * Save the cube segment statistic to Kylin metadata store
- *
- */
-public class SaveStatisticsStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String STATISTICS_PATH = "statisticsPath";
-
-    public SaveStatisticsStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        KylinConfig kylinConf = context.getConfig();
-        final CubeManager mgr = CubeManager.getInstance(kylinConf);
-        final CubeInstance cube = mgr.getCube(getCubeName());
-        final CubeSegment newSegment = cube.getSegmentById(getSegmentId());
-
-        ResourceStore rs = ResourceStore.getStore(kylinConf);
-        try {
-            Path statisticsFilePath = new Path(getStatisticsPath(), BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION);
-            FileSystem fs = FileSystem.get(HadoopUtil.getCurrentConfiguration());
-            if (!fs.exists(statisticsFilePath))
-                throw new IOException("File " + statisticsFilePath + " does not exist");
-
-            FSDataInputStream is = fs.open(statisticsFilePath);
-            try {
-                // put the statistics to metadata store
-                String statisticsFileName = newSegment.getStatisticsResourcePath();
-                rs.putResource(statisticsFileName, is, System.currentTimeMillis());
-                fs.delete(statisticsFilePath, false);
-            } finally {
-                IOUtils.closeStream(is);
-            }
-
-
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to save cuboid statistics", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setStatisticsPath(String path) {
-        this.setParam(STATISTICS_PATH, path);
-    }
-
-    private String getStatisticsPath() {
-        return getParam(STATISTICS_PATH);
-    }
-
-}
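
SaveStatisticsStep above closes the stream via an explicit finally/IOUtils.closeStream pair. FSDataInputStream is Closeable, so the same flow could use try-with-resources; a compilable sketch under that assumption (the ResourceStore write is elided since it needs a live metadata store):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class PutStatisticsSketch {
        // Same flow as SaveStatisticsStep.doWork, with try-with-resources
        // replacing the explicit finally/IOUtils.closeStream pair.
        static void putStatistics(FileSystem fs, Path statisticsFilePath) throws IOException {
            if (!fs.exists(statisticsFilePath)) {
                throw new IOException("File " + statisticsFilePath + " does not exist");
            }
            try (FSDataInputStream is = fs.open(statisticsFilePath)) {
                // rs.putResource(newSegment.getStatisticsResourcePath(), is,
                //         System.currentTimeMillis());
            }
            fs.delete(statisticsFilePath, false);
        }
    }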

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
deleted file mode 100644
index dd99a64..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.kylin.cube.CubeUpdate;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- */
-public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable {
-
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String CUBE_NAME = "cubeName";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
-    public UpdateCubeInfoAfterBuildStep() {
-        super();
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeManager cubeManager = CubeManager.getInstance(context.getConfig());
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-        final CubeSegment segment = cube.getSegmentById(getSegmentId());
-        
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
-        long sourceCount = cubingJob.findSourceRecordCount();
-        long sourceSizeBytes = cubingJob.findSourceSizeBytes();
-        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
-        boolean segmentReady = cubeSizeBytes > 0; // in the build+merge scenario the convert-to-HFile step has not run yet, so the cube size is still 0
-
-        segment.setLastBuildJobID(getCubingJobId());
-        segment.setLastBuildTime(System.currentTimeMillis());
-        segment.setSizeKB(cubeSizeBytes / 1024);
-        segment.setInputRecords(sourceCount);
-        segment.setInputRecordsSize(sourceSizeBytes);
-
-        try {
-            if (segmentReady) {
-                cubeManager.promoteNewlyBuiltSegments(cube, segment);
-            } else {
-                CubeUpdate cubeBuilder = new CubeUpdate(cube);
-                cubeBuilder.setToUpdateSegs(segment);
-                cubeManager.updateCube(cubeBuilder);
-            }
-            return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
-        } catch (IOException e) {
-            logger.error("fail to update cube after build", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java b/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
deleted file mode 100644
index d237908..0000000
--- a/job/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterMergeStep.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class UpdateCubeInfoAfterMergeStep extends AbstractExecutable {
-
-    private static final String CUBE_NAME = "cubeName";
-    private static final String SEGMENT_ID = "segmentId";
-    private static final String MERGING_SEGMENT_IDS = "mergingSegmentIds";
-    private static final String CUBING_JOB_ID = "cubingJobId";
-
-    private final CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
-
-    public UpdateCubeInfoAfterMergeStep() {
-        super();
-    }
-
-    @Override
-    protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-        final CubeInstance cube = cubeManager.getCube(getCubeName());
-
-        CubeSegment mergedSegment = cube.getSegmentById(getSegmentId());
-        if (mergedSegment == null) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there is no segment with id:" + getSegmentId());
-        }
-        
-        CubingJob cubingJob = (CubingJob) executableManager.getJob(getCubingJobId());
-        long cubeSizeBytes = cubingJob.findCubeSizeBytes();
-
-        // collect source statistics
-        List<String> mergingSegmentIds = getMergingSegmentIds();
-        if (mergingSegmentIds.isEmpty()) {
-            return new ExecuteResult(ExecuteResult.State.FAILED, "there are no merging segments");
-        }
-        long sourceCount = 0L;
-        long sourceSize = 0L;
-        for (String id : mergingSegmentIds) {
-            CubeSegment segment = cube.getSegmentById(id);
-            sourceCount += segment.getInputRecords();
-            sourceSize += segment.getInputRecordsSize();
-        }
-
-        // update segment info
-        mergedSegment.setSizeKB(cubeSizeBytes / 1024);
-        mergedSegment.setInputRecords(sourceCount);
-        mergedSegment.setInputRecordsSize(sourceSize);
-        mergedSegment.setLastBuildJobID(getCubingJobId());
-        mergedSegment.setLastBuildTime(System.currentTimeMillis());
-
-        try {
-            cubeManager.promoteNewlyBuiltSegments(cube, mergedSegment);
-            return new ExecuteResult(ExecuteResult.State.SUCCEED);
-        } catch (IOException e) {
-            logger.error("fail to update cube after merge", e);
-            return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
-        }
-    }
-
-    public void setSegmentId(String segmentId) {
-        this.setParam(SEGMENT_ID, segmentId);
-    }
-
-    private String getSegmentId() {
-        return getParam(SEGMENT_ID);
-    }
-
-    public void setCubeName(String cubeName) {
-        this.setParam(CUBE_NAME, cubeName);
-    }
-
-    private String getCubeName() {
-        return getParam(CUBE_NAME);
-    }
-
-    public void setMergingSegmentIds(List<String> ids) {
-        setParam(MERGING_SEGMENT_IDS, StringUtils.join(ids, ","));
-    }
-
-    private List<String> getMergingSegmentIds() {
-        final String ids = getParam(MERGING_SEGMENT_IDS);
-        if (ids == null) {
-            return Collections.emptyList();
-        }
-        return Lists.newArrayList(StringUtils.split(ids, ","));
-    }
-
-    public void setCubingJobId(String id) {
-        setParam(CUBING_JOB_ID, id);
-    }
-
-    private String getCubingJobId() {
-        return getParam(CUBING_JOB_ID);
-    }
-}
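Before promoting the merged segment, the step folds the input statistics of every merging segment into the new one. A compact sketch of that aggregation, using only the accessors shown above:

    // Sketch: sum source statistics over the segments being merged away.
    long sourceCount = 0L, sourceSize = 0L;
    for (String id : getMergingSegmentIds()) {
        CubeSegment seg = cube.getSegmentById(id);
        sourceCount += seg.getInputRecords();     // rows read from the source table
        sourceSize += seg.getInputRecordsSize();  // bytes read from the source table
    }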

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
index b00e10e..f8d6e1a 100644
--- a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
+++ b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
@@ -45,23 +45,20 @@ import org.apache.kylin.cube.model.v1.CubeSegment;
 import org.apache.kylin.cube.model.v1.CubeSegmentStatusEnum;
 import org.apache.kylin.cube.model.v1.CubeStatusEnum;
 import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
 import org.apache.kylin.job.common.ShellExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobStepStatusEnum;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
 import org.apache.kylin.job.manager.ExecutableManager;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
@@ -72,6 +69,9 @@ import org.apache.kylin.metadata.project.ProjectManager;
 import org.apache.kylin.metadata.project.RealizationEntry;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.hbase.steps.BulkLoadJob;
+import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
+import org.apache.kylin.storage.hbase.steps.CubeHFileJob;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
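The import churn in this hunk tracks the KYLIN-875 package split: generic MR build steps (and the HadoopShellExecutable/MapReduceExecutable wrappers) move under org.apache.kylin.engine.mr, while HBase-specific jobs land in org.apache.kylin.storage.hbase.steps. The relocations, summarized as comments:

    // org.apache.kylin.job.hadoop.cube.BaseCuboidJob           -> org.apache.kylin.engine.mr.steps.BaseCuboidJob
    // org.apache.kylin.job.hadoop.dict.CreateDictionaryJob     -> org.apache.kylin.engine.mr.steps.CreateDictionaryJob
    // org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob  -> org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob
    // org.apache.kylin.job.hadoop.cube.MergeCuboidJob          -> org.apache.kylin.engine.mr.steps.MergeCuboidJob
    // org.apache.kylin.job.hadoop.cube.NDCuboidJob             -> org.apache.kylin.engine.mr.steps.NDCuboidJob
    // org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob -> org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob
    // org.apache.kylin.job.hadoop.hbase.BulkLoadJob            -> org.apache.kylin.storage.hbase.steps.BulkLoadJob
    // org.apache.kylin.job.hadoop.hbase.CreateHTableJob        -> org.apache.kylin.storage.hbase.steps.CreateHTableJob
    // org.apache.kylin.job.hadoop.cube.CubeHFileJob            -> org.apache.kylin.storage.hbase.steps.CubeHFileJob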

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JobInstance.java b/job/src/main/java/org/apache/kylin/job/JobInstance.java
deleted file mode 100644
index e7f5772..0000000
--- a/job/src/main/java/org/apache/kylin/job/JobInstance.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonBackReference;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonManagedReference;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.engine.JobEngineConfig;
-
-@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
-
-    public static final String JOB_WORKING_DIR_PREFIX = "kylin-";
-
-    public static final String YARN_APP_ID = "yarn_application_id";
-    public static final String YARN_APP_URL = "yarn_application_tracking_url";
-    public static final String MR_JOB_ID = "mr_job_id";
-    public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
-    public static final String SOURCE_RECORDS_COUNT = "source_records_count";
-    public static final String SOURCE_RECORDS_SIZE = "source_records_size";
-
-    public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) {
-        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID();
-    }
-
-    public static String getJobIdentity(JobInstance jobInstance) {
-        return jobInstance.getRelatedCube() + "." + jobInstance.getUuid();
-    }
-
-    public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) {
-        return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory());
-    }
-
-    public static String getJobWorkingDir(String jobUuid, String hdfsWorkingDir) {
-        if (jobUuid == null || jobUuid.equals("")) {
-            throw new IllegalArgumentException("jobUuid can't be null or empty");
-        }
-        return hdfsWorkingDir + "/" + JOB_WORKING_DIR_PREFIX + jobUuid;
-    }
-
-    @JsonProperty("name")
-    private String name;
-
-    @JsonProperty("type")
-    private CubeBuildTypeEnum type; // java implementation
-    @JsonProperty("duration")
-    private long duration;
-    @JsonProperty("related_cube")
-    private String relatedCube;
-    @JsonProperty("related_segment")
-    private String relatedSegment;
-    @JsonProperty("exec_start_time")
-    private long execStartTime;
-    @JsonProperty("exec_end_time")
-    private long execEndTime;
-    @JsonProperty("mr_waiting")
-    private long mrWaiting = 0;
-    @JsonManagedReference
-    @JsonProperty("steps")
-    private List<JobStep> steps;
-    @JsonProperty("submitter")
-    private String submitter;
-    @JsonProperty("job_status")
-    private JobStatusEnum status;
-
-    public JobStep getRunningStep() {
-        for (JobStep step : this.getSteps()) {
-            if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) {
-                return step;
-            }
-        }
-
-        return null;
-    }
-
-    @JsonProperty("progress")
-    public double getProgress() {
-        int completedStepCount = 0;
-        for (JobStep step : this.getSteps()) {
-            if (step.getStatus().equals(JobStepStatusEnum.FINISHED)) {
-                completedStepCount++;
-            }
-        }
-
-        return 100.0 * completedStepCount / steps.size();
-    }
-
-    public JobStatusEnum getStatus() {
-        return this.status;
-    }
-
-    public void setStatus(JobStatusEnum status) {
-        this.status = status;
-    }
-
-//    @JsonProperty("job_status")
-//    public JobStatusEnum getStatus() {
-//
-//        // JobStatusEnum finalJobStatus;
-//        int compositResult = 0;
-//
-//        // if steps status are all NEW, then job status is NEW
-//        // if steps status are all FINISHED, then job status is FINISHED
-//        // if steps status are all PENDING, then job status is PENDING
-//        // if steps status are FINISHED and PENDING, the job status is PENDING
-//        // if one of steps status is RUNNING, then job status is RUNNING
-//        // if one of steps status is ERROR, then job status is ERROR
-//        // if one of steps status is KILLED, then job status is KILLED
-//        // default status is RUNNING
-//
-//        System.out.println(this.getName());
-//
-//        for (JobStep step : this.getSteps()) {
-//            //System.out.println("step: " + step.getSequenceID() + "'s status:" + step.getStatus());
-//            compositResult = compositResult | step.getStatus().getCode();
-//        }
-//
-//        System.out.println();
-//
-//        if (compositResult == JobStatusEnum.FINISHED.getCode()) {
-//            return JobStatusEnum.FINISHED;
-//        } else if (compositResult == JobStatusEnum.NEW.getCode()) {
-//            return JobStatusEnum.NEW;
-//        } else if (compositResult == JobStatusEnum.PENDING.getCode()) {
-//            return JobStatusEnum.PENDING;
-//        } else if (compositResult == (JobStatusEnum.FINISHED.getCode() | JobStatusEnum.PENDING.getCode())) {
-//            return JobStatusEnum.PENDING;
-//        } else if ((compositResult & JobStatusEnum.ERROR.getCode()) == JobStatusEnum.ERROR.getCode()) {
-//            return JobStatusEnum.ERROR;
-//        } else if ((compositResult & JobStatusEnum.DISCARDED.getCode()) == JobStatusEnum.DISCARDED.getCode()) {
-//            return JobStatusEnum.DISCARDED;
-//        } else if ((compositResult & JobStatusEnum.RUNNING.getCode()) == JobStatusEnum.RUNNING.getCode()) {
-//            return JobStatusEnum.RUNNING;
-//        }
-//
-//        return JobStatusEnum.RUNNING;
-//    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public CubeBuildTypeEnum getType() {
-        return type;
-    }
-
-    public void setType(CubeBuildTypeEnum type) {
-        this.type = type;
-    }
-
-    public long getDuration() {
-        return duration;
-    }
-
-    public void setDuration(long duration) {
-        this.duration = duration;
-    }
-
-    public String getRelatedCube() {
-        return relatedCube;
-    }
-
-    public void setRelatedCube(String relatedCube) {
-        this.relatedCube = relatedCube;
-    }
-
-    public String getRelatedSegment() {
-        return relatedSegment;
-    }
-
-    public void setRelatedSegment(String relatedSegment) {
-        this.relatedSegment = relatedSegment;
-    }
-
-    /**
-     * @return the execStartTime
-     */
-    public long getExecStartTime() {
-        return execStartTime;
-    }
-
-    /**
-     * @param execStartTime the execStartTime to set
-     */
-    public void setExecStartTime(long execStartTime) {
-        this.execStartTime = execStartTime;
-    }
-
-    /**
-     * @return the execEndTime
-     */
-    public long getExecEndTime() {
-        return execEndTime;
-    }
-
-    /**
-     * @param execEndTime the execEndTime to set
-     */
-    public void setExecEndTime(long execEndTime) {
-        this.execEndTime = execEndTime;
-    }
-
-    public long getMrWaiting() {
-        return this.mrWaiting;
-    }
-
-    public void setMrWaiting(long mrWaiting) {
-        this.mrWaiting = mrWaiting;
-    }
-
-    public List<JobStep> getSteps() {
-        if (steps == null) {
-            steps = Lists.newArrayList();
-        }
-        return steps;
-    }
-
-    public void clearSteps() {
-        getSteps().clear();
-    }
-
-    public void addSteps(Collection<JobStep> steps) {
-        this.getSteps().addAll(steps);
-    }
-
-    public void addStep(JobStep step) {
-        getSteps().add(step);
-    }
-
-    public void addStep(int index, JobStep step) {
-        getSteps().add(index, step);
-    }
-
-    public JobStep findStep(String stepName) {
-        for (JobStep step : getSteps()) {
-            if (stepName.equals(step.getName())) {
-                return step;
-            }
-        }
-        return null;
-    }
-
-    public String getSubmitter() {
-        return submitter;
-    }
-
-    public void setSubmitter(String submitter) {
-        this.submitter = submitter;
-    }
-
-    @JsonIgnoreProperties(ignoreUnknown = true)
-    public static class JobStep implements Comparable<JobStep> {
-
-        @JsonBackReference
-        private JobInstance jobInstance;
-        
-        @JsonProperty("id")
-        private String id;
-
-        @JsonProperty("name")
-        private String name;
-
-        @JsonProperty("sequence_id")
-        private int sequenceID;
-
-        @JsonProperty("exec_cmd")
-        private String execCmd;
-
-        @JsonProperty("interrupt_cmd")
-        private String InterruptCmd;
-
-        @JsonProperty("exec_start_time")
-        private long execStartTime;
-        @JsonProperty("exec_end_time")
-        private long execEndTime;
-        @JsonProperty("exec_wait_time")
-        private long execWaitTime;
-
-        @JsonProperty("step_status")
-        private JobStepStatusEnum status;
-
-        @JsonProperty("cmd_type")
-        private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP;
-
-        @JsonProperty("info")
-        private ConcurrentHashMap<String, String> info = new ConcurrentHashMap<String, String>();
-
-        @JsonProperty("run_async")
-        private boolean runAsync = false;
-
-        private ConcurrentHashMap<String, String> getInfo() {
-            return info;
-        }
-
-        public void putInfo(String key, String value) {
-            getInfo().put(key, value);
-        }
-
-        public String getInfo(String key) {
-            return getInfo().get(key);
-        }
-
-        public void clearInfo() {
-            getInfo().clear();
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public void setName(String name) {
-            this.name = name;
-        }
-
-        public int getSequenceID() {
-            return sequenceID;
-        }
-
-        public void setSequenceID(int sequenceID) {
-            this.sequenceID = sequenceID;
-        }
-
-        public String getExecCmd() {
-            return execCmd;
-        }
-
-        public void setExecCmd(String execCmd) {
-            this.execCmd = execCmd;
-        }
-
-        public JobStepStatusEnum getStatus() {
-            return status;
-        }
-
-        public void setStatus(JobStepStatusEnum status) {
-            this.status = status;
-        }
-
-        public String getId() {
-            return id;
-        }
-
-        public void setId(String id) {
-            this.id = id;
-        }
-
-        /**
-         * @return the execStartTime
-         */
-        public long getExecStartTime() {
-            return execStartTime;
-        }
-
-        /**
-         * @param execStartTime the execStartTime to set
-         */
-        public void setExecStartTime(long execStartTime) {
-            this.execStartTime = execStartTime;
-        }
-
-        /**
-         * @return the execEndTime
-         */
-        public long getExecEndTime() {
-            return execEndTime;
-        }
-
-        /**
-         * @param execEndTime the execEndTime to set
-         */
-        public void setExecEndTime(long execEndTime) {
-            this.execEndTime = execEndTime;
-        }
-
-        public long getExecWaitTime() {
-            return execWaitTime;
-        }
-
-        public void setExecWaitTime(long execWaitTime) {
-            this.execWaitTime = execWaitTime;
-        }
-
-        public String getInterruptCmd() {
-            return interruptCmd;
-        }
-
-        public void setInterruptCmd(String interruptCmd) {
-            this.interruptCmd = interruptCmd;
-        }
-
-        public JobStepCmdTypeEnum getCmdType() {
-            return cmdType;
-        }
-
-        public void setCmdType(JobStepCmdTypeEnum cmdType) {
-            this.cmdType = cmdType;
-        }
-
-        /**
-         * @return the runAsync
-         */
-        public boolean isRunAsync() {
-            return runAsync;
-        }
-
-        /**
-         * @param runAsync the runAsync to set
-         */
-        public void setRunAsync(boolean runAsync) {
-            this.runAsync = runAsync;
-        }
-
-        /**
-         * @return the jobInstance
-         */
-        public JobInstance getJobInstance() {
-            return jobInstance;
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = 1;
-            result = prime * result + ((name == null) ? 0 : name.hashCode());
-            result = prime * result + sequenceID;
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (obj == null)
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            JobStep other = (JobStep) obj;
-            if (name == null) {
-                if (other.name != null)
-                    return false;
-            } else if (!name.equals(other.name))
-                return false;
-            if (sequenceID != other.sequenceID)
-                return false;
-            return true;
-        }
-
-        @Override
-        public int compareTo(JobStep o) {
-            if (this.sequenceID < o.sequenceID) {
-                return -1;
-            } else if (this.sequenceID > o.sequenceID) {
-                return 1;
-            } else {
-                return 0;
-            }
-        }
-    }
-
-    @Override
-    public int compareTo(JobInstance o) {
-        // order newest first: the instance with the larger lastModified sorts ahead
-        return o.lastModified < this.lastModified ? -1 : (o.lastModified > this.lastModified ? 1 : 0);
-    }
-
-}
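getProgress() reports the share of FINISHED steps as a percentage (note it yields NaN for an instance with no steps), and compareTo() orders instances newest-first by lastModified. A small usage sketch against the API above:

    // Sketch: two finished steps out of four yields a progress of 50.0.
    JobInstance job = new JobInstance();
    for (int i = 0; i < 4; i++) {
        JobInstance.JobStep step = new JobInstance.JobStep();
        step.setSequenceID(i);
        step.setStatus(i < 2 ? JobStepStatusEnum.FINISHED : JobStepStatusEnum.PENDING);
        job.addStep(step);
    }
    double progress = job.getProgress(); // 50.0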

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
deleted file mode 100644
index eba6bd4..0000000
--- a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.hive.SqlHiveDataTypeMapping;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.LookupDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.w3c.dom.Document;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-
-public class JoinedFlatTable {
-
-    public static final String FACT_TABLE_ALIAS = "FACT_TABLE";
-
-    public static final String LOOKUP_TABLE_ALIAS_PREFIX = "LOOKUP_";
-
-    public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
-        return storageDfsDir + "/" + intermediateTableDesc.getTableName();
-    }
-
-    public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
-        StringBuilder ddl = new StringBuilder();
-
-        ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName() + "\n");
-
-        ddl.append("(" + "\n");
-        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
-            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
-            if (i > 0) {
-                ddl.append(",");
-            }
-            ddl.append(colName(col.getCanonicalName()) + " " + SqlHiveDataTypeMapping.getHiveDataType(col.getDataType()) + "\n");
-        }
-        ddl.append(")" + "\n");
-
-        ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
-        ddl.append("STORED AS SEQUENCEFILE" + "\n");
-        ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
-        // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
-        // ";\n");
-        return ddl.toString();
-    }
-
-    public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
-        StringBuilder ddl = new StringBuilder();
-        ddl.append("DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName() + ";").append("\n");
-        return ddl.toString();
-    }
-
-    public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException {
-        StringBuilder sql = new StringBuilder();
-
-        File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
-
-        if (hadoopPropertiesFile.exists()) {
-            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
-            DocumentBuilder builder;
-            Document doc;
-            try {
-                builder = factory.newDocumentBuilder();
-                doc = builder.parse(hadoopPropertiesFile);
-                NodeList nl = doc.getElementsByTagName("property");
-                for (int i = 0; i < nl.getLength(); i++) {
-                    String name = doc.getElementsByTagName("name").item(i).getFirstChild().getNodeValue();
-                    String value = doc.getElementsByTagName("value").item(i).getFirstChild().getNodeValue();
-                    if (name.equals("tmpjars") == false) {
-                        sql.append("SET " + name + "=" + value + ";").append("\n");
-                    }
-                }
-
-            } catch (ParserConfigurationException e) {
-                throw new IOException(e);
-            } catch (SAXException e) {
-                throw new IOException(e);
-            }
-        }
-
-        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
-
-        return sql.toString();
-    }
-
-    public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
-        StringBuilder sql = new StringBuilder();
-        sql.append("SELECT" + "\n");
-        String tableAlias;
-        Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
-        for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
-            IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
-            if (i > 0) {
-                sql.append(",");
-            }
-            tableAlias = tableAliasMap.get(col.getTableName());
-            sql.append(tableAlias + "." + col.getColumnName() + "\n");
-        }
-        appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
-        appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
-        return sql.toString();
-    }
-
-    private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) {
-        Map<String, String> tableAliasMap = new HashMap<String, String>();
-
-        tableAliasMap.put(dataModelDesc.getFactTable().toUpperCase(), FACT_TABLE_ALIAS);
-
-        int i = 1;
-        for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
-            JoinDesc join = lookupDesc.getJoin();
-            if (join != null) {
-                tableAliasMap.put(lookupDesc.getTable().toUpperCase(), LOOKUP_TABLE_ALIAS_PREFIX + i);
-                i++;
-            }
-        }
-        return tableAliasMap;
-    }
-
-    private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-        Set<String> dimTableCache = new HashSet<String>();
-
-        DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel();
-        String factTableName = dataModelDesc.getFactTable();
-        String factTableAlias = tableAliasMap.get(factTableName);
-        sql.append("FROM " + factTableName + " as " + factTableAlias + " \n");
-
-        for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
-            JoinDesc join = lookupDesc.getJoin();
-            if (join != null && join.getType().equals("") == false) {
-                String joinType = join.getType().toUpperCase();
-                String dimTableName = lookupDesc.getTable();
-                if (!dimTableCache.contains(dimTableName)) {
-                    TblColRef[] pk = join.getPrimaryKeyColumns();
-                    TblColRef[] fk = join.getForeignKeyColumns();
-                    if (pk.length != fk.length) {
-                        throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
-                    }
-                    sql.append(joinType + " JOIN " + dimTableName + " as " + tableAliasMap.get(dimTableName) + "\n");
-                    sql.append("ON ");
-                    for (int i = 0; i < pk.length; i++) {
-                        if (i > 0) {
-                            sql.append(" AND ");
-                        }
-                        sql.append(factTableAlias + "." + fk[i].getName() + " = " + tableAliasMap.get(dimTableName) + "." + pk[i].getName());
-                    }
-                    sql.append("\n");
-
-                    dimTableCache.add(dimTableName);
-                }
-            }
-        }
-    }
-
-    private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-        if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
-            return;//TODO: for now only cube segments support filter and partition
-        }
-        CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
-
-        boolean hasCondition = false;
-        StringBuilder whereBuilder = new StringBuilder();
-        whereBuilder.append("WHERE");
-
-        CubeDesc cubeDesc = desc.getCubeDesc();
-        DataModelDesc model = cubeDesc.getModel();
-        
-        if (model.getFilterCondition() != null && !model.getFilterCondition().equals("")) {
-            whereBuilder.append(" (").append(model.getFilterCondition()).append(") ");
-            hasCondition = true;
-        }
-
-        CubeSegment cubeSegment = desc.getCubeSegment();
-
-        if (null != cubeSegment) {
-            PartitionDesc partDesc = model.getPartitionDesc();
-            long dateStart = cubeSegment.getDateRangeStart();
-            long dateEnd = cubeSegment.getDateRangeEnd();
-
-            if (!(dateStart == 0 && dateEnd == Long.MAX_VALUE)) {
-                whereBuilder.append(hasCondition ? " AND (" : " (");
-                whereBuilder.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd, tableAliasMap));
-                whereBuilder.append(")\n");
-                hasCondition = true;
-            }
-        }
-
-        if (hasCondition) {
-            sql.append(whereBuilder.toString());
-        }
-    }
-
-    private static String colName(String canonicalColName) {
-        return canonicalColName.replace(".", "_");
-    }
-}
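JoinedFlatTable stitches together the Hive DDL/DML for the intermediate flat table: a CREATE EXTERNAL TABLE from the column list, a SELECT with one JOIN per lookup table, and a WHERE clause built from the model's filter condition plus the segment's date range. A minimal usage sketch; the CubeJoinedFlatTableDesc constructor arguments and the surrounding variables are assumptions here:

    // Sketch: the three statements the intermediate-table step would run, in order.
    IJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(cubeDesc, cubeSegment); // assumed ctor
    String drop   = JoinedFlatTable.generateDropTableStatement(flatDesc);
    String create = JoinedFlatTable.generateCreateTableStatement(flatDesc, hdfsWorkingDir);
    String insert = JoinedFlatTable.generateInsertDataStatement(flatDesc, engineConfig);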


