kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mahong...@apache.org
Subject [4/7] incubator-kylin git commit: KYLIN-942 support parallel scan for grid table
Date Sun, 25 Oct 2015 13:12:17 GMT
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/test/resources/query/sql/query85.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query85.sql b/query/src/test/resources/query/sql/query85.sql
new file mode 100644
index 0000000..1a51a02
--- /dev/null
+++ b/query/src/test/resources/query/sql/query85.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select sum(price) as GMV, count(*) as TRANS_CNT  FROM test_kylin_fact
+ inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id
+ AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id 
+where test_kylin_fact.cal_dt < DATE '2012-05-01' or test_kylin_fact.cal_dt > DATE '2013-05-01'
+ 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/query/src/test/resources/query/sql/query86.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query86.sql b/query/src/test/resources/query/sql/query86.sql
new file mode 100644
index 0000000..f6feaaf
--- /dev/null
+++ b/query/src/test/resources/query/sql/query86.sql
@@ -0,0 +1,24 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select test_kylin_fact.cal_dt, count(*) as mmm from test_kylin_fact inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
+ inner JOIN edw.test_sites as test_sites
+ ON test_kylin_fact.lstg_site_id = test_sites.site_id where lstg_format_name = 'Others'  group by test_kylin_fact.cal_dt order by test_kylin_fact.cal_dt 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/server/src/main/resources/kylin-server-log4j.properties
----------------------------------------------------------------------
diff --git a/server/src/main/resources/kylin-server-log4j.properties b/server/src/main/resources/kylin-server-log4j.properties
index 6ccd161..a93627a 100644
--- a/server/src/main/resources/kylin-server-log4j.properties
+++ b/server/src/main/resources/kylin-server-log4j.properties
@@ -43,7 +43,7 @@ log4j.logger.org.springframework=WARN
 log4j.logger.org.apache.kylin.rest.controller.QueryController=DEBUG, query
 log4j.logger.org.apache.kylin.rest.service.QueryService=DEBUG, query
 log4j.logger.org.apache.kylin.query=DEBUG, query
-log4j.logger.org.apache.kylin.storage=DEBUG, query
+#log4j.logger.org.apache.kylin.storage=DEBUG, query //too many stuff in storage package now
 
 #job config
 log4j.logger.org.apache.kylin.rest.controller.JobController=DEBUG, job

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
index 421f648..53465d8 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/HBaseStorage.java
@@ -35,10 +35,7 @@ import org.apache.kylin.storage.IStorageQuery;
 import org.apache.kylin.storage.cache.CacheFledgedDynamicQuery;
 import org.apache.kylin.storage.cache.CacheFledgedStaticQuery;
 import org.apache.kylin.storage.hbase.steps.HBaseMROutput;
-import org.apache.kylin.storage.hbase.steps.HBaseMROutput2;
 import org.apache.kylin.storage.hbase.steps.HBaseMROutput2Transition;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.apache.kylin.storage.hybrid.HybridStorageQuery;
 
 import com.google.common.base.Preconditions;
 
@@ -46,7 +43,7 @@ import com.google.common.base.Preconditions;
 public class HBaseStorage implements IStorage {
 
     private final static boolean allowStorageLayerCache = true;
-    private final static String defaultCubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v1.CubeStorageQuery";
+    private final static String defaultCubeStorageQuery = "org.apache.kylin.storage.hbase.cube.v2.CubeStorageQuery";
     private final static String defaultIIStorageQuery = "org.apache.kylin.storage.hbase.ii.InvertedIndexStorageQuery";
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
index 09295b0..9b839c3 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorProjector.java
@@ -41,7 +41,7 @@ public class CoprocessorProjector {
 
         RowKeyEncoder rowKeyMaskEncoder = new RowKeyEncoder(cubeSegment, cuboid) {
             @Override
-            protected int fillHeader(byte[] bytes, byte[][] values) {
+            protected int fillHeader(byte[] bytes) {
                 Arrays.fill(bytes, 0, this.headerLength, (byte) 0xff);
                 return this.headerLength;
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
index 4b7c4dc..7ec97c0 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/common/coprocessor/CoprocessorRowType.java
@@ -131,7 +131,7 @@ public class CoprocessorRowType {
 
     private void init() {
         int[] offsets = new int[columns.length];
-        int o = RowConstants.ROWKEY_CUBOIDID_LEN;
+        int o = RowConstants.ROWKEY_HEADER_LEN;
         for (int i = 0; i < columns.length; i++) {
             offsets[i] = o;
             o += columnSizes[i];

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 9b2cf66..034ffac 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -186,7 +186,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         info.append(keyRange.getCuboid().getId());
         info.append("\nStart: ");
         info.append(keyRange.getStartKeyAsString());
-        info.append("     - ");
+        info.append(" - ");
         info.append(Bytes.toStringBinary(keyRange.getStartKey()));
         info.append("\nStop:  ");
         info.append(keyRange.getStopKeyAsString());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index ed3a518..5c2117d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -18,17 +18,32 @@
 
 package org.apache.kylin.storage.hbase.cube.v1;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
@@ -58,9 +73,8 @@ import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import com.google.common.collect.Lists;
 
-//v1
 @SuppressWarnings("unused")
 public class CubeStorageQuery implements ICachableStorageQuery {
 
@@ -86,7 +100,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             }
         }
     }
-    
+
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
 
@@ -135,11 +149,8 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         collectNonEvaluable(filter, groupsCopD);
         TupleFilter filterD = translateDerived(filter, groupsCopD);
 
-        // flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
-        TupleFilter flatFilter = flattenToOrAndFilter(filterD);
-
         // translate filter into segment scan ranges
-        List<HBaseKeyRange> scans = buildScanRanges(flatFilter, dimensionsD);
+        List<HBaseKeyRange> scans = buildScanRanges(flattenToOrAndFilter(filterD), dimensionsD);
 
         // check involved measures, build value decoder for each each family:column
         List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
@@ -150,7 +161,9 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         setLimit(filter, context);
 
         HConnection conn = HBaseConnection.get(context.getConnUrl());
+
         return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, topNCol, valueDecoders, context, returnTupleInfo);
+        //Notice we're passing filterD down to storage instead of flatFilter
     }
 
     @Override
@@ -400,10 +413,12 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         return new ArrayList<RowValueDecoder>(codecMap.values());
     }
 
+    //check TupleFilter.flatFilter's comment
     private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
         if (filter == null)
             return null;
 
+        // core
         TupleFilter flatFilter = filter.flatFilter();
 
         // normalize to OR-AND filter
@@ -445,27 +460,30 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             }
 
             //log
-            sb.append(scanRanges.size() + "=>");
+            sb.append(scanRanges.size() + "=(mergeoverlap)>");
 
             List<HBaseKeyRange> mergedRanges = mergeOverlapRanges(scanRanges);
 
             //log
-            sb.append(mergedRanges.size() + "=>");
+            sb.append(mergedRanges.size() + "=(mergetoomany)>");
 
             mergedRanges = mergeTooManyRanges(mergedRanges);
 
             //log
-            sb.append(mergedRanges.size() + ", ");
+            sb.append(mergedRanges.size() + ",");
 
             result.addAll(mergedRanges);
         }
-
         logger.info(sb.toString());
 
         logger.info("hbasekeyrange count: " + result.size());
+
         dropUnhitSegments(result);
         logger.info("hbasekeyrange count after dropping unhit :" + result.size());
 
+        result = duplicateRangeByShard(result);
+        logger.info("hbasekeyrange count after dropping duplicatebyshard :" + result.size());
+
         return result;
     }
 
