kylin-commits mailing list archives

From liy...@apache.org
Subject [09/28] incubator-kylin git commit: KYLIN-875 Split job module into 'core-job', 'engine-mr', 'source-hive', 'storage-hbase'. The old job remains as an assembly project.
Date Thu, 23 Jul 2015 23:20:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
deleted file mode 100644
index 2cde011..0000000
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ /dev/null
@@ -1,661 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.kylin.job.inmemcubing;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.measure.DoubleMutable;
-import org.apache.kylin.metadata.measure.LongMutable;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.GTAggregateScanner;
-import org.apache.kylin.storage.gridtable.GTBuilder;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.GridTable;
-import org.apache.kylin.storage.gridtable.IGTScanner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Build a cube (many cuboids) in memory. Calculates multiple cuboids at the same time as long as memory permits.
- * Assumes the base cuboid fits in memory; otherwise an OOM exception will occur.
- */
-public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
-
-    private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
-    private static final LongMutable ONE = new LongMutable(1L);
-
-    private final CuboidScheduler cuboidScheduler;
-    private final long baseCuboidId;
-    private final int totalCuboidCount;
-    private final CubeJoinedFlatTableDesc intermediateTableDesc;
-    private final MeasureCodec measureCodec;
-    private final String[] metricsAggrFuncs;
-    private final int[] hbaseMeasureRefIndex;
-    private final MeasureDesc[] measureDescs;
-    private final int measureCount;
-
-    private MemoryBudgetController memBudget;
-    private Thread[] taskThreads;
-    private Throwable[] taskThreadExceptions;
-    private LinkedBlockingQueue<CuboidTask> taskPending;
-    private AtomicInteger taskCuboidCompleted = new AtomicInteger(0);
-
-    private CuboidResult baseResult;
-    private Object[] totalSumForSanityCheck;
-    private ICuboidCollector resultCollector;
-
-    public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
-        super(cubeDesc, dictionaryMap);
-        this.cuboidScheduler = new CuboidScheduler(cubeDesc);
-        this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        this.totalCuboidCount = cuboidScheduler.getCuboidCount();
-        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-        this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
-
-        Map<String, Integer> measureIndexMap = Maps.newHashMap();
-        List<String> metricsAggrFuncsList = Lists.newArrayList();
-        measureCount = cubeDesc.getMeasures().size();
-
-        List<MeasureDesc> measureDescsList = Lists.newArrayList();
-        hbaseMeasureRefIndex = new int[measureCount];
-        int measureRef = 0;
-        for (HBaseColumnFamilyDesc familyDesc : cubeDesc.getHbaseMapping().getColumnFamily()) {
-            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
-                    for (int j = 0; j < measureCount; j++) {
-                        if (cubeDesc.getMeasures().get(j).equals(measure)) {
-                            measureDescsList.add(measure);
-                            hbaseMeasureRefIndex[measureRef] = j;
-                            break;
-                        }
-                    }
-                    measureRef++;
-                }
-            }
-        }
-
-        for (int i = 0; i < measureCount; i++) {
-            MeasureDesc measureDesc = measureDescsList.get(i);
-            metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
-            measureIndexMap.put(measureDesc.getName(), i);
-        }
-        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
-        this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
-    }
-
-    private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
-        GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
-
-        // The several store implementations below are very similar in performance. ConcurrentDiskStore is the simplest.
-        // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
-        // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
-        ConcurrentDiskStore store = new ConcurrentDiskStore(info);
-
-        GridTable gridTable = new GridTable(info, store);
-        return gridTable;
-    }
-
-    private Pair<ImmutableBitSet, ImmutableBitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
-        BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
-        BitSet dimension = new BitSet();
-        dimension.set(0, bitSet.cardinality());
-        BitSet metrics = new BitSet();
-        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureCount);
-        return new Pair<ImmutableBitSet, ImmutableBitSet>(new ImmutableBitSet(dimension), new ImmutableBitSet(metrics));
-    }
-
-    @Override
-    public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
-        ConcurrentNavigableMap<Long, CuboidResult> result = build(input);
-        for (CuboidResult cuboidResult : result.values()) {
-            outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
-            cuboidResult.table.close();
-        }
-    }
-
-    ConcurrentNavigableMap<Long, CuboidResult> build(BlockingQueue<List<String>> input) throws IOException {
-        final ConcurrentNavigableMap<Long, CuboidResult> result = new ConcurrentSkipListMap<Long, CuboidResult>();
-        build(input, new ICuboidCollector() {
-            @Override
-            public void collect(CuboidResult cuboidResult) {
-                result.put(cuboidResult.cuboidId, cuboidResult);
-            }
-        });
-        return result;
-    }
-
-    interface ICuboidCollector {
-        void collect(CuboidResult result);
-    }
-
-    static class CuboidResult {
-        public long cuboidId;
-        public GridTable table;
-        public int nRows;
-        public long timeSpent;
-        public int aggrCacheMB;
-
-        public CuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
-            this.cuboidId = cuboidId;
-            this.table = table;
-            this.nRows = nRows;
-            this.timeSpent = timeSpent;
-            this.aggrCacheMB = aggrCacheMB;
-        }
-    }
-
-    private void build(BlockingQueue<List<String>> input, ICuboidCollector collector) throws IOException {
-        long startTime = System.currentTimeMillis();
-        logger.info("In Mem Cube Build start, " + cubeDesc.getName());
-
-        // multiple threads to compute cuboid in parallel
-        taskPending = new LinkedBlockingQueue<>();
-        taskCuboidCompleted.set(0);
-        taskThreads = prepareTaskThreads();
-        taskThreadExceptions = new Throwable[taskThreadCount];
-
-        // build base cuboid
-        resultCollector = collector;
-        totalSumForSanityCheck = null;
-        baseResult = createBaseCuboid(input);
-        if (baseResult.nRows == 0)
-            return;
-
-        // plan memory budget
-        makeMemoryBudget();
-
-        // kick off N-D cuboid tasks and output
-        addChildTasks(baseResult);
-        start(taskThreads);
-
-        // wait complete
-        join(taskThreads);
-
-        long endTime = System.currentTimeMillis();
-        logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
-
-        throwExceptionIfAny();
-    }
-
-    public void abort() {
-        interrupt(taskThreads);
-    }
-
-    private void start(Thread... threads) {
-        for (Thread t : threads)
-            t.start();
-    }
-
-    private void interrupt(Thread... threads) {
-        for (Thread t : threads)
-            t.interrupt();
-    }
-
-    private void join(Thread... threads) throws IOException {
-        try {
-            for (Thread t : threads)
-                t.join();
-        } catch (InterruptedException e) {
-            throw new IOException("interrupted while waiting task and output complete", e);
-        }
-    }
-
-    private void throwExceptionIfAny() throws IOException {
-        ArrayList<Throwable> errors = new ArrayList<Throwable>();
-        for (int i = 0; i < taskThreadCount; i++) {
-            Throwable t = taskThreadExceptions[i];
-            if (t != null)
-                errors.add(t);
-        }
-        if (errors.isEmpty()) {
-            return;
-        } else if (errors.size() == 1) {
-            Throwable t = errors.get(0);
-            if (t instanceof IOException)
-                throw (IOException) t;
-            else
-                throw new IOException(t);
-        } else {
-            for (Throwable t : errors)
-                logger.error("Exception during in-mem cube build", t);
-            throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
-        }
-    }
-
-    private Thread[] prepareTaskThreads() {
-        Thread[] result = new Thread[taskThreadCount];
-        for (int i = 0; i < taskThreadCount; i++) {
-            result[i] = new CuboidTaskThread(i);
-        }
-        return result;
-    }
-
-    public boolean isAllCuboidDone() {
-        return taskCuboidCompleted.get() == totalCuboidCount;
-    }
-
-    private class CuboidTaskThread extends Thread {
-        private int id;
-
-        CuboidTaskThread(int id) {
-            super("CuboidTask-" + id);
-            this.id = id;
-        }
-
-        @Override
-        public void run() {
-            try {
-                while (!isAllCuboidDone()) {
-                    CuboidTask task = null;
-                    while (task == null && taskHasNoException()) {
-                        task = taskPending.poll(15, TimeUnit.SECONDS);
-                    }
-                    // task is null only if some task thread hit an error
-                    if (task == null)
-                        break;
-
-                    CuboidResult newCuboid = buildCuboid(task.parent, task.childCuboidId);
-                    addChildTasks(newCuboid);
-
-                    if (isAllCuboidDone()) {
-                        for (Thread t : taskThreads) {
-                            if (t != Thread.currentThread())
-                                t.interrupt();
-                        }
-                    }
-                }
-            } catch (Throwable ex) {
-                if (!isAllCuboidDone()) {
-                    logger.error("task thread exception", ex);
-                    taskThreadExceptions[id] = ex;
-                }
-            }
-        }
-    }
-
-    private boolean taskHasNoException() {
-        for (int i = 0; i < taskThreadExceptions.length; i++)
-            if (taskThreadExceptions[i] != null)
-                return false;
-        return true;
-    }
-
-    private void addChildTasks(CuboidResult parent) {
-        List<Long> children = cuboidScheduler.getSpanningCuboid(parent.cuboidId);
-        for (Long child : children) {
-            taskPending.add(new CuboidTask(parent, child));
-        }
-    }
-
-    private int getSystemAvailMB() {
-        Runtime.getRuntime().gc();
-        try {
-            Thread.sleep(500);
-        } catch (InterruptedException e) {
-            logger.error("", e);
-        }
-        return MemoryBudgetController.getSystemAvailMB();
-    }
-
-    private void makeMemoryBudget() {
-        int systemAvailMB = getSystemAvailMB();
-        logger.info("System avail " + systemAvailMB + " MB");
-        int reserve = Math.max(reserveMemoryMB, baseResult.aggrCacheMB / 3);
-        logger.info("Reserve " + reserve + " MB for system basics");
-
-        int budget = systemAvailMB - reserve;
-        if (budget < baseResult.aggrCacheMB) {
-            // make sure we have base aggr cache as minimal
-            budget = baseResult.aggrCacheMB;
-            logger.warn("!!! System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx");
-        }
-
-        logger.info("Memory Budget is " + budget + " MB");
-        memBudget = new MemoryBudgetController(budget);
-    }
-
-    private CuboidResult createBaseCuboid(BlockingQueue<List<String>> input) throws IOException {
-        GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
-        GTBuilder baseBuilder = baseCuboid.rebuild();
-        IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
-
-        int mbBefore = getSystemAvailMB();
-        int mbAfter = 0;
-
-        Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
-        GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
-        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
-
-        long startTime = System.currentTimeMillis();
-        logger.info("Calculating cuboid " + baseCuboidId);
-
-        int count = 0;
-        for (GTRecord r : aggregationScanner) {
-            if (mbAfter == 0) {
-                mbAfter = getSystemAvailMB();
-            }
-            baseBuilder.write(r);
-            count++;
-        }
-        aggregationScanner.close();
-        baseBuilder.close();
-
-        long timeSpent = System.currentTimeMillis() - startTime;
-        logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
-
-        int mbBaseAggrCacheOnHeap = mbAfter == 0 ? 0 : mbBefore - mbAfter;
-        int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
-        int mbBaseAggrCache = Math.max((int) (mbBaseAggrCacheOnHeap * 1.1), mbEstimateBaseAggrCache);
-        mbBaseAggrCache = Math.max(mbBaseAggrCache, 10); // let it be 10 MB at least
-        logger.info("Base aggr cache is " + mbBaseAggrCache + " MB (heap " + mbBaseAggrCacheOnHeap + " MB, estimate " + mbEstimateBaseAggrCache + " MB)");
-
-        return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, mbBaseAggrCache);
-    }
-
-    private CuboidResult updateCuboidResult(long cuboidId, GridTable table, int nRows, long timeSpent, int aggrCacheMB) {
-        if (aggrCacheMB <= 0) {
-            aggrCacheMB = (int) Math.ceil(1.0 * nRows / baseResult.nRows * baseResult.aggrCacheMB);
-        }
-
-        CuboidResult result = new CuboidResult(cuboidId, table, nRows, timeSpent, aggrCacheMB);
-        taskCuboidCompleted.incrementAndGet();
-
-        resultCollector.collect(result);
-        return result;
-    }
-
-    private CuboidResult buildCuboid(CuboidResult parent, long cuboidId) throws IOException {
-        final String consumerName = "AggrCache@Cuboid " + cuboidId;
-        MemoryBudgetController.MemoryConsumer consumer = new MemoryBudgetController.MemoryConsumer() {
-            @Override
-            public int freeUp(int mb) {
-                return 0; // cannot free up on demand
-            }
-
-            @Override
-            public String toString() {
-                return consumerName;
-            }
-        };
-
-        // reserve memory for aggregation cache, can't be larger than the parent
-        memBudget.reserveInsist(consumer, parent.aggrCacheMB);
-        try {
-            return aggregateCuboid(parent, cuboidId);
-        } finally {
-            memBudget.reserve(consumer, 0);
-        }
-    }
-
-    private CuboidResult aggregateCuboid(CuboidResult parent, long cuboidId) throws IOException {
-        Pair<ImmutableBitSet, ImmutableBitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parent.cuboidId);
-        ImmutableBitSet parentDimensions = columnBitSets.getFirst();
-        ImmutableBitSet measureColumns = columnBitSets.getSecond();
-        ImmutableBitSet childDimensions = parentDimensions;
-
-        long mask = Long.highestOneBit(parent.cuboidId);
-        long childCuboidId = cuboidId;
-        long parentCuboidIdActualLength = Long.SIZE - Long.numberOfLeadingZeros(parent.cuboidId);
-        int index = 0;
-        for (int i = 0; i < parentCuboidIdActualLength; i++) {
-            if ((mask & parent.cuboidId) > 0) {
-                if ((mask & childCuboidId) == 0) {
-                    // this dim will be aggregated
-                    childDimensions = childDimensions.set(index, false);
-                }
-                index++;
-            }
-            mask = mask >> 1;
-        }
-
-        return scanAndAggregateGridTable(parent.table, cuboidId, childDimensions, measureColumns);
-    }
-
-    private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
-        long startTime = System.currentTimeMillis();
-        logger.info("Calculating cuboid " + cuboidId);
-
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
-        GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
-        GridTable newGridTable = newGridTableByCuboidID(cuboidId);
-        GTBuilder builder = newGridTable.rebuild();
-
-        ImmutableBitSet allNeededColumns = aggregationColumns.or(measureColumns);
-
-        GTRecord newRecord = new GTRecord(newGridTable.getInfo());
-        int count = 0;
-        try {
-            for (GTRecord record : scanner) {
-                count++;
-                for (int i = 0; i < allNeededColumns.trueBitCount(); i++) {
-                    int c = allNeededColumns.trueBitAt(i);
-                    newRecord.set(i, record.get(c));
-                }
-                builder.write(newRecord);
-            }
-
-            // sanity check (can be disabled for performance)
-            sanityCheck(scanner.getTotalSumForSanityCheck());
-        } finally {
-            scanner.close();
-            builder.close();
-        }
-
-        long timeSpent = System.currentTimeMillis() - startTime;
-        logger.info("Cuboid " + cuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
-
-        return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
-    }
-
-    //@SuppressWarnings("unused")
-    private void sanityCheck(Object[] totalSum) {
-        // double sum introduces error and causes result not exactly equal
-        for (int i = 0; i < totalSum.length; i++) {
-            if (totalSum[i] instanceof DoubleMutable) {
-                totalSum[i] = Math.round(((DoubleMutable) totalSum[i]).get());
-            }
-        }
-
-        if (totalSumForSanityCheck == null) {
-            totalSumForSanityCheck = totalSum;
-            return;
-        }
-        if (!Arrays.equals(totalSumForSanityCheck, totalSum)) {
-            throw new IllegalStateException();
-        }
-    }
-
-    // ===========================================================================
-
-    private static class CuboidTask implements Comparable<CuboidTask> {
-        final CuboidResult parent;
-        final long childCuboidId;
-
-        CuboidTask(CuboidResult parent, long childCuboidId) {
-            this.parent = parent;
-            this.childCuboidId = childCuboidId;
-        }
-
-        @Override
-        public int compareTo(CuboidTask o) {
-            long comp = this.childCuboidId - o.childCuboidId;
-            return comp < 0 ? -1 : (comp > 0 ? 1 : 0);
-        }
-    }
-
-    // ============================================================================
-
-    private class InputConverter implements IGTScanner {
-        GTInfo info;
-        GTRecord record;
-        BlockingQueue<List<String>> input;
-
-        public InputConverter(GTInfo info, BlockingQueue<List<String>> input) {
-            this.info = info;
-            this.input = input;
-            this.record = new GTRecord(info);
-        }
-
-        @Override
-        public Iterator<GTRecord> iterator() {
-            return new Iterator<GTRecord>() {
-
-                List<String> currentObject = null;
-
-                @Override
-                public boolean hasNext() {
-                    try {
-                        currentObject = input.take();
-                    } catch (InterruptedException e) {
-                        throw new RuntimeException(e);
-                    }
-                    return currentObject != null && currentObject.size() > 0;
-                }
-
-                @Override
-                public GTRecord next() {
-                    if (currentObject.size() == 0)
-                        throw new IllegalStateException();
-
-                    buildGTRecord(currentObject, record);
-                    return record;
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
-
-        @Override
-        public void close() throws IOException {
-        }
-
-        @Override
-        public GTInfo getInfo() {
-            return info;
-        }
-
-        @Override
-        public int getScannedRowCount() {
-            return 0;
-        }
-
-        @Override
-        public int getScannedRowBlockCount() {
-            return 0;
-        }
-
-        private void buildGTRecord(List<String> row, GTRecord record) {
-            Object[] dimensions = buildKey(row);
-            Object[] metricsValues = buildValue(row);
-            Object[] recordValues = new Object[dimensions.length + metricsValues.length];
-            System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
-            System.arraycopy(metricsValues, 0, recordValues, dimensions.length, metricsValues.length);
-            record.setValues(recordValues);
-        }
-
-        private Object[] buildKey(List<String> row) {
-            int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
-            Object[] key = new Object[keySize];
-
-            for (int i = 0; i < keySize; i++) {
-                key[i] = row.get(intermediateTableDesc.getRowKeyColumnIndexes()[i]);
-            }
-
-            return key;
-        }
-
-        private Object[] buildValue(List<String> row) {
-
-            Object[] values = new Object[measureCount];
-            MeasureDesc measureDesc = null;
-
-            for (int position = 0; position < hbaseMeasureRefIndex.length; position++) {
-                int i = hbaseMeasureRefIndex[position];
-                measureDesc = measureDescs[i];
-
-                Object value = null;
-                int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
-                FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
-                if (function.isCount() || function.isHolisticCountDistinct()) {
-                    // note for holistic count distinct, this value will be ignored
-                    value = ONE;
-                } else if (flatTableIdx == null) {
-                    value = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
-                } else if (flatTableIdx.length == 1) {
-                    value = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
-                } else {
-
-                    byte[] result = null;
-                    for (int x = 0; x < flatTableIdx.length; x++) {
-                        byte[] split = toBytes(row.get(flatTableIdx[x]));
-                        if (result == null) {
-                            result = Arrays.copyOf(split, split.length);
-                        } else {
-                            byte[] newResult = new byte[result.length + split.length];
-                            System.arraycopy(result, 0, newResult, 0, result.length);
-                            System.arraycopy(split, 0, newResult, result.length, split.length);
-                            result = newResult;
-                        }
-                    }
-                    value = measureCodec.getSerializer(i).valueOf(result);
-                }
-                values[position] = value;
-            }
-            return values;
-        }
-
-        private byte[] toBytes(String v) {
-            return v == null ? null : Bytes.toBytes(v);
-        }
-
-    }
-}
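
For readers following the removed InMemCubeBuilder above: getDimensionAndMetricColumnBitSet() derives the grid table column layout from a cuboid id by counting its set bits, so dimensions occupy the first cardinality columns and measures the next measureCount columns. A minimal standalone sketch of the same idea (class name and values are illustrative only, not part of this commit):

import java.util.BitSet;

public class CuboidBitSetDemo {
    public static void main(String[] args) {
        long cuboidId = 0b10110L;  // set bits mark which of 5 dimensions this cuboid keeps
        int measureCount = 2;

        BitSet bits = BitSet.valueOf(new long[] { cuboidId });
        int dimCount = bits.cardinality();              // 3 dimensions present

        BitSet dimensions = new BitSet();
        dimensions.set(0, dimCount);                    // grid table columns 0..2 hold dimensions

        BitSet metrics = new BitSet();
        metrics.set(dimCount, dimCount + measureCount); // columns 3..4 hold measures

        System.out.println("dimensions=" + dimensions + ", metrics=" + metrics);
        // prints: dimensions={0, 1, 2}, metrics={3, 4}
    }
}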

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
deleted file mode 100644
index badb14f..0000000
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/MemDiskStore.java
+++ /dev/null
@@ -1,679 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements. See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License. You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.kylin.job.inmemcubing;
-
-import static org.apache.kylin.common.util.MemoryBudgetController.*;
-
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.StandardOpenOption;
-import java.util.NoSuchElementException;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.MemoryConsumer;
-import org.apache.kylin.common.util.MemoryBudgetController.NotEnoughBudgetException;
-import org.apache.kylin.storage.gridtable.GTInfo;
-import org.apache.kylin.storage.gridtable.GTRecord;
-import org.apache.kylin.storage.gridtable.GTRowBlock;
-import org.apache.kylin.storage.gridtable.GTScanRequest;
-import org.apache.kylin.storage.gridtable.IGTStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class MemDiskStore implements IGTStore, Closeable {
-
-    private static final Logger logger = LoggerFactory.getLogger(MemDiskStore.class);
-    private static final boolean debug = true;
-
-    private static final int STREAM_BUFFER_SIZE = 8192;
-    private static final int MEM_CHUNK_SIZE_MB = 5;
-
-    private final GTInfo info;
-    private final Object lock; // all public methods that read/write object states are synchronized on this lock
-    private final MemPart memPart;
-    private final DiskPart diskPart;
-    private final boolean delOnClose;
-
-    private Writer ongoingWriter;
-
-    public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl) throws IOException {
-        this(info, budgetCtrl, File.createTempFile("MemDiskStore", ""), true);
-    }
-
-    public MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile) throws IOException {
-        this(info, budgetCtrl, diskFile, false);
-    }
-
-    private MemDiskStore(GTInfo info, MemoryBudgetController budgetCtrl, File diskFile, boolean delOnClose) throws IOException {
-        this.info = info;
-        this.lock = this;
-        this.memPart = new MemPart(budgetCtrl);
-        this.diskPart = new DiskPart(diskFile);
-        this.delOnClose = delOnClose;
-
-        // in case the user forgets to call close()
-        if (delOnClose)
-            diskFile.deleteOnExit();
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    @Override
-    public IGTStoreWriter rebuild(int shard) throws IOException {
-        return newWriter(0);
-    }
-
-    @Override
-    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
-        return newWriter(length());
-    }
-
-    private Writer newWriter(long startOffset) throws IOException {
-        synchronized (lock) {
-            if (ongoingWriter != null)
-                throw new IllegalStateException();
-
-            ongoingWriter = new Writer(startOffset);
-            return ongoingWriter;
-        }
-    }
-
-    @Override
-    public IGTStoreScanner scan(GTRecord pkStart, GTRecord pkEnd, ImmutableBitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
-        synchronized (lock) {
-            return new Reader();
-        }
-    }
-
-    @Override
-    public void close() throws IOException {
-        // synchronized inside the parts close()
-        memPart.close();
-        diskPart.close();
-    }
-
-    public long length() {
-        synchronized (lock) {
-            return Math.max(memPart.tailOffset(), diskPart.tailOffset);
-        }
-    }
-
-    @Override
-    public String toString() {
-        return "MemDiskStore@" + (info.getTableName() == null ? this.hashCode() : info.getTableName());
-    }
-
-    private class Reader implements IGTStoreScanner {
-
-        final DataInputStream din;
-        long readOffset = 0;
-        long memRead = 0;
-        long diskRead = 0;
-        int nReadCalls = 0;
-
-        GTRowBlock block = GTRowBlock.allocate(info);
-        GTRowBlock next = null;
-
-        Reader() throws IOException {
-            diskPart.openRead();
-            if (debug)
-                logger.debug(MemDiskStore.this + " read start @ " + readOffset);
-
-            InputStream in = new InputStream() {
-                byte[] tmp = new byte[1];
-                MemChunk memChunk;
-
-                @Override
-                public int read() throws IOException {
-                    int n = read(tmp, 0, 1);
-                    if (n <= 0)
-                        return -1;
-                    else
-                        return (int) tmp[0];
-                }
-
-                @Override
-                public int read(byte[] b, int off, int len) throws IOException {
-                    synchronized (lock) {
-                        nReadCalls++;
-                        if (available() <= 0)
-                            return -1;
-
-                        if (memChunk == null && memPart.headOffset() <= readOffset && readOffset < memPart.tailOffset()) {
-                            memChunk = memPart.seekMemChunk(readOffset);
-                        }
-
-                        int lenToGo = Math.min(available(), len);
-
-                        int nRead = 0;
-                        while (lenToGo > 0) {
-                            int n;
-                            if (memChunk != null) {
-                                if (memChunk.headOffset() > readOffset) {
-                                    memChunk = null;
-                                    continue;
-                                }
-                                if (readOffset >= memChunk.tailOffset()) {
-                                    memChunk = memChunk.next;
-                                    continue;
-                                }
-                                int chunkOffset = (int) (readOffset - memChunk.headOffset());
-                                n = Math.min((int) (memChunk.tailOffset() - readOffset), lenToGo);
-                                System.arraycopy(memChunk.data, chunkOffset, b, off, n);
-                                memRead += n;
-                            } else {
-                                n = diskPart.read(readOffset, b, off, lenToGo);
-                                diskRead += n;
-                            }
-                            lenToGo -= n;
-                            nRead += n;
-                            off += n;
-                            readOffset += n;
-                        }
-                        return nRead;
-                    }
-                }
-
-                @Override
-                public int available() throws IOException {
-                    synchronized (lock) {
-                        return (int) (length() - readOffset);
-                    }
-                }
-            };
-
-            din = new DataInputStream(new BufferedInputStream(in, STREAM_BUFFER_SIZE));
-        }
-
-        @Override
-        public boolean hasNext() {
-            if (next != null)
-                return true;
-
-            try {
-                if (din.available() > 0) {
-                    block.importFrom(din);
-                    next = block;
-                }
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-
-            return next != null;
-        }
-
-        @Override
-        public GTRowBlock next() {
-            if (next == null) {
-                hasNext();
-                if (next == null)
-                    throw new NoSuchElementException();
-            }
-            GTRowBlock r = next;
-            next = null;
-            return r;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void close() throws IOException {
-            synchronized (lock) {
-                din.close();
-                diskPart.closeRead();
-                if (debug)
-                    logger.debug(MemDiskStore.this + " read end @ " + readOffset + ", " + (memRead) + " from mem, " + (diskRead) + " from disk, " + nReadCalls + " read() calls");
-            }
-        }
-
-    }
-
-    private class Writer implements IGTStoreWriter {
-
-        final DataOutputStream dout;
-        long writeOffset;
-        long memWrite = 0;
-        long diskWrite = 0;
-        int nWriteCalls;
-        boolean closed = false;
-
-        Writer(long startOffset) throws IOException {
-            writeOffset = 0; // TODO does not support append yet
-            memPart.clear();
-            diskPart.clear();
-            diskPart.openWrite(false);
-            if (debug)
-                logger.debug(MemDiskStore.this + " write start @ " + writeOffset);
-
-            memPart.activateMemWrite();
-
-            OutputStream out = new OutputStream() {
-                byte[] tmp = new byte[1];
-                boolean memPartActivated = true;
-
-                @Override
-                public void write(int b) throws IOException {
-                    tmp[0] = (byte) b;
-                    write(tmp, 0, 1);
-                }
-
-                @Override
-                public void write(byte[] bytes, int offset, int length) throws IOException {
-                    // lock inside memPart.write() and diskPart.write()
-                    nWriteCalls++;
-                    while (length > 0) {
-                        int n;
-                        if (memPartActivated) {
-                            n = memPart.write(bytes, offset, length, writeOffset);
-                            memWrite += n;
-                            if (n == 0) {
-                                memPartActivated = false;
-                            }
-                        } else {
-                            n = diskPart.write(writeOffset, bytes, offset, length);
-                            diskWrite += n;
-                        }
-                        offset += n;
-                        length -= n;
-                        writeOffset += n;
-                    }
-                }
-            };
-            dout = new DataOutputStream(new BufferedOutputStream(out, STREAM_BUFFER_SIZE));
-        }
-
-        @Override
-        public void write(GTRowBlock block) throws IOException {
-            block.export(dout);
-        }
-
-        @Override
-        public void close() throws IOException {
-            synchronized (lock) {
-                if (!closed) {
-                    dout.close();
-                    memPart.deactivateMemWrite();
-                }
-
-                if (memPart.asyncFlusher == null) {
-                    assert writeOffset == diskPart.tailOffset;
-                    diskPart.closeWrite();
-                    ongoingWriter = null;
-                    if (debug)
-                        logger.debug(MemDiskStore.this + " write end @ " + writeOffset + ", " + (memWrite) + " to mem, " + (diskWrite) + " to disk, " + nWriteCalls + " write() calls");
-                } else {
-                    // the asyncFlusher will call this close() again later
-                }
-                closed = true;
-            }
-        }
-    }
-
-    private static class MemChunk {
-        long diskOffset;
-        int length;
-        byte[] data;
-        MemChunk next;
-
-        boolean isFull() {
-            return length == data.length;
-        }
-
-        long headOffset() {
-            return diskOffset;
-        }
-
-        long tailOffset() {
-            return diskOffset + length;
-        }
-
-        int freeSpace() {
-            return data.length - length;
-        }
-    }
-
-    private class MemPart implements Closeable, MemoryConsumer {
-
-        final MemoryBudgetController budgetCtrl;
-
-        // async flush thread checks this flag out of sync block
-        volatile boolean writeActivated;
-        MemChunk firstChunk;
-        MemChunk lastChunk;
-        int chunkCount;
-
-        Thread asyncFlusher;
-        MemChunk asyncFlushChunk;
-        long asyncFlushDiskOffset;
-        Throwable asyncFlushException;
-
-        MemPart(MemoryBudgetController budgetCtrl) {
-            this.budgetCtrl = budgetCtrl;
-        }
-
-        long headOffset() {
-            return firstChunk == null ? 0 : firstChunk.headOffset();
-        }
-
-        long tailOffset() {
-            return lastChunk == null ? 0 : lastChunk.tailOffset();
-        }
-
-        public MemChunk seekMemChunk(long diskOffset) {
-            MemChunk c = firstChunk;
-            while (c != null && c.headOffset() <= diskOffset) {
-                if (diskOffset < c.tailOffset())
-                    break;
-                c = c.next;
-            }
-            return c;
-        }
-
-        public int write(byte[] bytes, int offset, int length, long diskOffset) {
-            int needMoreMem = 0;
-
-            synchronized (lock) {
-                if (!writeActivated)
-                    return 0;
-
-                // write is only expected at the tail
-                if (diskOffset != tailOffset())
-                    return 0;
-
-                if (chunkCount == 0 || lastChunk.isFull())
-                    needMoreMem = (chunkCount + 1) * MEM_CHUNK_SIZE_MB;
-            }
-
-            // call to budgetCtrl.reserve() must be out of synchronized block, or deadlock may happen between MemoryConsumers
-            if (needMoreMem > 0) {
-                try {
-                    budgetCtrl.reserve(this, needMoreMem);
-                } catch (NotEnoughBudgetException ex) {
-                    deactivateMemWrite();
-                    return 0;
-                }
-            }
-
-            synchronized (lock) {
-                if (needMoreMem > 0 && (chunkCount == 0 || lastChunk.isFull())) {
-                    MemChunk chunk = new MemChunk();
-                    chunk.diskOffset = diskOffset;
-                    chunk.data = new byte[ONE_MB * MEM_CHUNK_SIZE_MB - 48]; // -48 for MemChunk overhead
-                    if (chunkCount == 0) {
-                        firstChunk = lastChunk = chunk;
-                    } else {
-                        lastChunk.next = chunk;
-                        lastChunk = chunk;
-                    }
-                    chunkCount++;
-                }
-
-                int n = Math.min(lastChunk.freeSpace(), length);
-                System.arraycopy(bytes, offset, lastChunk.data, lastChunk.length, n);
-                lastChunk.length += n;
-
-                if (n > 0)
-                    asyncFlush(lastChunk, diskOffset, n);
-
-                return n;
-            }
-        }
-
-        private void asyncFlush(MemChunk lastChunk, long diskOffset, int n) {
-            if (asyncFlushChunk == null) {
-                asyncFlushChunk = lastChunk;
-                asyncFlushDiskOffset = diskOffset;
-            }
-
-            if (asyncFlusher == null) {
-                asyncFlusher = new Thread() {
-                    public void run() {
-                        asyncFlushException = null;
-                        if (debug)
-                            logger.debug(MemDiskStore.this + " async flush started @ " + asyncFlushDiskOffset);
-                        try {
-                            while (writeActivated) {
-                                flushToDisk();
-                                Thread.sleep(10);
-                            }
-                            flushToDisk();
-
-                            if (debug)
-                                logger.debug(MemDiskStore.this + " async flush ended @ " + asyncFlushDiskOffset);
-
-                            synchronized (lock) {
-                                asyncFlusher = null;
-                                asyncFlushChunk = null;
-                                if (ongoingWriter.closed) {
-                                    ongoingWriter.close(); // call writer.close() again to clean up
-                                }
-                            }
-                        } catch (Throwable ex) {
-                            asyncFlushException = ex;
-                        }
-                    }
-                };
-                asyncFlusher.start();
-            }
-        }
-
-        private void flushToDisk() throws IOException {
-            byte[] data;
-            int offset = 0;
-            int length = 0;
-            int flushedLen = 0;
-
-            while (true) {
-                data = null;
-                synchronized (lock) {
-                    asyncFlushDiskOffset += flushedLen; // bytes written in last loop
-                    //                    if (debug)
-                    //                        logger.debug(GTMemDiskStore.this + " async flush @ " + asyncFlushDiskOffset);
-                    if (asyncFlushChunk != null && asyncFlushChunk.tailOffset() == asyncFlushDiskOffset) {
-                        asyncFlushChunk = asyncFlushChunk.next;
-                    }
-                    if (asyncFlushChunk != null) {
-                        data = asyncFlushChunk.data;
-                        offset = (int) (asyncFlushDiskOffset - asyncFlushChunk.headOffset());
-                        length = asyncFlushChunk.length - offset;
-                    }
-                }
-
-                if (data == null)
-                    break;
-
-                flushedLen = diskPart.write(asyncFlushDiskOffset, data, offset, length);
-            }
-        }
-
-        @Override
-        public int freeUp(int mb) {
-            synchronized (lock) {
-                int mbReleased = 0;
-                while (chunkCount > 0 && mbReleased < mb) {
-                    if (firstChunk == asyncFlushChunk)
-                        break;
-
-                    mbReleased += MEM_CHUNK_SIZE_MB;
-                    chunkCount--;
-                    if (chunkCount == 0) {
-                        firstChunk = lastChunk = null;
-                    } else {
-                        MemChunk next = firstChunk.next;
-                        firstChunk.next = null;
-                        firstChunk = next;
-                    }
-                }
-                return mbReleased;
-            }
-        }
-
-        public void activateMemWrite() {
-            if (budgetCtrl.getTotalBudgetMB() > 0) {
-                writeActivated = true;
-                if (debug)
-                    logger.debug(MemDiskStore.this + " mem write activated");
-            }
-        }
-
-        public void deactivateMemWrite() {
-            writeActivated = false;
-            if (debug)
-                logger.debug(MemDiskStore.this + " mem write de-activated");
-        }
-
-        public void clear() {
-            chunkCount = 0;
-            firstChunk = lastChunk = null;
-            budgetCtrl.reserve(this, 0);
-        }
-
-        @Override
-        public void close() throws IOException {
-            synchronized (lock) {
-                if (asyncFlushException != null)
-                    throwAsyncException(asyncFlushException);
-            }
-            try {
-                asyncFlusher.join();
-            } catch (NullPointerException npe) {
-                // that's fine, the async flusher may not be present
-            } catch (InterruptedException e) {
-                logger.warn("async join interrupted", e);
-            }
-            synchronized (lock) {
-                if (asyncFlushException != null)
-                    throwAsyncException(asyncFlushException);
-
-                clear();
-            }
-        }
-
-        private void throwAsyncException(Throwable ex) throws IOException {
-            if (ex instanceof IOException)
-                throw (IOException) ex;
-            else
-                throw new IOException(ex);
-        }
-
-        @Override
-        public String toString() {
-            return MemDiskStore.this.toString();
-        }
-
-    }
-
-    private class DiskPart implements Closeable {
-        final File diskFile;
-        FileChannel writeChannel;
-        FileChannel readChannel;
-        int readerCount = 0; // allow parallel readers
-        long tailOffset;
-
-        DiskPart(File diskFile) throws IOException {
-            this.diskFile = diskFile;
-            this.tailOffset = diskFile.length();
-            if (debug)
-                logger.debug(MemDiskStore.this + " disk file " + diskFile.getAbsolutePath());
-        }
-
-        public void openRead() throws IOException {
-            if (readChannel == null) {
-                readChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.READ);
-            }
-            readerCount++;
-        }
-
-        public int read(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
-            return readChannel.read(ByteBuffer.wrap(bytes, offset, length), diskOffset);
-        }
-
-        public void closeRead() throws IOException {
-            closeRead(false);
-        }
-
-        private void closeRead(boolean force) throws IOException {
-            readerCount--;
-            if (readerCount == 0 || force) {
-                if (readChannel != null) {
-                    readChannel.close();
-                    readChannel = null;
-                }
-            }
-        }
-
-        public void openWrite(boolean append) throws IOException {
-            if (append) {
-                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND, StandardOpenOption.WRITE);
-                tailOffset = diskFile.length();
-            } else {
-                diskFile.delete();
-                writeChannel = FileChannel.open(diskFile.toPath(), StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
-                tailOffset = 0;
-            }
-        }
-
-        public int write(long diskOffset, byte[] bytes, int offset, int length) throws IOException {
-            synchronized (lock) {
-                int n = writeChannel.write(ByteBuffer.wrap(bytes, offset, length), diskOffset);
-                tailOffset = Math.max(diskOffset + n, tailOffset);
-                return n;
-            }
-        }
-
-        public void closeWrite() throws IOException {
-            if (writeChannel != null) {
-                writeChannel.close();
-                writeChannel = null;
-            }
-        }
-
-        public void clear() throws IOException {
-            diskFile.delete();
-            tailOffset = 0;
-        }
-
-        @Override
-        public void close() throws IOException {
-            synchronized (lock) {
-                closeWrite();
-                closeRead(true);
-                if (delOnClose) {
-                    diskFile.delete();
-                }
-            }
-        }
-    }
-
-}
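
For context on the MemDiskStore removed above: its Writer fills a budget-limited memory part first and, once the budget is exhausted, spills the remaining bytes to the disk part. A minimal, self-contained sketch of that overflow-write pattern (all names and sizes are illustrative, not from this commit):

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class OverflowWriteDemo {
    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[100];
        int memBudgetBytes = 64;                      // pretend only 64 bytes fit in memory

        ByteArrayOutputStream mem = new ByteArrayOutputStream();
        File spill = File.createTempFile("OverflowWriteDemo", "");
        spill.deleteOnExit();

        try (OutputStream disk = new FileOutputStream(spill)) {
            int offset = 0, remaining = payload.length;
            while (remaining > 0) {
                int n;
                if (mem.size() < memBudgetBytes) {    // memory part still has room
                    n = Math.min(memBudgetBytes - mem.size(), remaining);
                    mem.write(payload, offset, n);
                } else {                              // budget exhausted: spill to disk
                    n = remaining;
                    disk.write(payload, offset, n);
                }
                offset += n;
                remaining -= n;
            }
        }
        System.out.println(mem.size() + " bytes in memory, " + spill.length() + " bytes on disk");
        // prints: 64 bytes in memory, 36 bytes on disk
    }
}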

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
index cc68e1b..2ef3d94 100644
--- a/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/invertedindex/IIJobBuilder.java
@@ -23,10 +23,10 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.TimeZone;
 
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIJoinedFlatTableDesc;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java b/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
deleted file mode 100644
index 05f8c8e..0000000
--- a/job/src/main/java/org/apache/kylin/job/manager/ExecutableManager.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.manager;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.dao.ExecutableDao;
-import org.apache.kylin.job.dao.ExecutableOutputPO;
-import org.apache.kylin.job.dao.ExecutablePO;
-import org.apache.kylin.job.exception.IllegalStateTranferException;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.job.execution.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.lang.reflect.Constructor;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- */
-public class ExecutableManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(ExecutableManager.class);
-    private static final ConcurrentHashMap<KylinConfig, ExecutableManager> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableManager>();
-    @SuppressWarnings("unused")
-    private final KylinConfig config;
-
-    private ExecutableDao executableDao;
-
-    public static ExecutableManager getInstance(KylinConfig config) {
-        ExecutableManager r = CACHE.get(config);
-        if (r == null) {
-            r = new ExecutableManager(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one singleton exist");
-            }
-
-        }
-        return r;
-    }
-
-    private ExecutableManager(KylinConfig config) {
-        logger.info("Using metadata url: " + config);
-        this.config = config;
-        this.executableDao = ExecutableDao.getInstance(config);
-    }
-
-    public void addJob(AbstractExecutable executable) {
-        try {
-            executableDao.addJob(parse(executable));
-            addJobOutput(executable);
-        } catch (PersistentException e) {
-            logger.error("fail to submit job:" + executable.getId(), e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void addJobOutput(AbstractExecutable executable) throws PersistentException {
-        ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
-        executableOutputPO.setUuid(executable.getId());
-        executableDao.addJobOutput(executableOutputPO);
-        if (executable instanceof DefaultChainedExecutable) {
-            for (AbstractExecutable subTask: ((DefaultChainedExecutable) executable).getTasks()) {
-                addJobOutput(subTask);
-            }
-        }
-    }
-
-    //for ut
-    public void deleteJob(String jobId) {
-        try {
-            executableDao.deleteJob(jobId);
-        } catch (PersistentException e) {
-            logger.error("fail to delete job:" + jobId, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public AbstractExecutable getJob(String uuid) {
-        try {
-            return parseTo(executableDao.getJob(uuid));
-        } catch (PersistentException e) {
-            logger.error("fail to get job:" + uuid, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Output getOutput(String uuid) {
-        try {
-            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(uuid);
-            Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + uuid);
-            return parseOutput(jobOutput);
-        } catch (PersistentException e) {
-            logger.error("fail to get job output:" + uuid, e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    private DefaultOutput parseOutput(ExecutableOutputPO jobOutput) {
-        final DefaultOutput result = new DefaultOutput();
-        result.setExtra(jobOutput.getInfo());
-        result.setState(ExecutableState.valueOf(jobOutput.getStatus()));
-        result.setVerboseMsg(jobOutput.getContent());
-        result.setLastModified(jobOutput.getLastModified());
-        return result;
-    }
-
-    public Map<String, Output> getAllOutputs() {
-        try {
-            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
-            HashMap<String, Output> result = Maps.newHashMap();
-            for (ExecutableOutputPO jobOutput : jobOutputs) {
-                result.put(jobOutput.getId(), parseOutput(jobOutput));
-            }
-            return result;
-        } catch (PersistentException e) {
-            logger.error("fail to get all job output:", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public List<AbstractExecutable> getAllExecutables() {
-        try {
-            return Lists.transform(executableDao.getJobs(), new Function<ExecutablePO, AbstractExecutable>() {
-                @Nullable
-                @Override
-                public AbstractExecutable apply(ExecutablePO input) {
-                        return parseTo(input);
-                }
-            });
-        } catch (PersistentException e) {
-            logger.error("error get All Jobs", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public List<String> getAllJobIds() {
-        try {
-            return executableDao.getJobIds();
-        } catch (PersistentException e) {
-            logger.error("error get All Job Ids", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void updateAllRunningJobsToError() {
-        try {
-            final List<ExecutableOutputPO> jobOutputs = executableDao.getJobOutputs();
-            for (ExecutableOutputPO executableOutputPO : jobOutputs) {
-                if (executableOutputPO.getStatus().equalsIgnoreCase(ExecutableState.RUNNING.toString())) {
-                    executableOutputPO.setStatus(ExecutableState.ERROR.toString());
-                    executableDao.updateJobOutput(executableOutputPO);
-                }
-            }
-        } catch (PersistentException e) {
-            logger.error("error reset job status from RUNNING to ERROR", e);
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void resumeJob(String jobId) {
-        AbstractExecutable job = getJob(jobId);
-        if (job == null) {
-            return;
-        }
-        updateJobOutput(jobId, ExecutableState.READY, null, null);
-        if (job instanceof DefaultChainedExecutable) {
-            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
-            for (AbstractExecutable task : tasks) {
-                if (task.getStatus() == ExecutableState.ERROR) {
-                    updateJobOutput(task.getId(), ExecutableState.READY, null, null);
-                    break;
-                }
-            }
-        }
-    }
-
-    public void discardJob(String jobId) {
-        AbstractExecutable job = getJob(jobId);
-        if (job instanceof DefaultChainedExecutable) {
-            List<AbstractExecutable> tasks = ((DefaultChainedExecutable) job).getTasks();
-            for (AbstractExecutable task : tasks) {
-                if (!task.getStatus().isFinalState()) {
-                    updateJobOutput(task.getId(), ExecutableState.DISCARDED, null, null);
-                }
-            }
-        }
-        updateJobOutput(jobId, ExecutableState.DISCARDED, null, null);
-    }
-
-    public void updateJobOutput(String jobId, ExecutableState newStatus, Map<String, String> info, String output) {
-        try {
-            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
-            Preconditions.checkArgument(jobOutput != null, "there is no related output for job id:" + jobId);
-            ExecutableState oldStatus = ExecutableState.valueOf(jobOutput.getStatus());
-            if (newStatus != null && oldStatus != newStatus) {
-                if (!ExecutableState.isValidStateTransfer(oldStatus, newStatus)) {
-                    throw new IllegalStateTranferException("there is no valid state transfer from:" + oldStatus + " to:" + newStatus);
-                }
-                jobOutput.setStatus(newStatus.toString());
-            }
-            if (info != null) {
-                jobOutput.setInfo(info);
-            }
-            if (output != null) {
-                jobOutput.setContent(output);
-            }
-            executableDao.updateJobOutput(jobOutput);
-            logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
-        } catch (PersistentException e) {
-            logger.error("error change job:" + jobId + " to " + newStatus.toString());
-            throw new RuntimeException(e);
-        }
-    }
-
-    //for migration only
-    //TODO delete when migration finished
-    public void resetJobOutput(String jobId, ExecutableState state, String output) {
-        try {
-            final ExecutableOutputPO jobOutput = executableDao.getJobOutput(jobId);
-            jobOutput.setStatus(state.toString());
-            if (output != null) {
-                jobOutput.setContent(output);
-            }
-            executableDao.updateJobOutput(jobOutput);
-        } catch (PersistentException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void addJobInfo(String id, Map<String, String> info) {
-        if (info == null) {
-            return;
-        }
-        try {
-            ExecutableOutputPO output = executableDao.getJobOutput(id);
-            Preconditions.checkArgument(output != null, "there is no related output for job id:" + id);
-            output.getInfo().putAll(info);
-            executableDao.updateJobOutput(output);
-        } catch (PersistentException e) {
-            logger.error("error update job info, id:" + id + "  info:" + info.toString());
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void addJobInfo(String id, String key, String value) {
-        Map<String, String> info = Maps.newHashMap();
-        info.put(key, value);
-        addJobInfo(id, info);
-    }
-
-    private static ExecutablePO parse(AbstractExecutable executable) {
-        ExecutablePO result = new ExecutablePO();
-        result.setName(executable.getName());
-        result.setUuid(executable.getId());
-        result.setType(executable.getClass().getName());
-        result.setParams(executable.getParams());
-        if (executable instanceof ChainedExecutable) {
-            List<ExecutablePO> tasks = Lists.newArrayList();
-            for (AbstractExecutable task : ((ChainedExecutable) executable).getTasks()) {
-                tasks.add(parse(task));
-            }
-            result.setTasks(tasks);
-        }
-        return result;
-    }
-
-    private static AbstractExecutable parseTo(ExecutablePO executablePO) {
-        if (executablePO == null) {
-            return null;
-        }
-        String type = executablePO.getType();
-        try {
-            Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class);
-            Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor();
-            AbstractExecutable result = constructor.newInstance();
-            result.setId(executablePO.getUuid());
-            result.setName(executablePO.getName());
-            result.setParams(executablePO.getParams());
-            List<ExecutablePO> tasks = executablePO.getTasks();
-            if (tasks != null && !tasks.isEmpty()) {
-                Preconditions.checkArgument(result instanceof ChainedExecutable);
-                for (ExecutablePO subTask: tasks) {
-                    ((ChainedExecutable) result).addTask(parseTo(subTask));
-                }
-            }
-            return result;
-        } catch (ReflectiveOperationException e) {
-            throw new IllegalArgumentException("cannot parse this job:" + executablePO.getId(), e);
-        }
-    }
-
-}
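
Two implementation details in the ExecutableManager deleted above (as part of the KYLIN-875 split) are worth flagging. First, getInstance() does an unsynchronized get-then-put on the ConcurrentHashMap, so two threads racing on the same KylinConfig can each construct a manager, and the instance returned to one caller is not necessarily the one left in the cache. Second, updateJobOutput() is where the job state machine is enforced: it rejects any transition that fails ExecutableState.isValidStateTransfer(). A minimal sketch of a race-free per-key cache follows; the PerKeyCache name and the Function-based factory are illustrative only, not Kylin API:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    // Illustrative only (not Kylin API): an atomic per-key singleton cache.
    // putIfAbsent closes the check-then-act race in the deleted getInstance(),
    // so every caller for a given key observes the same instance.
    public final class PerKeyCache<K, V> {
        private final ConcurrentHashMap<K, V> cache = new ConcurrentHashMap<K, V>();

        public V getInstance(K key, Function<K, V> factory) {
            V existing = cache.get(key);
            if (existing != null) {
                return existing;                        // fast path, no locking
            }
            V created = factory.apply(key);             // may run twice under a race...
            V winner = cache.putIfAbsent(key, created);
            return (winner != null) ? winner : created; // ...but only one instance wins
        }
    }

On Java 8+ the whole method collapses to cache.computeIfAbsent(key, factory), which performs the lookup and construction atomically.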

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index 5fc445c..36feb15 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -33,6 +33,9 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -42,18 +45,15 @@ import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.engine.mr.HadoopUtil;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer;
-import org.apache.kylin.job.hadoop.hbase.CubeHTableUtil;
-import org.apache.kylin.job.inmemcubing.ICuboidWriter;
-import org.apache.kylin.job.inmemcubing.InMemCubeBuilder;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer;
+import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.ReadableTable.TableSignature;
-import org.apache.kylin.storage.cube.CuboidToGridTableMapping;
-import org.apache.kylin.storage.gridtable.GTRecord;
 import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.apache.kylin.storage.hbase.InMemKeyValueCreator;
+import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
+import org.apache.kylin.storage.hbase.steps.InMemKeyValueCreator;
 import org.apache.kylin.streaming.MicroStreamBatch;
 import org.apache.kylin.streaming.MicroStreamBatchConsumer;
 import org.slf4j.Logger;
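
The CubeStreamConsumer hunk above contains no behavioral change; it only rewires imports to the packages introduced by the split. The one-for-one moves visible in the hunk:

    org.apache.kylin.job.inmemcubing.ICuboidWriter / InMemCubeBuilder  -> org.apache.kylin.cube.inmemcubing.*
    org.apache.kylin.job.constant.BatchConstants                       -> org.apache.kylin.engine.mr.common.BatchConstants
    org.apache.kylin.job.hadoop.cube.FactDistinctColumnsReducer        -> org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer
    org.apache.kylin.job.hadoop.hbase.CubeHTableUtil                   -> org.apache.kylin.storage.hbase.steps.CubeHTableUtil
    org.apache.kylin.storage.cube.CuboidToGridTableMapping             -> org.apache.kylin.cube.gridtable.CuboidToGridTableMapping
    org.apache.kylin.storage.gridtable.GTRecord                        -> org.apache.kylin.gridtable.GTRecord
    org.apache.kylin.storage.hbase.InMemKeyValueCreator                -> org.apache.kylin.storage.hbase.steps.InMemKeyValueCreator

Out-of-tree code compiled against the old packages would need the same substitutions.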

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
index e1ebe20..47ed52e 100644
--- a/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/tools/CleanHtableCLI.java
@@ -28,8 +28,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java b/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java
deleted file mode 100644
index 89713c2..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/DefaultSslProtocolSocketFactory.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.UnknownHostException;
-
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-import org.apache.commons.httpclient.ConnectTimeoutException;
-import org.apache.commons.httpclient.HttpClientError;
-import org.apache.commons.httpclient.params.HttpConnectionParams;
-import org.apache.commons.httpclient.protocol.ControllerThreadSocketFactory;
-import org.apache.commons.httpclient.protocol.SecureProtocolSocketFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- * 
- */
-public class DefaultSslProtocolSocketFactory implements SecureProtocolSocketFactory {
-    /** Log object for this class. */
-    private static Logger LOG = LoggerFactory.getLogger(DefaultSslProtocolSocketFactory.class);
-    private SSLContext sslcontext = null;
-
-    /**
-     * Constructor for DefaultSslProtocolSocketFactory.
-     */
-    public DefaultSslProtocolSocketFactory() {
-        super();
-    }
-
-    /**
-     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int,java.net.InetAddress,int)
-     */
-    public Socket createSocket(String host, int port, InetAddress clientHost, int clientPort) throws IOException, UnknownHostException {
-        return getSSLContext().getSocketFactory().createSocket(host, port, clientHost, clientPort);
-    }
-
-    /**
-     * Attempts to get a new socket connection to the given host within the
-     * given time limit.
-     * 
-     * <p>
-     * To circumvent the limitations of older JREs that do not support connect
-     * timeout a controller thread is executed. The controller thread attempts
-     * to create a new socket within the given limit of time. If socket
-     * constructor does not return until the timeout expires, the controller
-     * terminates and throws an {@link ConnectTimeoutException}
-     * </p>
-     * 
-     * @param host
-     *            the host name/IP
-     * @param port
-     *            the port on the host
-     * @param localAddress
-     *            the local host name/IP to bind the socket to
-     * @param localPort
-     *            the port on the local machine
-     * @param params
-     *            {@link HttpConnectionParams Http connection parameters}
-     * 
-     * @return Socket a new socket
-     * 
-     * @throws IOException
-     *             if an I/O error occurs while creating the socket
-     * @throws UnknownHostException
-     *             if the IP address of the host cannot be determined
-     * @throws ConnectTimeoutException
-     *             DOCUMENT ME!
-     * @throws IllegalArgumentException
-     *             DOCUMENT ME!
-     */
-    public Socket createSocket(final String host, final int port, final InetAddress localAddress, final int localPort, final HttpConnectionParams params) throws IOException, UnknownHostException, ConnectTimeoutException {
-        if (params == null) {
-            throw new IllegalArgumentException("Parameters may not be null");
-        }
-
-        int timeout = params.getConnectionTimeout();
-
-        if (timeout == 0) {
-            return createSocket(host, port, localAddress, localPort);
-        } else {
-            // To be eventually deprecated when migrated to Java 1.4 or above
-            return ControllerThreadSocketFactory.createSocket(this, host, port, localAddress, localPort, timeout);
-        }
-    }
-
-    /**
-     * @see SecureProtocolSocketFactory#createSocket(java.lang.String,int)
-     */
-    public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
-        return getSSLContext().getSocketFactory().createSocket(host, port);
-    }
-
-    /**
-     * @see SecureProtocolSocketFactory#createSocket(java.net.Socket,java.lang.String,int,boolean)
-     */
-    public Socket createSocket(Socket socket, String host, int port, boolean autoClose) throws IOException, UnknownHostException {
-        return getSSLContext().getSocketFactory().createSocket(socket, host, port, autoClose);
-    }
-
-    public boolean equals(Object obj) {
-        return ((obj != null) && obj.getClass().equals(DefaultX509TrustManager.class));
-    }
-
-    public int hashCode() {
-        return DefaultX509TrustManager.class.hashCode();
-    }
-
-    private static SSLContext createEasySSLContext() {
-        try {
-            SSLContext context = SSLContext.getInstance("TLS");
-            context.init(null, new TrustManager[] { new DefaultX509TrustManager(null) }, null);
-
-            return context;
-        } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
-            throw new HttpClientError(e.toString());
-        }
-    }
-
-    private SSLContext getSSLContext() {
-        if (this.sslcontext == null) {
-            this.sslcontext = createEasySSLContext();
-        }
-
-        return this.sslcontext;
-    }
-}
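
The deleted factory implements the legacy Commons HttpClient 3.x SecureProtocolSocketFactory SPI and lazily builds a single trust-everything TLS context (createEasySSLContext()/getSSLContext()). Note also that equals() compares against DefaultX509TrustManager.class rather than the factory's own class, so no two factory instances ever compare equal, which looks like a copy-paste slip. For porting purposes, here is a minimal plain-JSSE sketch of the same lazy trust-all context with no Commons HttpClient dependency; the class and field names are illustrative, and a trust-all manager is unsafe outside test rigs:

    import java.security.GeneralSecurityException;
    import java.security.cert.X509Certificate;

    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLSocketFactory;
    import javax.net.ssl.TrustManager;
    import javax.net.ssl.X509TrustManager;

    // Plain-JSSE sketch of the "easy SSL" idea: one lazily built TLS context
    // around a permissive trust manager. Trusting all certificates defeats
    // TLS authentication; keep this to test environments only.
    public final class EasySslContext {
        private static volatile SSLSocketFactory cached; // lazy, like getSSLContext()

        public static SSLSocketFactory get() throws GeneralSecurityException {
            if (cached == null) { // benign race: at worst two contexts are built
                TrustManager trustAll = new X509TrustManager() {
                    @Override public void checkClientTrusted(X509Certificate[] chain, String authType) { }
                    @Override public void checkServerTrusted(X509Certificate[] chain, String authType) { }
                    @Override public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
                };
                SSLContext ctx = SSLContext.getInstance("TLS");
                ctx.init(null, new TrustManager[] { trustAll }, null);
                cached = ctx.getSocketFactory();
            }
            return cached;
        }
    }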

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/2ef9831e/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java b/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java
deleted file mode 100644
index 8fc2dcd..0000000
--- a/job/src/main/java/org/apache/kylin/job/tools/DefaultX509TrustManager.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.tools;
-
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509TrustManager;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- * 
- */
-public class DefaultX509TrustManager implements X509TrustManager {
-
-    /** Log object for this class. */
-    private static Logger LOG = LoggerFactory.getLogger(DefaultX509TrustManager.class);
-    private X509TrustManager standardTrustManager = null;
-
-    /**
-     * Constructor for DefaultX509TrustManager.
-     * 
-     */
-    public DefaultX509TrustManager(KeyStore keystore) throws NoSuchAlgorithmException, KeyStoreException {
-        super();
-
-        TrustManagerFactory factory = TrustManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-        factory.init(keystore);
-
-        TrustManager[] trustmanagers = factory.getTrustManagers();
-
-        if (trustmanagers.length == 0) {
-            throw new NoSuchAlgorithmException("SunX509 trust manager not supported");
-        }
-
-        this.standardTrustManager = (X509TrustManager) trustmanagers[0];
-    }
-
-    public X509Certificate[] getAcceptedIssuers() {
-        return this.standardTrustManager.getAcceptedIssuers();
-    }
-
-    public boolean isClientTrusted(X509Certificate[] certificates) {
-        return true;
-        // return this.standardTrustManager.isClientTrusted(certificates);
-    }
-
-    public boolean isServerTrusted(X509Certificate[] certificates) {
-        if ((certificates != null) && LOG.isDebugEnabled()) {
-            LOG.debug("Server certificate chain:");
-
-            for (int i = 0; i < certificates.length; i++) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("X509Certificate[" + i + "]=" + certificates[i]);
-                }
-            }
-        }
-
-        if ((certificates != null) && (certificates.length == 1)) {
-            X509Certificate certificate = certificates[0];
-
-            try {
-                certificate.checkValidity();
-            } catch (CertificateException e) {
-                LOG.error(e.toString());
-
-                return false;
-            }
-
-            return true;
-        } else {
-            return true;
-            // return this.standardTrustManager.isServerTrusted(certificates);
-        }
-    }
-
-    @Override
-    public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
-        // TODO Auto-generated method stub
-
-    }
-
-}
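
The trust manager above obtains the JRE's standard X509TrustManager in its constructor but then barely consults it: isClientTrusted() returns true unconditionally, isServerTrusted() accepts any chain except a single certificate that fails checkValidity(), and the checkClientTrusted()/checkServerTrusted() overrides that JSSE actually calls are empty stubs, so when wired into an SSLContext this manager trusts everything. A smaller oddity: the constructor passes KeyManagerFactory.getDefaultAlgorithm() (SunX509) to TrustManagerFactory.getInstance(), which happens to work on stock Sun/Oracle JREs but is not the trust-manager default (PKIX). For contrast, a minimal sketch that fully delegates to the platform verifier; the DelegatingTrustManager name is illustrative:

    import java.security.GeneralSecurityException;
    import java.security.KeyStore;
    import java.security.cert.CertificateException;
    import java.security.cert.X509Certificate;

    import javax.net.ssl.TrustManager;
    import javax.net.ssl.TrustManagerFactory;
    import javax.net.ssl.X509TrustManager;

    // Sketch: look up the platform default X509TrustManager and delegate both
    // checks to it, so chain building, expiry and signature verification all
    // run, instead of the empty stubs above.
    public final class DelegatingTrustManager implements X509TrustManager {
        private final X509TrustManager delegate;

        public DelegatingTrustManager() throws GeneralSecurityException {
            TrustManagerFactory tmf =
                    TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
            tmf.init((KeyStore) null); // null -> the JRE's default trust store
            X509TrustManager found = null;
            for (TrustManager tm : tmf.getTrustManagers()) {
                if (tm instanceof X509TrustManager) {
                    found = (X509TrustManager) tm;
                    break;
                }
            }
            if (found == null) {
                throw new GeneralSecurityException("no X509TrustManager available");
            }
            this.delegate = found;
        }

        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            delegate.checkClientTrusted(chain, authType);
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
            delegate.checkServerTrusted(chain, authType);
        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            return delegate.getAcceptedIssuers();
        }
    }

An SSLContext initialized with this delegate behaves like the JDK default while still leaving a single place to log or wrap individual trust decisions.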

