kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mahong...@apache.org
Subject [11/22] incubator-kylin git commit: KYLIN-960 organize existing storage module
Date Mon, 24 Aug 2015 09:00:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
new file mode 100644
index 0000000..95a9128
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregateRegionObserverTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverAggregators.HCol;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author yangli9
+ */
+public class AggregateRegionObserverTest {
+    ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+    byte[] mask = new byte[] { (byte) 0xff, (byte) 0xff, 0, 0 };
+    byte[] k1 = new byte[] { 0x01, 0x01, 0, 0x01 };
+    byte[] k2 = new byte[] { 0x01, 0x01, 0, 0x02 };
+    byte[] k3 = new byte[] { 0x02, 0x02, 0, 0x03 };
+    byte[] k4 = new byte[] { 0x02, 0x02, 0, 0x04 };
+
+    ArrayList<Cell> cellsInput = Lists.newArrayList();
+
+    byte[] family = Bytes.toBytes("f");
+    byte[] q1 = Bytes.toBytes("q1");
+    byte[] q2 = Bytes.toBytes("q2");
+
+    HCol c1 = new HCol(family, q1, new String[] { "SUM", "COUNT" }, new String[] { "decimal", "long" });
+    HCol c2 = new HCol(family, q2, new String[] { "SUM" }, new String[] { "decimal" });
+
+    @Before
+    public void setup() {
+        cellsInput.add(newCell(k1, c1, "10.5", 1));
+        cellsInput.add(newCell(k2, c1, "11.5", 2));
+        cellsInput.add(newCell(k3, c1, "12.5", 3));
+        cellsInput.add(newCell(k4, c1, "13.5", 4));
+
+        cellsInput.add(newCell(k1, c2, "21.5"));
+        cellsInput.add(newCell(k2, c2, "22.5"));
+        cellsInput.add(newCell(k3, c2, "23.5"));
+        cellsInput.add(newCell(k4, c2, "24.5"));
+
+    }
+
+    private Cell newCell(byte[] key, HCol col, String decimal) {
+        return newCell(key, col, decimal, Integer.MIN_VALUE);
+    }
+
+    private Cell newCell(byte[] key, HCol col, String decimal, int number) {
+        Object[] values = number == Integer.MIN_VALUE ? //
+        new Object[] { new BigDecimal(decimal) } //
+                : new Object[] { new BigDecimal(decimal), new LongMutable(number) };
+        buf.clear();
+        col.measureCodec.encode(values, buf);
+
+        Cell keyValue = new KeyValue(key, 0, key.length, //
+                col.family, 0, col.family.length, //
+                col.qualifier, 0, col.qualifier.length, //
+                HConstants.LATEST_TIMESTAMP, Type.Put, //
+                buf.array(), 0, buf.position());
+
+        return keyValue;
+    }
+
+    @Test
+    public void test() throws IOException {
+
+        CoprocessorRowType rowType = newRowType();
+        CoprocessorProjector projector = new CoprocessorProjector(mask, true);
+        ObserverAggregators aggregators = new ObserverAggregators(new HCol[] { c1, c2 });
+        CoprocessorFilter filter = CoprocessorFilter.deserialize(null); // a default,
+        // always-true,
+        // filter
+        HashSet<String> expectedResult = new HashSet<String>();
+
+        expectedResult.add("\\x02\\x02\\x00\\x00, f:q1, [26.0, 7]");
+        expectedResult.add("\\x02\\x02\\x00\\x00, f:q2, [48.0]");
+        expectedResult.add("\\x01\\x01\\x00\\x00, f:q1, [22.0, 3]");
+        expectedResult.add("\\x01\\x01\\x00\\x00, f:q2, [44.0]");
+
+        MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
+
+        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM);
+        ArrayList<Cell> result = Lists.newArrayList();
+        boolean hasMore = true;
+        while (hasMore) {
+            result.clear();
+            hasMore = aggrScanner.next(result);
+            if (result.isEmpty())
+                continue;
+
+            Cell cell = result.get(0);
+            HCol hcol = null;
+            if (ObserverAggregators.match(c1, cell)) {
+                hcol = c1;
+            } else if (ObserverAggregators.match(c2, cell)) {
+                hcol = c2;
+            } else
+                fail();
+
+            hcol.measureCodec.decode(ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()), hcol.measureValues);
+
+            String rowKey = toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), mask);
+            String col = Bytes.toString(hcol.family) + ":" + Bytes.toString(hcol.qualifier);
+            String values = Arrays.toString(hcol.measureValues);
+
+            System.out.println(rowKey);
+            System.out.println(col);
+            System.out.println(values);
+
+            assertTrue(expectedResult.contains(rowKey + ", " + col + ", " + values));
+        }
+        aggrScanner.close();
+    }
+
+    @Test
+    public void testNoMeasure() throws IOException {
+
+        CoprocessorRowType rowType = newRowType();
+        CoprocessorProjector projector = new CoprocessorProjector(mask, true);
+        ObserverAggregators aggregators = new ObserverAggregators(new HCol[] {});
+        CoprocessorFilter filter = CoprocessorFilter.deserialize(null); // a default,
+        // always-true,
+        // filter
+        HashSet<String> expectedResult = new HashSet<String>();
+
+        expectedResult.add("\\x02\\x02\\x00\\x00");
+        expectedResult.add("\\x01\\x01\\x00\\x00");
+
+        MockupRegionScanner innerScanner = new MockupRegionScanner(cellsInput);
+
+        RegionScanner aggrScanner = new AggregationScanner(rowType, filter, projector, aggregators, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM);
+        ArrayList<Cell> result = Lists.newArrayList();
+        boolean hasMore = true;
+        while (hasMore) {
+            result.clear();
+            hasMore = aggrScanner.next(result);
+            if (result.isEmpty())
+                continue;
+
+            Cell cell = result.get(0);
+
+            String rowKey = toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), mask);
+
+            assertTrue(expectedResult.contains(rowKey));
+        }
+        aggrScanner.close();
+    }
+
+    private String toString(byte[] array, int offset, short length, byte[] mask) {
+        StringBuilder result = new StringBuilder();
+        for (int i = 0; i < length; i++) {
+            int ch = array[offset + i] & 0xFF & mask[i];
+            result.append(String.format("\\x%02X", ch));
+        }
+        return result.toString();
+    }
+
+    private CoprocessorRowType newRowType() {
+        TableDesc t = new TableDesc();
+        t.setName("TABLE");
+        t.setDatabase("DEFAULT");
+        TblColRef[] cols = new TblColRef[] { newCol(1, "A", t), newCol(2, "B", t), newCol(3, "C", t), newCol(4, "D", t) };
+        int[] sizes = new int[] { 1, 1, 1, 1 };
+        return new CoprocessorRowType(cols, sizes);
+    }
+
+    private TblColRef newCol(int i, String name, TableDesc t) {
+        return new TblColRef(ColumnDesc.mockup(t, i, name, null));
+    }
+
+    public static class MockupRegionScanner implements RegionScanner {
+        List<Cell> input;
+        int i = 0;
+
+        public MockupRegionScanner(List<Cell> cellInputs) {
+            this.input = cellInputs;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
+         * .List)
+         */
+        @Override
+        public boolean next(List<Cell> results) throws IOException {
+            return nextRaw(results);
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.hadoop.hbase.regionserver.InternalScanner#next(java.util
+         * .List, int)
+         */
+        @Override
+        public boolean next(List<Cell> result, int limit) throws IOException {
+            return next(result);
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see org.apache.hadoop.hbase.regionserver.InternalScanner#close()
+         */
+        @Override
+        public void close() throws IOException {
+
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.hadoop.hbase.regionserver.RegionScanner#getRegionInfo()
+         */
+        @Override
+        public HRegionInfo getRegionInfo() {
+            return null;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.hadoop.hbase.regionserver.RegionScanner#isFilterDone()
+         */
+        @Override
+        public boolean isFilterDone() throws IOException {
+            return false;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.hadoop.hbase.regionserver.RegionScanner#reseek(byte[])
+         */
+        @Override
+        public boolean reseek(byte[] row) throws IOException {
+            return false;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.hadoop.hbase.regionserver.RegionScanner#getMaxResultSize()
+         */
+        @Override
+        public long getMaxResultSize() {
+            return 0;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.hadoop.hbase.regionserver.RegionScanner#getMvccReadPoint()
+         */
+        @Override
+        public long getMvccReadPoint() {
+            return 0;
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
+         * .List)
+         */
+        @Override
+        public boolean nextRaw(List<Cell> result) throws IOException {
+            if (i < input.size()) {
+                result.add(input.get(i));
+                i++;
+            }
+            return i < input.size();
+        }
+
+        /*
+         * (non-Javadoc)
+         * 
+         * @see
+         * org.apache.hadoop.hbase.regionserver.RegionScanner#nextRaw(java.util
+         * .List, int)
+         */
+        @Override
+        public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+            return nextRaw(result);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowAggregatorsTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowAggregatorsTest.java
new file mode 100644
index 0000000..7881688
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowAggregatorsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.kylin.common.util.Bytes;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class RowAggregatorsTest {
+
+    @Test
+    public void testSerialize() {
+        ObserverAggregators.HCol[] hcols = new ObserverAggregators.HCol[] { //
+        newHCol("f", "c1", new String[] { "SUM", "COUNT" }, new String[] { "decimal", "long" }), //
+                newHCol("f", "c2", new String[] { "SUM", "SUM" }, new String[] { "long", "long" }) };
+        ObserverAggregators sample = new ObserverAggregators(hcols);
+
+        byte[] bytes = ObserverAggregators.serialize(sample);
+        ObserverAggregators copy = ObserverAggregators.deserialize(bytes);
+
+        assertTrue(sample.nHCols == copy.nHCols);
+        assertTrue(sample.nTotalMeasures == copy.nTotalMeasures);
+        assertEquals(sample.hcols[0], copy.hcols[0]);
+        assertEquals(sample.hcols[1], copy.hcols[1]);
+    }
+
+    private static ObserverAggregators.HCol newHCol(String family, String qualifier, String[] funcNames, String[] dataTypes) {
+        return new ObserverAggregators.HCol(Bytes.toBytes(family), Bytes.toBytes(qualifier), funcNames, dataTypes);
+    }
+
+    private static void assertEquals(ObserverAggregators.HCol a, ObserverAggregators.HCol b) {
+        assertTrue(a.nMeasures == b.nMeasures);
+        assertTrue(Arrays.equals(a.family, b.family));
+        assertTrue(Arrays.equals(a.qualifier, b.qualifier));
+        assertTrue(Arrays.equals(a.funcNames, b.funcNames));
+        assertTrue(Arrays.equals(a.dataTypes, b.dataTypes));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowTypeTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowTypeTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowTypeTest.java
new file mode 100644
index 0000000..ba4b86a
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/RowTypeTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class RowTypeTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testSerialize() {
+
+        CubeInstance cube = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_without_slr_ready");
+        CubeDesc cubeDesc = cube.getDescriptor();
+        long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        Cuboid cuboid = Cuboid.findById(cubeDesc, baseCuboidId);
+
+        CoprocessorRowType rowType = CoprocessorRowType.fromCuboid(cube.getLatestReadySegment(), cuboid);
+        byte[] bytes = CoprocessorRowType.serialize(rowType);
+        CoprocessorRowType copy = CoprocessorRowType.deserialize(bytes);
+
+        assertTrue(Arrays.equals(rowType.columns, copy.columns));
+        assertTrue(Arrays.equals(rowType.columnSizes, copy.columnSizes));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
new file mode 100644
index 0000000..d410414
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/ITInvertedIndexHBaseTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.ii;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.IISegment;
+import org.apache.kylin.invertedindex.index.RawTableRecord;
+import org.apache.kylin.invertedindex.index.Slice;
+import org.apache.kylin.invertedindex.index.TableRecord;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.model.IIDesc;
+import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+import org.apache.kylin.storage.hbase.steps.HBaseMetadataTestCase;
+import org.apache.kylin.storage.hbase.cube.v1.HBaseClientKVIterator;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * @author yangli9
+ */
+public class ITInvertedIndexHBaseTest extends HBaseMetadataTestCase {
+
+    IIInstance ii;
+    IISegment seg;
+    HConnection hconn;
+
+    TableRecordInfo info;
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+
+        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        this.seg = ii.getFirstSegment();
+
+        String hbaseUrl = KylinConfig.getInstanceFromEnv().getStorageUrl();
+        Configuration hconf = HBaseConnection.newHBaseConfiguration(hbaseUrl);
+        hconn = HConnectionManager.createConnection(hconf);
+
+        this.info = new TableRecordInfo(seg);
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testLoad() throws Exception {
+
+        String tableName = seg.getStorageLocationIdentifier();
+        IIKeyValueCodec codec = new IIKeyValueCodec(info.getDigest());
+
+        List<Slice> slices = Lists.newArrayList();
+        HBaseClientKVIterator kvIterator = new HBaseClientKVIterator(hconn, tableName, IIDesc.HBASE_FAMILY_BYTES);
+        try {
+            for (Slice slice : codec.decodeKeyValue(kvIterator)) {
+                slices.add(slice);
+            }
+        } finally {
+            kvIterator.close();
+        }
+
+        List<TableRecord> records = iterateRecords(slices);
+        //dump(records);
+        System.out.println(records.size() + " records");
+    }
+
+    private List<TableRecord> iterateRecords(List<Slice> slices) {
+        List<TableRecord> records = Lists.newArrayList();
+        for (Slice slice : slices) {
+            for (RawTableRecord rec : slice) {
+                records.add(new TableRecord((RawTableRecord) rec.clone(), info));
+            }
+        }
+        return records;
+    }
+
+    @SuppressWarnings("unused")
+    private void dump(Iterable<TableRecord> records) {
+        for (TableRecord rec : records) {
+            byte[] x = rec.getBytes();
+            String y = BytesUtil.toReadableText(x);
+            System.out.println(y);
+            System.out.println();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluatorTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluatorTest.java
new file mode 100644
index 0000000..da1cf71
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/BitMapFilterEvaluatorTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.ii.coprocessor.endpoint.BitMapFilterEvaluator.BitMapProvider;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import it.uniroma3.mat.extendedset.intset.ConciseSet;
+
+public class BitMapFilterEvaluatorTest {
+
+    static TblColRef colA;
+    static TblColRef colB;
+
+    static {
+        TableDesc table = TableDesc.mockup("DEFAULT.TABLE");
+
+        ColumnDesc col = ColumnDesc.mockup(table, 1, "colA", "string");
+        colA = new TblColRef(col);
+
+        col = ColumnDesc.mockup(table, 1, "colB", "string");
+        colB = new TblColRef(col);
+    }
+
+    static class MockBitMapProivder implements BitMapProvider {
+
+        private static final int MAX_ID = 8;
+        private static final int REC_COUNT = 10;
+
+        @Override
+        public ConciseSet getBitMap(TblColRef col, Integer startId, Integer endId) {
+            if (!col.equals(colA))
+                return null;
+
+            // i-th record has value ID i, and last record has value null
+            if (startId == null && endId == null) {
+                //entry for getting null value
+                ConciseSet s = new ConciseSet();
+                s.add(getRecordCount() - 1);
+                return s;
+            }
+
+            int start = 0;
+            int end = MAX_ID;
+            if (startId != null) {
+                start = startId;
+            }
+            if (endId != null) {
+                end = endId;
+            }
+
+            ConciseSet ret = new ConciseSet();
+            for (int i = start; i <= end; ++i) {
+                ConciseSet temp = getBitMap(col, i);
+                ret.addAll(temp);
+            }
+            return ret;
+        }
+
+        public ConciseSet getBitMap(TblColRef col, int valueId) {
+            if (!col.equals(colA))
+                return null;
+
+            // i-th record has value ID i, and last record has value null
+            ConciseSet bitMap = new ConciseSet();
+            if (valueId < 0 || valueId > getMaxValueId(col)) // null
+                bitMap.add(getRecordCount() - 1);
+            else
+                bitMap.add(valueId);
+
+            return bitMap;
+        }
+
+        @Override
+        public int getRecordCount() {
+            return REC_COUNT;
+        }
+
+        @Override
+        public int getMaxValueId(TblColRef col) {
+            return MAX_ID;
+        }
+    }
+
+    BitMapFilterEvaluator eval = new BitMapFilterEvaluator(new MockBitMapProivder());
+    ArrayList<CompareTupleFilter> basicFilters = Lists.newArrayList();
+    ArrayList<ConciseSet> basicResults = Lists.newArrayList();
+
+    public BitMapFilterEvaluatorTest() {
+        basicFilters.add(compare(colA, FilterOperatorEnum.ISNULL));
+        basicResults.add(set(9));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.ISNOTNULL));
+        basicResults.add(set(0, 1, 2, 3, 4, 5, 6, 7, 8));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.EQ, 0));
+        basicResults.add(set(0));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.NEQ, 0));
+        basicResults.add(set(1, 2, 3, 4, 5, 6, 7, 8));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.IN, 0, 5));
+        basicResults.add(set(0, 5));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.NOTIN, 0, 5));
+        basicResults.add(set(1, 2, 3, 4, 6, 7, 8));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.LT, 3));
+        basicResults.add(set(0, 1, 2));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.LTE, 3));
+        basicResults.add(set(0, 1, 2, 3));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.GT, 3));
+        basicResults.add(set(4, 5, 6, 7, 8));
+
+        basicFilters.add(compare(colA, FilterOperatorEnum.GTE, 3));
+        basicResults.add(set(3, 4, 5, 6, 7, 8));
+    }
+
+    @Test
+    public void testBasics() {
+        for (int i = 0; i < basicFilters.size(); i++) {
+            assertEquals(basicResults.get(i), eval.evaluate(basicFilters.get(i)));
+        }
+    }
+
+    @Test
+    public void testLogicalAnd() {
+        for (int i = 0; i < basicFilters.size(); i++) {
+            for (int j = 0; j < basicFilters.size(); j++) {
+                LogicalTupleFilter f = logical(FilterOperatorEnum.AND, basicFilters.get(i), basicFilters.get(j));
+                ConciseSet r = basicResults.get(i).clone();
+                r.retainAll(basicResults.get(j));
+                assertEquals(r, eval.evaluate(f));
+            }
+        }
+    }
+
+    @Test
+    public void testLogicalOr() {
+        for (int i = 0; i < basicFilters.size(); i++) {
+            for (int j = 0; j < basicFilters.size(); j++) {
+                LogicalTupleFilter f = logical(FilterOperatorEnum.OR, basicFilters.get(i), basicFilters.get(j));
+                ConciseSet r = basicResults.get(i).clone();
+                r.addAll(basicResults.get(j));
+                assertEquals(r, eval.evaluate(f));
+            }
+        }
+    }
+
+    @Test
+    public void testNotEvaluable() {
+        CompareTupleFilter notEvaluable = compare(colB, FilterOperatorEnum.EQ, 0);
+        assertEquals(null, eval.evaluate(notEvaluable));
+
+        LogicalTupleFilter or = logical(FilterOperatorEnum.OR, basicFilters.get(1), notEvaluable);
+        assertEquals(null, eval.evaluate(or));
+
+        LogicalTupleFilter and = logical(FilterOperatorEnum.AND, basicFilters.get(1), notEvaluable);
+        assertEquals(basicResults.get(1), eval.evaluate(and));
+    }
+
+    public static CompareTupleFilter compare(TblColRef col, TupleFilter.FilterOperatorEnum op, int... ids) {
+        CompareTupleFilter filter = new CompareTupleFilter(op);
+        filter.addChild(columnFilter(col));
+        for (int i : ids) {
+            filter.addChild(constFilter(i));
+        }
+        return filter;
+    }
+
+    public static LogicalTupleFilter logical(TupleFilter.FilterOperatorEnum op, TupleFilter... filters) {
+        LogicalTupleFilter filter = new LogicalTupleFilter(op);
+        for (TupleFilter f : filters)
+            filter.addChild(f);
+        return filter;
+    }
+
+    public static ColumnTupleFilter columnFilter(TblColRef col) {
+        return new ColumnTupleFilter(col);
+    }
+
+    public static ConstantTupleFilter constFilter(int id) {
+        return new ConstantTupleFilter(idToStr(id));
+    }
+
+    public static ConciseSet set(int... ints) {
+        ConciseSet set = new ConciseSet();
+        for (int i : ints)
+            set.add(i);
+        return set;
+    }
+
+    public static String idToStr(int id) {
+        byte[] bytes = new byte[] { (byte) id };
+        return Dictionary.dictIdToString(bytes, 0, bytes.length);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
new file mode 100644
index 0000000..e271129
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregationTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for the II endpoint aggregation path: a serialization round-trip of
+ * {@link EndpointAggregators} and a basic aggregate-then-read-back cycle
+ * through {@link EndpointAggregationCache}.
+ */
+public class EndpointAggregationTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setup() throws IOException {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void cleanUp() {
+        cleanupTestMetadata();
+    }
+
+    /** Builds the measures used by these tests: SUM(PRICE) and MIN(PRICE), both decimal. */
+    private List<FunctionDesc> buildAggregations() {
+        List<FunctionDesc> functions = new ArrayList<FunctionDesc>();
+
+        FunctionDesc f1 = new FunctionDesc();
+        f1.setExpression("SUM");
+        ParameterDesc p1 = new ParameterDesc();
+        p1.setType("column");
+        p1.setValue("PRICE");
+        f1.setParameter(p1);
+        f1.setReturnType("decimal");
+        functions.add(f1);
+
+        FunctionDesc f2 = new FunctionDesc();
+        f2.setExpression("MIN");
+        ParameterDesc p2 = new ParameterDesc();
+        p2.setType("column");
+        p2.setValue("PRICE");
+        f2.setParameter(p2);
+        f2.setReturnType("decimal");
+        functions.add(f2);
+
+        return functions;
+    }
+
+    @Test
+    public void testSerializeAggregator() {
+        final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
+        final EndpointAggregators endpointAggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
+        // round-trip through the coprocessor wire format and compare field by field
+        byte[] x = EndpointAggregators.serialize(endpointAggregators);
+        final EndpointAggregators result = EndpointAggregators.deserialize(x);
+        assertArrayEquals(endpointAggregators.dataTypes, result.dataTypes);
+        assertArrayEquals(endpointAggregators.funcNames, result.funcNames);
+        assertArrayEquals(endpointAggregators.metricValues, result.metricValues);
+        assertEquals(endpointAggregators.rawTableRecord.getBytes().length, result.rawTableRecord.getBytes().length);
+    }
+
+    /** Returns {@code length} random bytes. */
+    private byte[] randomBytes(final int length) {
+        byte[] result = new byte[length];
+        // fix: Random.nextBytes() fills the entire array in one call; the
+        // previous version invoked it 'length' times in a loop, doing
+        // O(length^2) work for the same end result.
+        new Random().nextBytes(result);
+        return result;
+    }
+
+    /**
+     * Builds three random table records: two sharing one group key (PRICE
+     * values "199.99" and "2.09") and a third in a second group ("100").
+     */
+    private List<byte[]> mockData(TableRecordInfo tableRecordInfo) {
+        ArrayList<byte[]> result = Lists.newArrayList();
+        final int priceColumnIndex = 23;
+        final int groupByColumnIndex = 0;
+        TblColRef column = tableRecordInfo.getDescriptor().listAllColumns().get(priceColumnIndex);
+        // raw type: the codec's value type depends on the column's data type
+        FixedLenMeasureCodec codec = FixedLenMeasureCodec.get(column.getType());
+
+        byte[] data = randomBytes(tableRecordInfo.getDigest().getByteFormLen());
+        byte[] groupOne = randomBytes(tableRecordInfo.getDigest().length(groupByColumnIndex));
+        codec.write(codec.valueOf("199.99"), data, tableRecordInfo.getDigest().offset(priceColumnIndex));
+        System.arraycopy(groupOne, 0, data, tableRecordInfo.getDigest().offset(groupByColumnIndex), groupOne.length);
+        result.add(data);
+
+        data = randomBytes(tableRecordInfo.getDigest().getByteFormLen());
+        codec.write(codec.valueOf("2.09"), data, tableRecordInfo.getDigest().offset(priceColumnIndex));
+        System.arraycopy(groupOne, 0, data, tableRecordInfo.getDigest().offset(groupByColumnIndex), groupOne.length);
+        result.add(data);
+
+        byte[] groupTwo = randomBytes(tableRecordInfo.getDigest().length(groupByColumnIndex));
+        data = randomBytes(tableRecordInfo.getDigest().getByteFormLen());
+        System.arraycopy(groupTwo, 0, data, tableRecordInfo.getDigest().offset(groupByColumnIndex), groupTwo.length);
+        codec.write(codec.valueOf("100"), data, tableRecordInfo.getDigest().offset(priceColumnIndex));
+        result.add(data);
+
+        return result;
+    }
+
+    @Test
+    public void basicTest() {
+        final IIInstance ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        final TableRecordInfo tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
+        final EndpointAggregators aggregators = EndpointAggregators.fromFunctions(tableRecordInfo, buildAggregations());
+        final EndpointAggregationCache aggCache = new EndpointAggregationCache(aggregators);
+        final Collection<TblColRef> dims = new HashSet<>();
+        final TblColRef groupByColumn = ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_FORMAT_NAME");
+        dims.add(groupByColumn);
+        CoprocessorProjector projector = CoprocessorProjector.makeForEndpoint(tableRecordInfo, dims);
+        List<byte[]> rawData = mockData(tableRecordInfo);
+        for (int i = 0; i < rawData.size(); ++i) {
+            byte[] data = rawData.get(i);
+            AggrKey aggKey = projector.getAggrKey(data);
+            MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
+            aggregators.aggregate(bufs, data);
+            aggCache.checkMemoryUsage();
+        }
+        // totals summed across both groups; the expected constants presumably
+        // reflect the codec's scaled-long encoding of the decimals written in
+        // mockData() -- confirm against FixedLenMeasureCodec if they drift
+        long sumTotal = 0;
+        long minTotal = 0;
+        for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
+            sumTotal += ((LongMutable) entry.getValue()[0].getState()).get();
+            minTotal += ((LongMutable) entry.getValue()[1].getState()).get();
+        }
+        assertEquals(3020800, sumTotal);
+        assertEquals(1020900, minTotal);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
new file mode 100644
index 0000000..791002f
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TableRecordInfoTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Round-trip test for {@link TableRecordInfoDigest} (de)serialization against
+ * the left-join II test cube.
+ */
+public class TableRecordInfoTest extends LocalFileMetadataTestCase {
+    IIInstance ii;
+    TableRecordInfo tableRecordInfo;
+
+    @Before
+    public void setup() throws IOException {
+        this.createTestMetadata();
+        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
+    }
+
+    @After
+    public void cleanUp() {
+        cleanupTestMetadata();
+    }
+
+    @Test
+    public void testSerialize() {
+        byte[] bytes = TableRecordInfoDigest.serialize(this.tableRecordInfo.getDigest());
+        TableRecordInfoDigest roundTripped = TableRecordInfoDigest.deserialize(bytes);
+        // fix: JUnit's assertEquals takes (expected, actual); the arguments
+        // were reversed, which yields a misleading message on failure
+        assertEquals(25, roundTripped.getColumnCount());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
new file mode 100644
index 0000000..412e335
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/TsConditionExtractorTest.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.ii.coprocessor.endpoint;
+
+import java.io.IOException;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.invertedindex.IIInstance;
+import org.apache.kylin.invertedindex.IIManager;
+import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.ConstantTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.cache.TsConditionExtractor;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+
+/**
+ * Tests extraction of the timestamp {@link Range} implied by tuple-filter
+ * trees (see {@link TsConditionExtractor}) against the left-join II test cube.
+ */
+public class TsConditionExtractorTest extends LocalFileMetadataTestCase {
+    IIInstance ii;
+    TableRecordInfo tableRecordInfo;
+    TableDesc factTableDesc;
+
+    TblColRef calDt;
+    TblColRef siteId;
+
+    @Before
+    public void setup() throws IOException {
+        this.createTestMetadata();
+        this.ii = IIManager.getInstance(getTestConfig()).getII("test_kylin_ii_left_join");
+        this.tableRecordInfo = new TableRecordInfo(ii.getFirstSegment());
+        this.factTableDesc = MetadataManager.getInstance(getTestConfig()).getTableDesc("DEFAULT.TEST_KYLIN_FACT");
+        this.calDt = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "CAL_DT");
+        this.siteId = this.ii.getDescriptor().findColumnRef("DEFAULT.TEST_KYLIN_FACT", "LSTG_SITE_ID");
+    }
+
+    @After
+    public void cleanUp() {
+        cleanupTestMetadata();
+    }
+
+    /** Builds a single comparison node: {@code column op constant}. */
+    private CompareTupleFilter compare(TblColRef column, TupleFilter.FilterOperatorEnum op, String constant) {
+        CompareTupleFilter filter = new CompareTupleFilter(op);
+        filter.addChild(new ColumnTupleFilter(column));
+        filter.addChild(new ConstantTupleFilter(constant));
+        return filter;
+    }
+
+    /** Combines the given filters under a single AND node. */
+    private LogicalTupleFilter and(TupleFilter... children) {
+        LogicalTupleFilter root = new LogicalTupleFilter(TupleFilter.FilterOperatorEnum.AND);
+        root.addChildren(Lists.newArrayList(children));
+        return root;
+    }
+
+    /** Runs the extractor on the II segment's timestamp column over the given filter. */
+    private Range<Long> extract(TupleFilter filter) {
+        return TsConditionExtractor.extractTsCondition(ii.getAllColumns().get(tableRecordInfo.getTimestampColumn()), filter);
+    }
+
+    @Test
+    public void testSimpleFilter() {
+        Range<Long> range = extract(compare(calDt, TupleFilter.FilterOperatorEnum.GT, "2000-01-01"));
+
+        // GT yields an open lower bound at 2000-01-01T00:00:00Z and no upper bound
+        Assert.assertEquals(946684800000L, range.lowerEndpoint().longValue());
+        Assert.assertEquals(BoundType.OPEN, range.lowerBoundType());
+        Assert.assertFalse(range.hasUpperBound());
+    }
+
+    @Test
+    public void testComplexFilter() {
+        // the siteId condition targets a non-timestamp column and must not affect
+        // the range; of the two upper bounds the tighter one (2000-01-02) wins
+        LogicalTupleFilter rootFilter = and( //
+                compare(calDt, TupleFilter.FilterOperatorEnum.GT, "2000-01-01"), //
+                compare(calDt, TupleFilter.FilterOperatorEnum.LTE, "2000-01-03"), //
+                compare(calDt, TupleFilter.FilterOperatorEnum.LTE, "2000-01-02"), //
+                compare(siteId, TupleFilter.FilterOperatorEnum.EQ, "0"));
+
+        Range<Long> range = extract(rootFilter);
+
+        Assert.assertEquals(946684800000L, range.lowerEndpoint().longValue());
+        Assert.assertEquals(946771200000L, range.upperEndpoint().longValue());
+        Assert.assertEquals(BoundType.OPEN, range.lowerBoundType());
+        Assert.assertEquals(BoundType.CLOSED, range.upperBoundType());
+    }
+
+    @Test
+    public void testMoreComplexFilter() {
+        // nested ANDs: the outer LTE 2000-01-02 tightens the inner sub-tree's bound
+        LogicalTupleFilter subRoot = and( //
+                compare(calDt, TupleFilter.FilterOperatorEnum.GT, "2000-01-01"), //
+                compare(calDt, TupleFilter.FilterOperatorEnum.LTE, "2000-01-04"), //
+                compare(calDt, TupleFilter.FilterOperatorEnum.LTE, "2000-01-03"), //
+                compare(siteId, TupleFilter.FilterOperatorEnum.EQ, "0"));
+        LogicalTupleFilter root = and(subRoot, compare(calDt, TupleFilter.FilterOperatorEnum.LTE, "2000-01-02"));
+
+        Range<Long> range = extract(root);
+
+        Assert.assertEquals(946684800000L, range.lowerEndpoint().longValue());
+        Assert.assertEquals(946771200000L, range.upperEndpoint().longValue());
+        Assert.assertEquals(BoundType.OPEN, range.lowerBoundType());
+        Assert.assertEquals(BoundType.CLOSED, range.upperBoundType());
+    }
+
+    @Test
+    public void testComplexConflictFilter() {
+        // the upper bound 1999-01-03 contradicts the lower bound >2000-01-01,
+        // so no timestamp can satisfy the filter and the extractor returns null
+        LogicalTupleFilter rootFilter = and( //
+                compare(calDt, TupleFilter.FilterOperatorEnum.GT, "2000-01-01"), //
+                compare(calDt, TupleFilter.FilterOperatorEnum.LTE, "1999-01-03"), //
+                compare(calDt, TupleFilter.FilterOperatorEnum.LTE, "2000-01-02"), //
+                compare(siteId, TupleFilter.FilterOperatorEnum.EQ, "0"));
+
+        Assert.assertNull(extract(rootFilter));
+    }
+
+    @Test
+    public void testMoreComplexConflictFilter() {
+        // the outer LTE 1999-01-02 contradicts the inner GT 2000-01-01 -> null
+        LogicalTupleFilter subRoot = and( //
+                compare(calDt, TupleFilter.FilterOperatorEnum.GT, "2000-01-01"), //
+                compare(calDt, TupleFilter.FilterOperatorEnum.LTE, "2000-01-04"), //
+                compare(calDt, TupleFilter.FilterOperatorEnum.LTE, "2000-01-03"), //
+                compare(siteId, TupleFilter.FilterOperatorEnum.EQ, "0"));
+        LogicalTupleFilter root = and(subRoot, compare(calDt, TupleFilter.FilterOperatorEnum.LTE, "1999-01-02"));
+
+        Assert.assertNull(extract(root));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
new file mode 100644
index 0000000..091d182
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CreateHTableTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Verifies the region split keys that {@link CreateHTableJob} computes from a
+ * pre-generated partition file.
+ *
+ * @author George Song (ysong1)
+ */
+public class CreateHTableTest extends LocalFileMetadataTestCase {
+
+    private Configuration conf;
+
+    @Before
+    public void setup() throws Exception {
+        conf = new Configuration();
+        // run against the local filesystem and local job tracker, no real cluster
+        conf.set("fs.default.name", "file:///");
+        conf.set("mapred.job.tracker", "local");
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testGetSplits() throws IllegalArgumentException, Exception {
+        CreateHTableJob job = new CreateHTableJob();
+
+        byte[][] splitKeys = job.getSplits(conf, new Path("src/test/resources/partition_list/part-r-00000"));
+
+        // the fixture holds 497 partition boundaries; spot-check the first and last
+        assertEquals(497, splitKeys.length);
+        assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 0, 15, -1, 11, 51, -45, 2 }, splitKeys[0]);
+        assertArrayEquals(new byte[] { 0, 0, 0, 0, 0, 3, -1, -1, -54, -61, 109, -44, 1 }, splitKeys[496]);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HBaseMetadataTestCase.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HBaseMetadataTestCase.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HBaseMetadataTestCase.java
new file mode 100644
index 0000000..08ed83a
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HBaseMetadataTestCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.File;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractKylinTestCase;
+import org.apache.kylin.common.util.ClassUtil;
+
+/**
+ * Base class for tests that need HBase-backed metadata, running against either
+ * the sandbox config or a local mini cluster depending on the "useSandbox"
+ * system property (defaults to sandbox).
+ *
+ * @author ysong1
+ */
+public class HBaseMetadataTestCase extends AbstractKylinTestCase {
+
+    static {
+        // sandbox mode needs the sandbox test-data folder on the classpath
+        if (useSandbox()) {
+            try {
+                ClassUtil.addClasspath(new File("../examples/test_case_data/sandbox/").getAbsolutePath());
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    @Override
+    public void createTestMetadata() throws Exception {
+        staticCreateTestMetadata();
+    }
+
+    @Override
+    public void cleanupTestMetadata() {
+        staticCleanupTestMetadata();
+    }
+
+    public static void staticCreateTestMetadata() throws Exception {
+        if (useSandbox()) {
+            staticCreateTestMetadata(SANDBOX_TEST_DATA);
+        } else {
+            // mini-cluster mode also boots a local HBase instance
+            staticCreateTestMetadata(MINICLUSTER_TEST_DATA);
+            HBaseMiniclusterHelper.startupMinicluster();
+        }
+    }
+
+    /** Points KYLIN_CONF at the given folder unless the caller already configured one. */
+    public static void staticCreateTestMetadata(String kylinConfigFolder) {
+        KylinConfig.destoryInstance();
+        if (System.getProperty(KylinConfig.KYLIN_CONF) == null && System.getenv(KylinConfig.KYLIN_CONF) == null) {
+            System.setProperty(KylinConfig.KYLIN_CONF, kylinConfigFolder);
+        }
+    }
+
+    /** True (sandbox mode) when the "useSandbox" property is absent/empty or parses as true. */
+    public static boolean useSandbox() {
+        String flag = System.getProperty("useSandbox");
+        return StringUtils.isEmpty(flag) || Boolean.parseBoolean(flag);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HBaseMiniclusterHelper.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HBaseMiniclusterHelper.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HBaseMiniclusterHelper.java
new file mode 100644
index 0000000..e62e1fa
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HBaseMiniclusterHelper.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.kylin.common.KylinConfig;
+
+/**
+ * Helper that starts and shuts down an HBase mini cluster for tests, and
+ * points the ambient KylinConfig at it.
+ *
+ * @author shaoshi
+ */
+public class HBaseMiniclusterHelper {
+
+    // prefixes of HTable names that tests create against the mini cluster
+    public static final String SHARED_STORAGE_PREFIX = "KYLIN_";
+    public static final String CUBE_STORAGE_PREFIX = "KYLIN_";
+    public static final String II_STORAGE_PREFIX = "KYLIN_II_";
+    // HTable backing the metadata store during tests
+    public static final String TEST_METADATA_TABLE = "kylin_metadata";
+
+    // tarball of pre-built table data imported into the freshly started cluster
+    private static final String hbaseTarLocation = "../examples/test_case_data/minicluster/hbase-export.tar.gz";
+    // II endpoint coprocessor; registered only when present on the classpath
+    private static final String iiEndpointClassName = "org.apache.kylin.storage.hbase.coprocessor.endpoint.IIEndpoint";
+
+    public static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+    // volatile: read outside the lock by the double-checked startupMinicluster()
+    private static volatile boolean clusterStarted = false;
+    private static String hbaseconnectionUrl = "";
+
+    private static final Log logger = LogFactory.getLog(HBaseMiniclusterHelper.class);
+
+    static {
+        // ensure the cluster is torn down even when the JVM exits abnormally
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                shutdownMiniCluster();
+            }
+        });
+    }
+
+    /**
+     * Start the minicluster; Sub-classes should invoke this in BeforeClass method.
+     *
+     * @throws Exception
+     */
+    public static void startupMinicluster() throws Exception {
+
+        // double-checked locking: only the first caller starts the cluster;
+        // subsequent callers just re-point KylinConfig at the running cluster
+        if (!clusterStarted) {
+            synchronized (HBaseMiniclusterHelper.class) {
+                if (!clusterStarted) {
+                    startupMiniClusterAndImportData();
+                    clusterStarted = true;
+                }
+            }
+        } else {
+            updateKylinConfigWithMinicluster();
+        }
+    }
+
+    /** Points the ambient KylinConfig's metadata and storage URLs at the mini cluster. */
+    private static void updateKylinConfigWithMinicluster() {
+
+        KylinConfig.getInstanceFromEnv().setMetadataUrl(TEST_METADATA_TABLE + "@" + hbaseconnectionUrl);
+        KylinConfig.getInstanceFromEnv().setStorageUrl(hbaseconnectionUrl);
+    }
+
+    /** Boots the HBase and MR mini clusters, creates the metadata table and imports fixture data. */
+    private static void startupMiniClusterAndImportData() throws Exception {
+
+        logger.info("Going to start mini cluster.");
+
+        // coprocessors must be configured before the cluster starts
+        if (existInClassPath(iiEndpointClassName)) {
+            HBaseMiniclusterHelper.UTIL.getConfiguration().setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, iiEndpointClassName);
+        }
+
+        //https://issues.apache.org/jira/browse/HBASE-11711
+        UTIL.getConfiguration().setInt("hbase.master.info.port", -1);//avoid port clobbering
+
+        MiniHBaseCluster hbaseCluster = UTIL.startMiniCluster();
+
+        Configuration config = hbaseCluster.getConf();
+        String host = config.get(HConstants.ZOOKEEPER_QUORUM);
+        String port = config.get(HConstants.ZOOKEEPER_CLIENT_PORT);
+        String parent = config.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
+
+        // see in: https://hbase.apache.org/book.html#trouble.rs.runtime.zkexpired
+        config.set("zookeeper.session.timeout", "1200000");
+        config.set("hbase.zookeeper.property.tickTime", "6000");
+        // reduce rpc retry
+        config.set(HConstants.HBASE_CLIENT_PAUSE, "3000");
+        config.set(HConstants.HBASE_CLIENT_RETRIES_NUMBER, "1");
+        config.set(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, "60000");
+
+        hbaseconnectionUrl = "hbase:" + host + ":" + port + ":" + parent;
+        updateKylinConfigWithMinicluster();
+
+        UTIL.startMiniMapReduceCluster();
+
+        // create the metadata htables;
+        @SuppressWarnings("unused")
+        HBaseResourceStore store = new HBaseResourceStore(KylinConfig.getInstanceFromEnv());
+
+        // import the table content
+        HbaseImporter.importHBaseData(hbaseTarLocation, UTIL.getConfiguration());
+
+    }
+
+    /** Returns true when the named class can be loaded from the current classpath. */
+    private static boolean existInClassPath(String className) {
+        try {
+            Class.forName(className);
+        } catch (ClassNotFoundException e) {
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Shutdown the minicluster; 
+     */
+    public static void shutdownMiniCluster() {
+
+        logger.info("Going to shutdown mini cluster.");
+
+        // best-effort teardown: MR first, then HBase; failures are logged, not rethrown
+        try {
+            UTIL.shutdownMiniMapReduceCluster();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        try {
+            UTIL.shutdownMiniCluster();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /** Manual entry point: start the cluster with minicluster test data, then shut it down. */
+    public static void main(String[] args) {
+        HBaseMiniclusterHelper t = new HBaseMiniclusterHelper();
+        logger.info(t);
+        try {
+            HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.MINICLUSTER_TEST_DATA);
+            HBaseMiniclusterHelper.startupMinicluster();
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            HBaseMiniclusterHelper.shutdownMiniCluster();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HbaseImporter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HbaseImporter.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HbaseImporter.java
new file mode 100644
index 0000000..deea585
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/HbaseImporter.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.mapreduce.Import;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.kylin.common.KylinConfig;
+
+import com.google.common.base.Preconditions;
+
+/**
+ */
+public class HbaseImporter {
+
+    private static final Log logger = LogFactory.getLog(HbaseImporter.class);
+
+    public static void importHBaseData(String hbaseTarLocation, Configuration conf) throws IOException, ClassNotFoundException, InterruptedException {
+
+        if (System.getenv("JAVA_HOME") == null) {
+            logger.error("Didn't find $JAVA_HOME, this will cause HBase data import failed. Please set $JAVA_HOME.");
+            logger.error("Skipping table import...");
+            return;
+        }
+
+        File exportFile = new File(hbaseTarLocation);
+        if (!exportFile.exists()) {
+            logger.error("Didn't find the export achieve file on " + exportFile.getAbsolutePath());
+            return;
+        }
+
+        File folder = File.createTempFile("hbase-import", "tmp");
+        if (folder.exists()) {
+            FileUtils.forceDelete(folder);
+        }
+        folder.mkdirs();
+        FileUtils.forceDeleteOnExit(folder);
+
+        //TarGZUtil.uncompressTarGZ(exportFile, folder);
+        FileUtil.unTar(exportFile, folder);
+        String[] child = folder.list();
+        Preconditions.checkState(child.length == 1);
+        String backupFolderName = child[0];
+        File backupFolder = new File(folder, backupFolderName);
+        String[] tableNames = backupFolder.list();
+
+        for (String table : tableNames) {
+
+            if (!(table.equalsIgnoreCase(HBaseMiniclusterHelper.TEST_METADATA_TABLE) || table.startsWith(HBaseMiniclusterHelper.SHARED_STORAGE_PREFIX))) {
+                continue;
+            }
+
+            // create the htable; otherwise the import will fail.
+            if (table.startsWith(HBaseMiniclusterHelper.II_STORAGE_PREFIX)) {
+                HBaseConnection.createHTableIfNeeded(KylinConfig.getInstanceFromEnv().getStorageUrl(), table, "f");
+            } else if (table.startsWith(HBaseMiniclusterHelper.CUBE_STORAGE_PREFIX)) {
+                HBaseConnection.createHTableIfNeeded(KylinConfig.getInstanceFromEnv().getStorageUrl(), table, "F1", "F2");
+            }
+
+            // directly import from local fs, no need to copy to hdfs
+            String importLocation = "file://" + backupFolder.getAbsolutePath() + "/" + table;
+            String[] args = new String[] { table, importLocation };
+            boolean result = runImport(args, conf);
+            logger.info("importing table '" + table + "' with result:" + result);
+
+            if (!result)
+                break;
+        }
+
+    }
+
+    private static boolean runImport(String[] args, Configuration configuration) throws IOException, InterruptedException, ClassNotFoundException {
+        // need to make a copy of the configuration because to make sure different temp dirs are used.
+        GenericOptionsParser opts = new GenericOptionsParser(new Configuration(configuration), args);
+        Configuration newConf = opts.getConfiguration();
+        args = opts.getRemainingArgs();
+        Job job = Import.createSubmittableJob(newConf, args);
+        job.waitForCompletion(false);
+        return job.isSuccessful();
+    }
+
+    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
+        if (args.length != 1) {
+            logger.error("Usage: HbaseImporter hbase_tar_lcoation");
+            System.exit(-1);
+        }
+
+        logger.info("The KylinConfig being used:");
+        logger.info("=================================================");
+        KylinConfig.getInstanceFromEnv().printProperties();
+        logger.info("=================================================");
+
+        importHBaseData(args[0], HBaseConfiguration.create());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
new file mode 100644
index 0000000..4c9918d
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHBaseResourceStoreTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.persistence.Serializer;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+
+public class ITHBaseResourceStoreTest extends HBaseMetadataTestCase {
+
+    @Before
+    public void setup() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testHBaseStore() throws Exception {
+        testAStore(ResourceStore.getStore(KylinConfig.getInstanceFromEnv()));
+    }
+
+    @Test
+    public void testHBaseStoreWithLargeCell() throws Exception {
+        String path = "/cube/_test_large_cell.json";
+        String largeContent = "THIS_IS_A_LARGE_CELL";
+        StringEntity content = new StringEntity(largeContent);
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        int origSize = config.getHBaseKeyValueSize();
+        ResourceStore store = ResourceStore.getStore(KylinConfig.getInstanceFromEnv());
+
+        try {
+            config.setProperty("kylin.hbase.client.keyvalue.maxsize", String.valueOf(largeContent.length() - 1));
+
+            store.deleteResource(path);
+
+            store.putResource(path, content, StringEntity.serializer);
+            assertTrue(store.exists(path));
+            StringEntity t = store.getResource(path, StringEntity.class, StringEntity.serializer);
+            assertEquals(content, t);
+
+            Path redirectPath = ((HBaseResourceStore) store).bigCellHDFSPath(path);
+            Configuration hconf = HadoopUtil.getCurrentConfiguration();
+            FileSystem fileSystem = FileSystem.get(hconf);
+            assertTrue(fileSystem.exists(redirectPath));
+
+            FSDataInputStream in = fileSystem.open(redirectPath);
+            assertEquals(largeContent, in.readUTF());
+            in.close();
+
+            store.deleteResource(path);
+        } finally {
+            config.setProperty("kylin.hbase.client.keyvalue.maxsize", "" + origSize);
+            store.deleteResource(path);
+        }
+    }
+
+    void testAStore(ResourceStore store) throws IOException {
+        String dir1 = "/cube";
+        String path1 = "/cube/_test.json";
+        StringEntity content1 = new StringEntity("anything");
+        String dir2 = "/table";
+        String path2 = "/table/_test.json";
+        StringEntity content2 = new StringEntity("something");
+
+        // cleanup legacy if any
+        store.deleteResource(path1);
+        store.deleteResource(path2);
+
+        StringEntity t;
+
+        // put/get
+        store.putResource(path1, content1, StringEntity.serializer);
+        assertTrue(store.exists(path1));
+        t = store.getResource(path1, StringEntity.class, StringEntity.serializer);
+        assertEquals(content1, t);
+
+        store.putResource(path2, content2, StringEntity.serializer);
+        assertTrue(store.exists(path2));
+        t = store.getResource(path2, StringEntity.class, StringEntity.serializer);
+        assertEquals(content2, t);
+
+        // overwrite
+        t.str = "new string";
+        store.putResource(path2, t, StringEntity.serializer);
+
+        // write conflict
+        try {
+            t.setLastModified(t.getLastModified() - 1);
+            store.putResource(path2, t, StringEntity.serializer);
+            fail("write conflict should trigger IllegalStateException");
+        } catch (IllegalStateException e) {
+            // expected
+        }
+
+        // list
+        ArrayList<String> list;
+
+        list = store.listResources(dir1);
+        assertTrue(list.contains(path1));
+        assertTrue(list.contains(path2) == false);
+
+        list = store.listResources(dir2);
+        assertTrue(list.contains(path2));
+        assertTrue(list.contains(path1) == false);
+
+        list = store.listResources("/");
+        assertTrue(list.contains(dir1));
+        assertTrue(list.contains(dir2));
+        assertTrue(list.contains(path1) == false);
+        assertTrue(list.contains(path2) == false);
+
+        list = store.listResources(path1);
+        assertNull(list);
+        list = store.listResources(path2);
+        assertNull(list);
+
+        // delete/exist
+        store.deleteResource(path1);
+        assertTrue(store.exists(path1) == false);
+        list = store.listResources(dir1);
+        assertTrue(list == null || list.contains(path1) == false);
+
+        store.deleteResource(path2);
+        assertTrue(store.exists(path2) == false);
+        list = store.listResources(dir2);
+        assertTrue(list == null || list.contains(path2) == false);
+    }
+
+    public static class StringEntity extends RootPersistentEntity {
+
+        static final Serializer<StringEntity> serializer = new Serializer<StringEntity>() {
+            @Override
+            public void serialize(StringEntity obj, DataOutputStream out) throws IOException {
+                out.writeUTF(obj.str);
+            }
+
+            @Override
+            public StringEntity deserialize(DataInputStream in) throws IOException {
+                String str = in.readUTF();
+                return new StringEntity(str);
+            }
+        };
+
+        String str;
+
+        public StringEntity(String str) {
+            this.str = str;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = super.hashCode();
+            result = prime * result + ((str == null) ? 0 : str.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == this)
+                return true;
+            if (!(obj instanceof StringEntity))
+                return false;
+            return StringUtils.equals(this.str, ((StringEntity) obj).str);
+        }
+
+        @Override
+        public String toString() {
+            return str;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHdfsOpsTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHdfsOpsTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHdfsOpsTest.java
new file mode 100644
index 0000000..5db0697
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/ITHdfsOpsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ */
+public class ITHdfsOpsTest extends HBaseMetadataTestCase {
+
+    FileSystem fileSystem;
+
+    @Before
+    public void setup() throws Exception {
+
+        this.createTestMetadata();
+
+        Configuration hconf = new Configuration();
+
+        fileSystem = FileSystem.get(hconf);
+    }
+
+    @Test
+    public void TestPath() throws IOException {
+        String hdfsWorkingDirectory = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
+        Path coprocessorDir = new Path(hdfsWorkingDirectory, "test");
+        fileSystem.mkdirs(coprocessorDir);
+
+        Path newFile = new Path(coprocessorDir, "test_file");
+        newFile = newFile.makeQualified(fileSystem.getUri(), null);
+        FSDataOutputStream stream = fileSystem.create(newFile);
+        stream.write(new byte[] { 0, 1, 2 });
+        stream.close();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3d2685d8/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
new file mode 100644
index 0000000..d62da80
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/RowValueDecoderTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.steps;
+
+import static org.junit.Assert.assertEquals;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
/**
 * Unit test for RowValueDecoder: encodes a row of measure values with
 * MeasureCodec and checks that the decoder restores them (with the
 * serializer's rounding applied).
 */
public class RowValueDecoderTest extends LocalFileMetadataTestCase {

    @Before
    public void setUp() throws Exception {
        this.createTestMetadata();
        // drop any metadata cached by previously run tests
        MetadataManager.clearCache();
    }

    @After
    public void after() throws Exception {
        this.cleanupTestMetadata();
    }

    /**
     * Round-trips five measures (sum/min/max/count/item_count) through
     * MeasureCodec and RowValueDecoder and checks the decoded values.
     */
    @Test
    public void testDecode() throws Exception {
        // first column of the first column family of the test cube
        CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
        HBaseColumnDesc hbaseCol = cubeDesc.getHBaseMapping().getColumnFamily()[0].getColumns()[0];

        MeasureCodec codec = new MeasureCodec(hbaseCol.getMeasures());
        BigDecimal sum = new BigDecimal("333.1234567");
        BigDecimal min = new BigDecimal("333.1111111");
        BigDecimal max = new BigDecimal("333.1999999");
        LongMutable count = new LongMutable(2);
        LongMutable item_count = new LongMutable(100);
        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
        codec.encode(new Object[] { sum, min, max, count, item_count }, buf);

        // copy out only the bytes actually written by the codec
        buf.flip();
        byte[] valueBytes = new byte[buf.limit()];
        System.arraycopy(buf.array(), 0, valueBytes, 0, buf.limit());

        RowValueDecoder rowValueDecoder = new RowValueDecoder(hbaseCol);
        // mark every measure of the column as wanted so all values get decoded
        for (MeasureDesc measure : cubeDesc.getMeasures()) {
            FunctionDesc aggrFunc = measure.getFunction();
            int index = hbaseCol.findMeasureIndex(aggrFunc);
            rowValueDecoder.setIndex(index);
        }

        rowValueDecoder.decode(valueBytes);
        Object[] measureValues = rowValueDecoder.getValues();
        //BigDecimal.ROUND_HALF_EVEN in BigDecimalSerializer
        assertEquals("[333.1235, 333.1111, 333.2000, 2, 100]", Arrays.toString(measureValues));
    }

    /**
     * Encoding a decimal far larger than the values used in testDecode is
     * expected to fail with IllegalArgumentException.
     * NOTE(review): unlike testDecode, 'count' is a LongWritable here rather
     * than a LongMutable; presumably the exception is triggered by the
     * oversized 'sum' before any type mismatch matters — confirm against
     * MeasureCodec/BigDecimalSerializer.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testError() throws Exception {
        CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
        HBaseColumnDesc hbaseCol = cubeDesc.getHBaseMapping().getColumnFamily()[0].getColumns()[0];

        MeasureCodec codec = new MeasureCodec(hbaseCol.getMeasures());
        BigDecimal sum = new BigDecimal("11111111111111111111333.1234567");
        BigDecimal min = new BigDecimal("333.1111111");
        BigDecimal max = new BigDecimal("333.1999999");
        LongWritable count = new LongWritable(2);
        LongMutable item_count = new LongMutable(100);
        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
        codec.encode(new Object[] { sum, min, max, count, item_count }, buf);

    }
}



Mime
View raw message