@@ -675,6 +693,42 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         }
     }
 
+    private List<HBaseKeyRange> duplicateRangeByShard(List<HBaseKeyRange> scans) {
+        List<HBaseKeyRange> ret = Lists.newArrayList();
+
+        for (HBaseKeyRange scan : scans) {
+            CubeSegment segment = scan.getCubeSegment();
+
+            byte[] startKey = scan.getStartKey();
+            byte[] stopKey = scan.getStopKey();
+
+            short cuboidShardNum = segment.getCuboidShardNum(scan.getCuboid().getId());
+            short cuboidShardBase = segment.getCuboidBaseShard(scan.getCuboid().getId());
+            for (short i = 0; i < cuboidShardNum; ++i) {
+                byte[] newStartKey = duplicateKeyAndChangeShard(i, startKey);
+                byte[] newStopKey = duplicateKeyAndChangeShard(i, stopKey);
+                HBaseKeyRange newRange = new HBaseKeyRange(segment, scan.getCuboid(), newStartKey, newStopKey, //
+                        scan.getFuzzyKeys(), scan.getFlatOrAndFilter(), scan.getPartitionColumnStartDate(), scan.getPartitionColumnEndDate());
+                ret.add(newRange);
+            }
+        }
+
+        Collections.sort(ret, new Comparator<HBaseKeyRange>() {
+            @Override
+            public int compare(HBaseKeyRange o1, HBaseKeyRange o2) {
+                return Bytes.compareTo(o1.getStartKey(), o2.getStartKey());
+            }
+        });
+
+        return ret;
+    }
+
+    private byte[] duplicateKeyAndChangeShard(short newShard, byte[] bytes) {
+        byte[] ret = Arrays.copyOf(bytes, bytes.length);
+        BytesUtil.writeShort(newShard, ret, 0, RowConstants.ROWKEY_SHARDID_LEN);
+        return ret;
+    }
+
     private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
         if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
             return;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 7f28baf..86bc42d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -21,9 +21,13 @@ package org.apache.kylin.storage.hbase.cube.v2;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.zip.DataFormatException;
 
 import javax.annotation.Nullable;
@@ -35,7 +39,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.util.KryoUtils;
@@ -43,24 +46,29 @@ import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.storage.hbase.HBaseConnection;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitResponse.Stats;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
-
-import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
+import com.google.protobuf.HBaseZeroCopyByteString;
 
 public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
-    static class EndpintResultsAsGTScanner implements IGTScanner {
+    static class EndpointResultsAsGTScanner implements IGTScanner {
         private GTInfo info;
         private Iterator<byte[]> blocks;
+        private ImmutableBitSet columns;
 
-        public EndpintResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks) {
+        public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns) {
             this.info = info;
             this.blocks = blocks;
+            this.columns = columns;
         }
 
         @Override
@@ -85,7 +93,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                 @Override
                 public Iterator<GTRecord> apply(@Nullable final byte[] input) {
 
-                    logger.info("Reassembling a raw block returned from Endpoint with byte length: " + input.length);
                     return new Iterator<GTRecord>() {
                         private ByteBuffer inputBuffer = null;
                         private GTRecord oneRecord = null;
@@ -102,7 +109,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
                         @Override
                         public GTRecord next() {
-                            oneRecord.loadAllColumns(inputBuffer);
+                            oneRecord.loadColumns(columns, inputBuffer);
                             return oneRecord;
                         }
 
@@ -123,43 +130,98 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
     @Override
     public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
 
-        try {
-            // primary key (also the 0th column block) is always selected
-            final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
-            // globally shared connection, does not require close
-            HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
-            final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
-            final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks);
-
-            RawScan rawScan = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns);
-
-            byte[] scanRequestBytes = KryoUtils.serialize(scanRequest);
-            byte[] rawScanBytes = KryoUtils.serialize(rawScan);
-            CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
-            builder.setGtScanRequest(ByteString.copyFrom(scanRequestBytes)).setHbaseRawScan(ByteString.copyFrom(rawScanBytes));
-
-            Collection<CubeVisitProtos.CubeVisitResponse> results = getResults(builder.build(), hbaseTable, rawScan.startKey, rawScan.endKey);
-            final Collection<byte[]> rowBlocks = Collections2.transform(results, new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() {
-                @Nullable
+        // primary key (also the 0th column block) is always selected
+        final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
+        // globally shared connection, does not require close
+        HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+        final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
+
+        List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
+        List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
+        final List<IntList> hbaseColumnsToGTIntList = Lists.newArrayList();
+        for (List<Integer> list : hbaseColumnsToGT) {
+            hbaseColumnsToGTIntList.add(IntList.newBuilder().addAllInts(list).build());
+        }
+
+        byte[] scanRequestBytes = KryoUtils.serialize(scanRequest);
+        final ByteString scanRequestBytesString = HBaseZeroCopyByteString.wrap(scanRequestBytes);
+
+        ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size());
+        final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList());
+
+        logger.info("Total RawScan range count: " + rawScans.size());
+        for (RawScan rawScan : rawScans) {
+            logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+        }
+
+        for (int i = 0; i < rawScans.size(); ++i) {
+            final int shardIndex = i;
+            final RawScan rawScan = rawScans.get(i);
+
+            executorService.submit(new Runnable() {
                 @Override
-                public byte[] apply(CubeVisitProtos.CubeVisitResponse input) {
+                public void run() {
+                    final byte[] rawScanBytes = KryoUtils.serialize(rawScan);
+                    CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
+                    builder.setGtScanRequest(scanRequestBytesString).setHbaseRawScan(HBaseZeroCopyByteString.wrap(rawScanBytes));
+                    for (IntList intList : hbaseColumnsToGTIntList) {
+                        builder.addHbaseColumnsToGT(intList);
+                    }
+
+                    Collection<CubeVisitProtos.CubeVisitResponse> results;
                     try {
-                        return CompressionUtils.decompress(input.getCompressedRows().toByteArray());
-                    } catch (IOException | DataFormatException e) {
-                        throw new RuntimeException(e);
+                        results = getResults(builder.build(), hbaseTable, rawScan.startKey, rawScan.endKey);
+                    } catch (Throwable throwable) {
+                        throw new RuntimeException("Error when visiting cubes by endpoint:", throwable);
+                    }
+
+                    //results.size() supposed to be 1;
+                    if (results.size() != 1) {
+                        logger.warn("{} CubeVisitResponse returned for shard {}", results.size(), shardIndex);
+                    }
+
+                    for (CubeVisitProtos.CubeVisitResponse result : results) {
+                        logger.info(getStatsString(result, shardIndex));
                     }
+
+                    Collection<byte[]> part = Collections2.transform(results, new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() {
+                        @Nullable
+                        @Override
+                        public byte[] apply(CubeVisitProtos.CubeVisitResponse input) {
+                            try {
+                                return CompressionUtils.decompress(HBaseZeroCopyByteString.zeroCopyGetBytes(input.getCompressedRows()));
+                            } catch (IOException | DataFormatException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                    });
+                    rowBlocks.addAll(part);
                 }
             });
+        }
+        executorService.shutdown();
+        try {
+            if (!executorService.awaitTermination(1, TimeUnit.HOURS)) {
+                throw new RuntimeException("Visiting cube by endpoint timeout");
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Visiting cube by endpoint gets interrupted");
+        }
 
-            return new EndpintResultsAsGTScanner(fullGTInfo, rowBlocks.iterator());
+        return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator(), scanRequest.getColumns());
+    }
+
+    private String getStatsString(CubeVisitProtos.CubeVisitResponse result, int shardIndex) {
+        StringBuilder sb = new StringBuilder();
+        Stats stats = result.getStats();
+        sb.append("Shard " + shardIndex + ": ");
+        sb.append("Total scanned row: " + stats.getScannedRowCount() + ". ");
+        sb.append("Total filtered/aggred row: " + stats.getAggregatedRowCount() + ". ");
+        sb.append("Time elapsed in EP: " + (stats.getServiceEndTime() - stats.getServiceStartTime()) + "(ms). ");
+        return sb.toString();
 
-        } catch (Throwable throwable) {
-            throwable.printStackTrace();
-        }
-        return null;
     }
 
-    //TODO : async callback
     private Collection<CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable {
         Map<byte[], CubeVisitProtos.CubeVisitResponse> results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() {
             public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
@@ -174,8 +236,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             }
         });
 
-        logger.info("{} regions returned results ", results.values().size());
-
         return results.values();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 09bef0f..1d217ac 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -2,6 +2,7 @@ package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
@@ -14,6 +15,7 @@ import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowConstants;
@@ -27,6 +29,7 @@ import org.apache.kylin.gridtable.IGTScanner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 public abstract class CubeHBaseRPC {
@@ -69,59 +72,142 @@ public abstract class CubeHBaseRPC {
         return scan;
     }
 
-    protected RawScan prepareRawScan(GTRecord pkStart, GTRecord pkEnd, List<Pair<byte[], byte[]>> selectedColumns) {
-        byte[] start = makeRowKeyToScan(pkStart, (byte) 0x00);
-        byte[] end = makeRowKeyToScan(pkEnd, (byte) 0xff);
+    protected List<RawScan> preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) {
+        final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks);
+        List<RawScan> ret = Lists.newArrayList();
 
-        //TODO fuzzy match
+        byte[] start = makeRowKeyToScan(pkStart, RowConstants.ROWKEY_LOWER_BYTE);
+        byte[] end = makeRowKeyToScan(pkEnd, RowConstants.ROWKEY_UPPER_BYTE);
+        List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys);
 
-        return new RawScan(start, end, selectedColumns, null);
+        short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId());
+
+        for (short i = 0; i < cuboidShardNum; ++i) {
+            short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards());
+
+            byte[] shardStart = Arrays.copyOf(start, start.length);
+            byte[] shardEnd = new byte[end.length + 1];//append extra 0 to the end key to make it inclusive while scanning
+            System.arraycopy(end, 0, shardEnd, 0, end.length);
+
+            BytesUtil.writeShort(shard, shardStart, 0, RowConstants.ROWKEY_SHARDID_LEN);
+            BytesUtil.writeShort(shard, shardEnd, 0, RowConstants.ROWKEY_SHARDID_LEN);
+
+            ret.add(new RawScan(shardStart, shardEnd, selectedColumns, hbaseFuzzyKeys));
+        }
+        return ret;
+
+    }
+
+    /**
+     * translate GTRecord format fuzzy keys to hbase expected format
+     * @return
+     */
+    private List<Pair<byte[], byte[]>> translateFuzzyKeys(List<GTRecord> fuzzyKeys) {
+        if (fuzzyKeys == null || fuzzyKeys.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<Pair<byte[], byte[]>> ret = Lists.newArrayList();
+        int coreLength = fullGTInfo.getMaxColumnLength(fullGTInfo.getPrimaryKey());
+        for (GTRecord gtRecordFuzzyKey : fuzzyKeys) {
+            byte[] hbaseFuzzyKey = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN];
+            byte[] hbaseFuzzyMask = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN];
+
+            int pos = 0;
+            //shard part
+            Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);//shard part should better be FIXED, for simplicity we make it non-fixed
+            pos += RowConstants.ROWKEY_SHARDID_LEN;
+
+            //cuboid part
+            Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_CUBOIDID_LEN, RowConstants.BYTE_ZERO);
+            System.arraycopy(cuboid.getBytes(), 0, hbaseFuzzyKey, pos, RowConstants.ROWKEY_CUBOIDID_LEN);
+            pos += RowConstants.ROWKEY_CUBOIDID_LEN;
+
+            //row key core part
+            ByteArray coreKey = HBaseScan.exportScanKey(gtRecordFuzzyKey, RowConstants.BYTE_ZERO);
+            System.arraycopy(coreKey.array(), coreKey.offset(), hbaseFuzzyKey, pos, coreKey.length());
+            ByteArray coreMask = HBaseScan.exportScanMask(gtRecordFuzzyKey);
+            System.arraycopy(coreMask.array(), coreMask.offset(), hbaseFuzzyMask, pos, coreMask.length());
+
+            Preconditions.checkState(coreKey.length() == coreMask.length(), "corekey length not equal coremask length");
+            pos += coreKey.length();
+            Preconditions.checkState(hbaseFuzzyKey.length == pos, "HBase fuzzy key not completely populated");
+
+            ret.add(new Pair<byte[], byte[]>(hbaseFuzzyKey, hbaseFuzzyMask));
+        }
+
+        return ret;
     }
 
     private byte[] makeRowKeyToScan(GTRecord pkRec, byte fill) {
-        ByteArray pk = GTRecord.exportScanKey(pkRec);
-        int pkMaxLen = pkRec.getInfo().getMaxColumnLength(pkRec.getInfo().getPrimaryKey());
+        ByteArray pk = HBaseScan.exportScanKey(pkRec, fill);
 
-        byte[] buf = new byte[pkMaxLen + RowConstants.ROWKEY_CUBOIDID_LEN];
+        byte[] buf = new byte[pk.length() + RowConstants.ROWKEY_HEADER_LEN];
         Arrays.fill(buf, fill);
 
-        System.arraycopy(cuboid.getBytes(), 0, buf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+        //for scanning/reading, later all possible shard will be applied 
+
+        System.arraycopy(cuboid.getBytes(), 0, buf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN);
         if (pk != null && pk.array() != null) {
-            System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_CUBOIDID_LEN, pk.length());
+            System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_HEADER_LEN, pk.length());
         }
         return buf;
     }
 
+    /**
+     * prune untouched hbase columns
+     */
     protected List<Pair<byte[], byte[]>> makeHBaseColumns(ImmutableBitSet selectedColBlocks) {
         List<Pair<byte[], byte[]>> result = Lists.newArrayList();
 
-        int colBlockIdx = 1; // start from 1; the 0th column block is primary key which maps to rowkey
+        int colBlkIndex = 1;
         HBaseMappingDesc hbaseMapping = cubeSeg.getCubeDesc().getHbaseMapping();
         for (HBaseColumnFamilyDesc familyDesc : hbaseMapping.getColumnFamily()) {
             byte[] byteFamily = Bytes.toBytes(familyDesc.getName());
             for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
-                if (selectedColBlocks.get(colBlockIdx)) {
+                if (selectedColBlocks.get(colBlkIndex)) {
                     byte[] byteQualifier = Bytes.toBytes(hbaseColDesc.getQualifier());
                     result.add(new Pair<byte[], byte[]>(byteFamily, byteQualifier));
                 }
-                colBlockIdx++;
+                colBlkIndex++;
             }
         }
 
         return result;
     }
 
-    //possible to use binary search as cells might be sorted
-    public static Cell findCell(List<Cell> cells, byte[] familyName, byte[] columnName) {
-        for (Cell c : cells) {
-            if (BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0 && //
-                    BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(), columnName.length) == 0) {
-                return c;
+    /**
+     * for each selected hbase column, it might contain values of multiple GT columns.
+     * The mapping should be passed down to storage
+     */
+    protected List<List<Integer>> getHBaseColumnsGTMapping(ImmutableBitSet selectedColBlocks) {
+
+        List<List<Integer>> ret = Lists.newArrayList();
+
+        int colBlkIndex = 1;
+        int metricOffset = fullGTInfo.getPrimaryKey().trueBitCount();
+
+        HBaseMappingDesc hbaseMapping = cubeSeg.getCubeDesc().getHbaseMapping();
+        for (HBaseColumnFamilyDesc familyDesc : hbaseMapping.getColumnFamily()) {
+            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+                if (selectedColBlocks.get(colBlkIndex)) {
+                    int[] metricIndexes = hbaseColDesc.getMeasureIndex();
+                    Integer[] gtIndexes = new Integer[metricIndexes.length];
+                    for (int i = 0; i < gtIndexes.length; i++) {
+                        gtIndexes[i] = metricIndexes[i] + metricOffset;
+                    }
+                    ret.add(Arrays.asList(gtIndexes));
+                }
+                colBlkIndex++;
             }
         }
-        return null;
+
+        Preconditions.checkState(selectedColBlocks.trueBitCount() == ret.size() + 1);
+        return ret;
     }
 
+  
+
     public static void applyHBaseColums(Scan scan, List<Pair<byte[], byte[]>> hbaseColumns) {
         for (Pair<byte[], byte[]> hbaseColumn : hbaseColumns) {
             byte[] byteFamily = hbaseColumn.getFirst();
@@ -157,4 +243,33 @@ public abstract class CubeHBaseRPC {
         return result;
     }
 
+    protected void logScan(RawScan rawScan, String tableName) {
+        StringBuilder info = new StringBuilder();
+        info.append("\nVisiting hbase table ").append(tableName).append(": ");
+        if (cuboid.requirePostAggregation()) {
+            info.append("cuboid require post aggregation, from ");
+        } else {
+            info.append("cuboid exact match, from ");
+        }
+        info.append(cuboid.getInputID());
+        info.append(" to ");
+        info.append(cuboid.getId());
+        info.append("\nStart: ");
+        info.append(rawScan.getStartKeyAsString());
+        info.append(" - ");
+        info.append(Bytes.toStringBinary(rawScan.startKey));
+        info.append("\nStop:  ");
+        info.append(rawScan.getEndKeyAsString());
+        info.append(" - ");
+        info.append(Bytes.toStringBinary(rawScan.endKey));
+        if (rawScan.fuzzyKey != null) {
+            info.append("\nFuzzy key counts: " + rawScan.fuzzyKey.size());
+            info.append("\nFuzzy: ");
+            info.append(rawScan.getFuzzyKeyAsString());
+        } else {
+            info.append("\nNo Fuzzy Key");
+        }
+        logger.info(info.toString());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
index 8838578..fa5a844 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -11,20 +11,55 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
 /**
  * for test use only
  */
 public class CubeHBaseScanRPC extends CubeHBaseRPC {
 
+    static class TrimmedInfoGTRecordAdapter implements Iterable<GTRecord> {
+
+        private final GTInfo info;
+        private final Iterator<GTRecord> input;
+
+        public TrimmedInfoGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) {
+            this.info = info;
+            this.input = input;
+        }
+
+        @Override
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+                @Override
+                public boolean hasNext() {
+                    return input.hasNext();
+                }
+
+                @Override
+                public GTRecord next() {
+                    GTRecord x = input.next();
+                    return new GTRecord(info, x.getInternal());
+                }
+
+                @Override
+                public void remove() {
+
+                }
+            };
+        }
+    }
+
     public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
         super(cubeSeg, cuboid, fullGTInfo);
     }
@@ -34,34 +69,47 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
 
         // primary key (also the 0th column block) is always selected
         final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
-
         // globally shared connection, does not require close
         HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
-
         final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
-        final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks);
 
-        RawScan rawScan = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns);
-        Scan hbaseScan = buildScan(rawScan);
+        List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks);
+        List<List<Integer>> hbaseColumnsToGT = getHBaseColumnsGTMapping(selectedColBlocks);
+
+        final List<ResultScanner> scanners = Lists.newArrayList();
+        final List<Iterator<Result>> resultIterators = Lists.newArrayList();
+
+        for (RawScan rawScan : rawScans) {
+
+            logScan(rawScan, cubeSeg.getStorageLocationIdentifier());
+            Scan hbaseScan = buildScan(rawScan);
+
+            final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
+            final Iterator<Result> iterator = scanner.iterator();
+
+            scanners.add(scanner);
+            resultIterators.add(iterator);
+        }
 
-        final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
-        final Iterator<Result> iterator = scanner.iterator();
+        final Iterator<Result> allResultsIterator = Iterators.concat(resultIterators.iterator());
 
         CellListIterator cellListIterator = new CellListIterator() {
             @Override
             public void close() throws IOException {
-                scanner.close();
+                for (ResultScanner scanner : scanners) {
+                    scanner.close();
+                }
                 hbaseTable.close();
             }
 
             @Override
             public boolean hasNext() {
-                return iterator.hasNext();
+                return allResultsIterator.hasNext();
             }
 
             @Override
             public List<Cell> next() {
-                return iterator.next().listCells();
+                return allResultsIterator.next().listCells();
             }
 
             @Override
@@ -70,8 +118,32 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC {
             }
         };
 
-        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, hbaseColumns);
+        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns, hbaseColumnsToGT);
         IGTScanner rawScanner = store.scan(scanRequest);
-        return scanRequest.decorateScanner(rawScanner);
+
+        final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner);
+        final TrimmedInfoGTRecordAdapter trimmedInfoGTRecordAdapter = new TrimmedInfoGTRecordAdapter(fullGTInfo, decorateScanner.iterator());
+
+        return new IGTScanner() {
+            @Override
+            public GTInfo getInfo() {
+                return fullGTInfo;
+            }
+
+            @Override
+            public int getScannedRowCount() {
+                return decorateScanner.getScannedRowCount();
+            }
+
+            @Override
+            public void close() throws IOException {
+                decorateScanner.close();
+            }
+
+            @Override
+            public Iterator<GTRecord> iterator() {
+                return trimmedInfoGTRecordAdapter.iterator();
+            }
+        };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java
deleted file mode 100644
index 9359934..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java
+++ /dev/null
@@ -1,265 +0,0 @@
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.io.IOException;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import org.apache.kylin.common.util.ImmutableBitSet;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CubeGridTable;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRange;
-import org.apache.kylin.gridtable.GTScanRangePlanner;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.GTUtil;
-import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class CubeScanner implements IGTScanner {
-
-    private static final int MAX_SCAN_RANGES = 200;
-
-    final CubeSegment cubeSeg;
-    final GTInfo info;
-    final byte[] trimmedInfoBytes;
-    final List<GTScanRequest> scanRequests;
-    final Scanner scanner;
-    final Cuboid cuboid;
-
-    public CubeScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
-            Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) {
-        this.cuboid = cuboid;
-        this.cubeSeg = cubeSeg;
-        this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
-
-        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-
-        //replace the constant values in filter to dictionary codes 
-        TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups);
-
-        ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, dimensions);
-        ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groups, cubeSeg.getCubeDesc()));
-        ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics);
-        String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics);
-
-        //TODO: should remove this in endpoint scenario
-        GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(info);
-        List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
-
-        scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
-
-        trimmedInfoBytes = GTInfo.serialize(info);
-        GTInfo trimmedInfo = GTInfo.deserialize(trimmedInfoBytes);
-
-        for (GTScanRange range : scanRanges) {
-            scanRequests.add(new GTScanRequest(trimmedInfo, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate));
-        }
-
-        scanner = new Scanner();
-    }
-
-    private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
-        Set<TblColRef> ret = Sets.newHashSet();
-        for (TblColRef col : input) {
-            if (cubeDesc.isDerived(col)) {
-                for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
-                    ret.add(host);
-                }
-            } else {
-                ret.add(col);
-            }
-        }
-        return ret;
-    }
-
-    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) {
-        BitSet result = new BitSet();
-        for (TblColRef dim : dimensions) {
-            int idx = mapping.getIndexOf(dim);
-            if (idx >= 0)
-                result.set(idx);
-        }
-        return new ImmutableBitSet(result);
-    }
-
-    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
-        BitSet result = new BitSet();
-        for (FunctionDesc metric : metrics) {
-            int idx = mapping.getIndexOf(metric);
-            if (idx < 0)
-                throw new IllegalStateException(metric + " not found in " + mapping);
-            result.set(idx);
-        }
-        return new ImmutableBitSet(result);
-    }
-
-    private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
-
-        //metrics are represented in ImmutableBitSet, which loses order information
-        //sort the aggrFuns to align with metrics natural order 
-        List<FunctionDesc> metricList = Lists.newArrayList(metrics);
-        Collections.sort(metricList, new Comparator<FunctionDesc>() {
-            @Override
-            public int compare(FunctionDesc o1, FunctionDesc o2) {
-                int a = mapping.getIndexOf(o1);
-                int b = mapping.getIndexOf(o2);
-                return a - b;
-            }
-        });
-
-        String[] result = new String[metricList.size()];
-        int i = 0;
-        for (FunctionDesc metric : metricList) {
-            result[i++] = metric.getExpression();
-        }
-        return result;
-    }
-
-    @Override
-    public Iterator<GTRecord> iterator() {
-        return scanner.iterator();
-    }
-
-    @Override
-    public void close() throws IOException {
-        scanner.close();
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    @Override
-    public int getScannedRowCount() {
-        return scanner.getScannedRowCount();
-    }
-
-    static class RemoteGTRecordAdapter implements Iterable<GTRecord> {
-
-        private final GTInfo info;
-        private final Iterator<GTRecord> input;
-
-        public RemoteGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) {
-            this.info = info;
-            this.input = input;
-        }
-
-        @Override
-        public Iterator<GTRecord> iterator() {
-            return new Iterator<GTRecord>() {
-                @Override
-                public boolean hasNext() {
-                    return input.hasNext();
-                }
-
-                @Override
-                public GTRecord next() {
-                    GTRecord x = input.next();
-                    return new GTRecord(info, x.getInternal());
-                }
-
-                @Override
-                public void remove() {
-
-                }
-            };
-        }
-    }
-
-    private class Scanner {
-        final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()];
-        int cur = 0;
-        Iterator<GTRecord> curIterator = null;
-        GTRecord next = null;
-
-        public Iterator<GTRecord> iterator() {
-            return new Iterator<GTRecord>() {
-
-                @Override
-                public boolean hasNext() {
-                    if (next != null)
-                        return true;
-
-                    if (curIterator == null) {
-                        if (cur >= scanRequests.size())
-                            return false;
-
-                        try {
-                            CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);
-                            inputScanners[cur] = rpc.getGTScanner(scanRequests.get(cur));
-                            curIterator = inputScanners[cur].iterator();
-                            //curIterator = new RemoteGTRecordAdapter(info, inputScanners[cur].iterator()).iterator();
-                        } catch (IOException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }
-
-                    if (curIterator.hasNext() == false) {
-                        curIterator = null;
-                        cur++;
-                        return hasNext();
-                    }
-
-                    next = curIterator.next();
-                    return true;
-                }
-
-                @Override
-                public GTRecord next() {
-                    // fetch next record
-                    if (next == null) {
-                        hasNext();
-                        if (next == null)
-                            throw new NoSuchElementException();
-                    }
-
-                    GTRecord result = next;
-                    next = null;
-                    return result;
-                }
-
-                @Override
-                public void remove() {
-                    throw new UnsupportedOperationException();
-                }
-            };
-        }
-
-        public void close() throws IOException {
-            for (int i = 0; i < inputScanners.length; i++) {
-                if (inputScanners[i] != null) {
-                    inputScanners[i].close();
-                }
-            }
-        }
-
-        public int getScannedRowCount() {
-            int result = 0;
-            for (int i = 0; i < inputScanners.length; i++) {
-                if (inputScanners[i] == null)
-                    break;
-
-                result += inputScanners[i].getScannedRowCount();
-            }
-            return result;
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
new file mode 100644
index 0000000..286da55
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -0,0 +1,290 @@
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.GTScanRangePlanner;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTUtil;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class CubeSegmentScanner implements IGTScanner {
+
+    private static final int MAX_SCAN_RANGES = 200;
+
+    final CubeSegment cubeSeg;
+    final GTInfo info;
+    final byte[] trimmedInfoBytes;
+    final List<GTScanRequest> scanRequests;
+    final Scanner scanner;
+    final Cuboid cuboid;
+
+    public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
+            Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) {
+        this.cuboid = cuboid;
+        this.cubeSeg = cubeSeg;
+        this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
+
+        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+
+        //replace the constant values in filter to dictionary codes 
+        TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups);
+
+        ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, dimensions);
+        ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groups, cubeSeg.getCubeDesc()));
+        ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics);
+        String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics);
+
+        GTScanRangePlanner scanRangePlanner;
+        if (cubeSeg.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) {
+            TblColRef tblColRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef();
+            Pair<ByteArray, ByteArray> segmentStartAndEnd = null;
+            int index = mapping.getIndexOf(tblColRef);
+            if (index >= 0) {
+                segmentStartAndEnd = getSegmentStartAndEnd(tblColRef, index);
+            }
+            scanRangePlanner = new GTScanRangePlanner(info, segmentStartAndEnd, tblColRef);
+        } else {
+            scanRangePlanner = new GTScanRangePlanner(info, null, null);
+        }
+        List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
+
+        scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
+
+        trimmedInfoBytes = GTInfo.serialize(info);
+        GTInfo trimmedInfo = GTInfo.deserialize(trimmedInfoBytes);
+
+        for (GTScanRange range : scanRanges) {
+            scanRequests.add(new GTScanRequest(trimmedInfo, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate));
+        }
+
+        scanner = new Scanner();
+    }
+
+    private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(TblColRef tblColRef, int index) {
+
+        String partitionColType = tblColRef.getColumnDesc().getDatatype();
+
+        ByteArray start;
+        if (cubeSeg.getDateRangeStart() != Long.MIN_VALUE) {
+            start = translateTsToString(cubeSeg.getDateRangeStart(), partitionColType, index);
+        } else {
+            start = new ByteArray();
+        }
+
+        ByteArray end;
+        if (cubeSeg.getDateRangeEnd() != Long.MAX_VALUE) {
+            end = translateTsToString(cubeSeg.getDateRangeEnd(), partitionColType, index);
+        } else {
+            end = new ByteArray();
+        }
+        return Pair.newPair(start, end);
+
+    }
+
+    private ByteArray translateTsToString(long ts, String partitionColType, int index) {
+        String value;
+        if ("date".equalsIgnoreCase(partitionColType)) {
+            value = DateFormat.formatToDateStr(ts);
+        } else if ("timestamp".equalsIgnoreCase(partitionColType)) {
+            //TODO: if partition col is not dict encoded, value's format may differ from expected. Though by default it is not the case
+            value = DateFormat.formatToTimeWithoutMilliStr(ts);
+        } else {
+            throw new RuntimeException("Type " + partitionColType + " is not valid partition column type");
+        }
+
+        ByteBuffer buffer = ByteBuffer.allocate(info.getMaxColumnLength());
+        info.getCodeSystem().encodeColumnValue(index, value, buffer);
+
+        return ByteArray.copyOf(buffer.array(), 0, buffer.position());
+    }
+
+    private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
+        Set<TblColRef> ret = Sets.newHashSet();
+        for (TblColRef col : input) {
+            if (cubeDesc.isDerived(col)) {
+                for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
+                    ret.add(host);
+                }
+            } else {
+                ret.add(col);
+            }
+        }
+        return ret;
+    }
+
+    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) {
+        BitSet result = new BitSet();
+        for (TblColRef dim : dimensions) {
+            int idx = mapping.getIndexOf(dim);
+            if (idx >= 0)
+                result.set(idx);
+        }
+        return new ImmutableBitSet(result);
+    }
+
+    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
+        BitSet result = new BitSet();
+        for (FunctionDesc metric : metrics) {
+            int idx = mapping.getIndexOf(metric);
+            if (idx < 0)
+                throw new IllegalStateException(metric + " not found in " + mapping);
+            result.set(idx);
+        }
+        return new ImmutableBitSet(result);
+    }
+
+    private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
+
+        //metrics are represented in ImmutableBitSet, which loses order information
+        //sort the aggrFuns to align with metrics natural order 
+        List<FunctionDesc> metricList = Lists.newArrayList(metrics);
+        Collections.sort(metricList, new Comparator<FunctionDesc>() {
+            @Override
+            public int compare(FunctionDesc o1, FunctionDesc o2) {
+                int a = mapping.getIndexOf(o1);
+                int b = mapping.getIndexOf(o2);
+                return a - b;
+            }
+        });
+
+        String[] result = new String[metricList.size()];
+        int i = 0;
+        for (FunctionDesc metric : metricList) {
+            result[i++] = metric.getExpression();
+        }
+        return result;
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return scanner.iterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        scanner.close();
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public int getScannedRowCount() {
+        return scanner.getScannedRowCount();
+    }
+
+    private class Scanner {
+        final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()];
+        int cur = 0;
+        Iterator<GTRecord> curIterator = null;
+        GTRecord next = null;
+
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+
+                @Override
+                public boolean hasNext() {
+                    if (next != null)
+                        return true;
+
+                    if (curIterator == null) {
+                        if (cur >= scanRequests.size())
+                            return false;
+
+                        try {
+
+                            CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);
+                            //CubeHBaseRPC rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info);
+
+                            //change previous line to CubeHBaseRPC rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info);
+                            //to debug locally
+
+                            inputScanners[cur] = rpc.getGTScanner(scanRequests.get(cur));
+                            curIterator = inputScanners[cur].iterator();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    if (curIterator.hasNext() == false) {
+                        curIterator = null;
+                        cur++;
+                        return hasNext();
+                    }
+
+                    next = curIterator.next();
+                    return true;
+                }
+
+                @Override
+                public GTRecord next() {
+                    // fetch next record
+                    if (next == null) {
+                        hasNext();
+                        if (next == null)
+                            throw new NoSuchElementException();
+                    }
+
+                    GTRecord result = next;
+                    next = null;
+                    return result;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+        public void close() throws IOException {
+            for (int i = 0; i < inputScanners.length; i++) {
+                if (inputScanners[i] != null) {
+                    inputScanners[i].close();
+                }
+            }
+        }
+
+        public int getScannedRowCount() {
+            int result = 0;
+            for (int i = 0; i < inputScanners.length; i++) {
+                if (inputScanners[i] == null)
+                    break;
+
+                result += inputScanners[i].getScannedRowCount();
+            }
+            return result;
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 35f95ca..71abb41 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -36,6 +36,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
+@SuppressWarnings("unused")
 public class CubeStorageQuery implements ICachableStorageQuery {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
@@ -91,13 +92,11 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         TupleFilter filterD = translateDerived(filter, groupsD);
 
         setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
-        // TODO enable coprocessor
-        //        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
         setLimit(filter, context);
 
-        List<CubeScanner> scanners = Lists.newArrayList();
+        List<CubeSegmentScanner> scanners = Lists.newArrayList();
         for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
-            scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation));
+            scanners.add(new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation));
         }
 
         if (scanners.isEmpty())

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index a6c6a23..7731f19 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.gridtable.GTInfo;
@@ -34,20 +34,22 @@ import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
 import org.apache.kylin.gridtable.IGTWriter;
 
+import com.google.common.base.Preconditions;
+
 public class HBaseReadonlyStore implements IGTStore {
 
     private CellListIterator cellListIterator;
 
     private GTInfo info;
     private List<Pair<byte[], byte[]>> hbaseColumns;
-    private ImmutableBitSet selectedColBlocks;
+    private List<List<Integer>> hbaseColumnsToGT;
 
-    public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns) {
+    public HBaseReadonlyStore(CellListIterator cellListIterator, GTScanRequest gtScanRequest, List<Pair<byte[], byte[]>> hbaseColumns, List<List<Integer>> hbaseColumnsToGT) {
         this.cellListIterator = cellListIterator;
 
         this.info = gtScanRequest.getInfo();
         this.hbaseColumns = hbaseColumns;
-        this.selectedColBlocks = gtScanRequest.getSelectedColBlocks().set(0);
+        this.hbaseColumnsToGT = hbaseColumnsToGT;
     }
 
     @Override
@@ -56,20 +58,31 @@ public class HBaseReadonlyStore implements IGTStore {
     }
 
     @Override
-    public IGTWriter rebuild(int shard) throws IOException {
+    public IGTWriter rebuild() throws IOException {
         throw new UnsupportedOperationException();
     }
 
     @Override
-    public IGTWriter append(int shard) throws IOException {
+    public IGTWriter append() throws IOException {
         throw new UnsupportedOperationException();
     }
 
+    //TODO: possible to use binary search as cells might be sorted?
+    public static Cell findCell(List<Cell> cells, byte[] familyName, byte[] columnName) {
+        for (Cell c : cells) {
+            if (BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0 && //
+                    BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(), columnName.length) == 0) {
+                return c;
+            }
+        }
+        return null;
+    }
+
     @Override
     public IGTScanner scan(GTScanRequest scanRequest) throws IOException {
         return new IGTScanner() {
             int count;
-            
+
             @Override
             public void close() throws IOException {
                 cellListIterator.close();
@@ -79,7 +92,7 @@ public class HBaseReadonlyStore implements IGTStore {
             public Iterator<GTRecord> iterator() {
                 return new Iterator<GTRecord>() {
                     GTRecord oneRecord = new GTRecord(info); // avoid object creation
-                    
+
                     @Override
                     public boolean hasNext() {
                         return cellListIterator.hasNext();
@@ -87,26 +100,24 @@ public class HBaseReadonlyStore implements IGTStore {
 
                     @Override
                     public GTRecord next() {
+                        count++;
                         List<Cell> oneRow = cellListIterator.next();
                         if (oneRow.size() < 1) {
                             throw new IllegalStateException("cell list's size less than 1");
                         }
 
-                        ByteBuffer buf;
-                        
                         // dimensions, set to primary key, also the 0th column block
                         Cell firstCell = oneRow.get(0);
-                        buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_CUBOIDID_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_CUBOIDID_LEN);
+                        ByteBuffer buf = byteBuffer(firstCell.getRowArray(), RowConstants.ROWKEY_HEADER_LEN + firstCell.getRowOffset(), firstCell.getRowLength() - RowConstants.ROWKEY_HEADER_LEN);
                         oneRecord.loadCellBlock(0, buf);
 
                         // metrics
-                        int hbaseColIdx = 0;
-                        for (int i = 1; i < selectedColBlocks.trueBitCount(); i++) {
-                            int colBlockIdx = selectedColBlocks.trueBitAt(i);
-                            Pair<byte[], byte[]> hbaseColumn = hbaseColumns.get(hbaseColIdx++);
-                            Cell cell = CubeHBaseRPC.findCell(oneRow, hbaseColumn.getFirst(), hbaseColumn.getSecond());
+                        for (int i = 0; i < hbaseColumns.size(); i++) {
+                            Pair<byte[], byte[]> hbaseColumn = hbaseColumns.get(i);
+                            Cell cell = HBaseReadonlyStore.findCell(oneRow, hbaseColumn.getFirst(), hbaseColumn.getSecond());
+                            Preconditions.checkNotNull(cell);
                             buf = byteBuffer(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
-                            oneRecord.loadCellBlock(colBlockIdx, buf);
+                            oneRecord.loadColumns(hbaseColumnsToGT.get(i), buf);
                         }
                         return oneRecord;
 
@@ -116,7 +127,7 @@ public class HBaseReadonlyStore implements IGTStore {
                     public void remove() {
                         throw new UnsupportedOperationException();
                     }
-                    
+
                     private ByteBuffer byteBuffer(byte[] array, int offset, int length) {
                         return ByteBuffer.wrap(array, offset, length);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
new file mode 100644
index 0000000..7667830
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.util.Arrays;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+
+import com.google.common.base.Preconditions;
+
+public class HBaseScan {
+
+    /**
+     * every column in scan key is fixed length. for empty values, 0 zero will be populated 
+     */
+    public static ByteArray exportScanKey(GTRecord rec, byte fill) {
+
+        Preconditions.checkNotNull(rec);
+
+        GTInfo info = rec.getInfo();
+        int len = info.getMaxColumnLength(info.getPrimaryKey());
+        ByteArray buf = ByteArray.allocate(len);
+        int pos = 0;
+        for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) {
+            int c = info.getPrimaryKey().trueBitAt(i);
+            int colLength = info.getCodeSystem().maxCodeLength(c);
+
+            if (rec.get(c).array() != null) {
+                Preconditions.checkArgument(colLength == rec.get(c).length(), "ColLength :" + colLength + " not equals cols[c] length: " + rec.get(c).length() + " c is " + c);
+                System.arraycopy(rec.get(c).array(), rec.get(c).offset(), buf.array(), buf.offset() + pos, rec.get(c).length());
+            } else {
+                Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill);
+            }
+            pos += colLength;
+        }
+        buf.setLength(pos);
+
+        return buf;
+    }
+
+    /**
+     * every column in scan key is fixed length. for fixed columns, 0 will be populated, for non-fixed columns, 1 will be populated 
+     */
+    public static ByteArray exportScanMask(GTRecord rec) {
+        Preconditions.checkNotNull(rec);
+
+        GTInfo info = rec.getInfo();
+        int len = info.getMaxColumnLength(info.getPrimaryKey());
+        ByteArray buf = ByteArray.allocate(len);
+        byte fill;
+
+        int pos = 0;
+        for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) {
+            int c = info.getPrimaryKey().trueBitAt(i);
+            int colLength = info.getCodeSystem().maxCodeLength(c);
+
+            if (rec.get(c).array() != null) {
+                fill = RowConstants.BYTE_ZERO;
+            } else {
+                fill = RowConstants.BYTE_ONE;
+            }
+            Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill);
+            pos += colLength;
+        }
+        buf.setLength(pos);
+
+        return buf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
index aa73927..ad4263f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java
@@ -20,13 +20,14 @@ package org.apache.kylin.storage.hbase.cube.v2;
 
 import java.util.List;
 
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Pair;
 
 public class RawScan {
 
     public byte[] startKey;
     public byte[] endKey;
-    public List<Pair<byte[], byte[]>> hbaseColumns;
+    public List<Pair<byte[], byte[]>> hbaseColumns;//only contain interested columns
     public List<Pair<byte[], byte[]>> fuzzyKey;
 
     public RawScan(byte[] startKey, byte[] endKey, List<Pair<byte[], byte[]>> hbaseColumns, List<Pair<byte[], byte[]>> fuzzyKey) {
@@ -37,4 +38,23 @@ public class RawScan {
         this.fuzzyKey = fuzzyKey;
     }
 
+    public String getStartKeyAsString() {
+        return BytesUtil.toHex(this.startKey);
+    }
+
+    public String getEndKeyAsString() {
+        return BytesUtil.toHex(this.endKey);
+    }
+
+    public String getFuzzyKeyAsString() {
+        StringBuilder buf = new StringBuilder();
+        for (Pair<byte[], byte[]> fuzzyKey : this.fuzzyKey) {
+            buf.append(BytesUtil.toHex(fuzzyKey.getFirst()));
+            buf.append(" ");
+            buf.append(BytesUtil.toHex(fuzzyKey.getSecond()));
+            buf.append(System.lineSeparator());
+        }
+        return buf.toString();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
index 4686da2..85aa54a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
@@ -27,10 +27,10 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
     private final Set<FunctionDesc> selectedMetrics;
     private final TupleInfo tupleInfo;
     private final Tuple tuple;
-    private final Iterator<CubeScanner> scannerIterator;
+    private final Iterator<CubeSegmentScanner> scannerIterator;
     private final StorageContext context;
 
-    private CubeScanner curScanner;
+    private CubeSegmentScanner curScanner;
     private Iterator<GTRecord> curRecordIterator;
     private CubeTupleConverter curTupleConverter;
     private Tuple next;
@@ -38,7 +38,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
     private int scanCount;
     private int scanCountDelta;
 
-    public SequentialCubeTupleIterator(List<CubeScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+    public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
             Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
         this.cuboid = cuboid;
         this.selectedDimensions = selectedDimensions;
@@ -112,7 +112,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
         }
     }
 
-    private void close(CubeScanner scanner) {
+    private void close(CubeSegmentScanner scanner) {
         try {
             scanner.close();
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/fabdd5cd/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index f0b8c6f..ba766bd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -46,11 +46,12 @@ import org.apache.kylin.storage.hbase.cube.v2.CubeHBaseRPC;
 import org.apache.kylin.storage.hbase.cube.v2.HBaseReadonlyStore;
 import org.apache.kylin.storage.hbase.cube.v2.RawScan;
 import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos.CubeVisitRequest.IntList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
+import com.google.protobuf.HBaseZeroCopyByteString;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
@@ -125,9 +126,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
         try {
             this.serviceStartTime = System.currentTimeMillis();
 
-            GTScanRequest scanReq = KryoUtils.deserialize(request.getGtScanRequest().toByteArray(), GTScanRequest.class);
-            RawScan hbaseRawScan = KryoUtils.deserialize(request.getHbaseRawScan().toByteArray(), RawScan.class);
-            //TODO: rewrite own start/end
+            GTScanRequest scanReq = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getGtScanRequest()), GTScanRequest.class);
+            RawScan hbaseRawScan = KryoUtils.deserialize(HBaseZeroCopyByteString.zeroCopyGetBytes(request.getHbaseRawScan()), RawScan.class);
+            List<List<Integer>> hbaseColumnsToGT = Lists.newArrayList();
+            for (IntList intList : request.getHbaseColumnsToGTList()) {
+                hbaseColumnsToGT.add(intList.getIntsList());
+            }
+
             Scan scan = CubeHBaseRPC.buildScan(hbaseRawScan);
 
             region = env.getRegion();
@@ -136,26 +141,30 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             innerScanner = region.getScanner(scan);
             InnerScannerAsIterator cellListIterator = new InnerScannerAsIterator(innerScanner);
 
-            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns);
+            IGTStore store = new HBaseReadonlyStore(cellListIterator, scanReq, hbaseRawScan.hbaseColumns, hbaseColumnsToGT);
             IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner);
 
             ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);
+
+            ByteArrayOutputStream outputStream = new ByteArrayOutputStream(RowConstants.ROWVALUE_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
+            int finalRowCount = 0;
             for (GTRecord oneRecord : finalScanner) {
                 buffer.clear();
-                oneRecord.exportAllColumns(buffer);
+                oneRecord.exportColumns(scanReq.getColumns(), buffer);
                 buffer.flip();
+
                 outputStream.write(buffer.array(), buffer.arrayOffset() - buffer.position(), buffer.remaining());
+                finalRowCount++;
             }
             //outputStream.close() is not necessary
             byte[] allRows = outputStream.toByteArray();
             CubeVisitProtos.CubeVisitResponse.Builder responseBuilder = CubeVisitProtos.CubeVisitResponse.newBuilder();
             done.run(responseBuilder.//
-                    setCompressedRows(ByteString.copyFrom(CompressionUtils.compress(allRows))).//too many array copies 
+                    setCompressedRows(HBaseZeroCopyByteString.wrap(CompressionUtils.compress(allRows))).//too many array copies 
                     setStats(CubeVisitProtos.CubeVisitResponse.Stats.newBuilder().//
-                            setAggregatedRowCount(0).//
-                            setScannedRowCount(0).//
+                            setAggregatedRowCount(finalScanner.getScannedRowCount() - finalRowCount).//
+                            setScannedRowCount(finalScanner.getScannedRowCount()).//
                             setServiceStartTime(serviceStartTime).//
                             setServiceEndTime(System.currentTimeMillis()).build()).//
                     build());



Mime
View raw message