kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [05/13] incubator-kylin git commit: KYLIN-943 update query/storage engine to support TopN
Date Mon, 21 Sep 2015 02:16:11 GMT
KYLIN-943 update query/storage engine to support TopN


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/988c2e76
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/988c2e76
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/988c2e76

Branch: refs/heads/KYLIN-943
Commit: 988c2e76f4d57b3e14c4b548d7c0959aeffa4924
Parents: 97de708
Author: shaofengshi <shaofengshi@apache.org>
Authored: Thu Sep 17 14:05:06 2015 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Mon Sep 21 10:11:54 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/BuildCubeWithEngineTest.java      |  6 +-
 .../apache/kylin/common/topn/TopNCounter.java   | 55 ++++++++++--
 .../kylin/cube/CubeCapabilityChecker.java       | 60 +++++++++++--
 .../apache/kylin/cube/CubeDimensionDeriver.java | 10 ++-
 .../org/apache/kylin/cube/model/CubeDesc.java   |  6 +-
 .../kylin/metadata/model/FunctionDesc.java      | 14 ++-
 .../kylin/metadata/realization/SQLDigest.java   | 13 ++-
 .../apache/kylin/storage/StorageContext.java    | 15 +---
 .../kylin/storage/hybrid/HybridInstance.java    |  4 +-
 .../org/apache/kylin/storage/tuple/Tuple.java   |  3 +
 .../kylin/storage/cache/DynamicCacheTest.java   |  4 +-
 .../kylin/storage/cache/StaticCacheTest.java    |  4 +-
 .../cube_desc/test_kylin_cube_topn_desc.json    |  2 +-
 .../localmeta/data/DEFAULT.STREAMING_TABLE.csv  |  0
 .../test_kylin_inner_join_model_desc.json       |  3 +-
 .../test_kylin_left_join_model_desc.json        |  3 +-
 .../apache/kylin/query/relnode/OLAPContext.java | 18 +++-
 .../kylin/query/relnode/OLAPLimitRel.java       |  1 +
 .../apache/kylin/query/relnode/OLAPSortRel.java | 11 +--
 .../kylin/query/test/ITKylinQueryTest.java      |  2 +-
 query/src/test/resources/query/sql/query81.sql  | 26 ++++++
 query/src/test/resources/query/sql/query82.sql  | 26 ++++++
 .../cube/v1/CubeSegmentTopNTupleIterator.java   | 86 ++++++++++++++++++
 .../hbase/cube/v1/CubeSegmentTupleIterator.java | 35 ++++----
 .../storage/hbase/cube/v1/CubeStorageQuery.java | 92 +++++++++++++++-----
 .../hbase/cube/v1/CubeTupleConverter.java       | 71 ++++++++++++++-
 .../cube/v1/SerializedHBaseTupleIterator.java   |  9 +-
 .../storage/hbase/common/ITStorageTest.java     |  4 +-
 28 files changed, 496 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
index 7c6b028..4564ccc 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithEngineTest.java
@@ -120,13 +120,13 @@ public class BuildCubeWithEngineTest {
     @Test
     public void test() throws Exception {
         DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_left_join_empty");
-        testInner();
+//        testInner();
         testLeft();
     }
 
     private void testInner() throws Exception {
-       String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", "testInnerJoinTopNCube"};
-//        String[] testCase = new String[] { "testInnerJoinTopNCube" };
+        String[] testCase = new String[] { "testInnerJoinTopNCube" };
+       // String[] testCase = new String[] { "testInnerJoinCube", "testInnerJoinCube2", "testInnerJoinTopNCube"};
         runTestAndAssertSucceed(testCase);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
index 69e8d56..6814b8d 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java
@@ -23,10 +23,7 @@ import org.apache.kylin.common.util.Pair;
 
 import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * Modified from the StreamSummary.java in https://github.com/addthis/stream-lib
@@ -38,10 +35,11 @@ import java.util.Map;
  *
  * @param <T> type of data in the stream to be summarized
  */
-public class TopNCounter<T> implements ITopK<T> {
+public class TopNCounter<T> implements ITopK<T>, Iterable<Counter<T>> {
     
     public static final int EXTRA_SPACE_RATE = 50;
 
+
     protected class Bucket {
 
         protected DoublyLinkedList<Counter<T>> counterList;
@@ -365,4 +363,51 @@ public class TopNCounter<T> implements ITopK<T> {
         return items;
 
     }
+
+    @Override
+    public Iterator<Counter<T>> iterator() {
+        return new TopNCounterIterator();
+    }
+    
+    private class TopNCounterIterator implements Iterator {
+
+        private ListNode2<Bucket> currentBNode;
+        private Iterator<Counter<T>> currentCounterIterator;
+        
+        private TopNCounterIterator() {
+            currentBNode = bucketList.head();
+            if (currentBNode != null && currentBNode.getValue() != null) {
+                currentCounterIterator = currentBNode.getValue().counterList.iterator();
+            }
+        }
+        
+        @Override
+        public boolean hasNext() {
+            if (currentCounterIterator == null) {
+                return false;
+            }
+            
+            if (currentCounterIterator.hasNext()) {
+                return true;
+            }
+
+            currentBNode = currentBNode.getPrev();
+            
+            if (currentBNode == null)
+                return false;
+
+            currentCounterIterator = currentBNode.getValue().counterList.iterator();
+            return hasNext();
+        }
+
+        @Override
+        public Counter<T> next() {
+            return currentCounterIterator.next();
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    } 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 77c3298..628340e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -18,19 +18,20 @@
 
 package org.apache.kylin.cube;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 /**
  */
 public class CubeCapabilityChecker {
@@ -39,10 +40,11 @@ public class CubeCapabilityChecker {
     public static boolean check(CubeInstance cube, SQLDigest digest, boolean allowWeakMatch) {
 
         // retrieve members from olapContext
-        Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest.groupbyColumns, digest.filterColumns);
+        Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest);
         Collection<FunctionDesc> functions = digest.aggregations;
         Collection<TblColRef> metricsColumns = digest.metricColumns;
         Collection<JoinDesc> joins = digest.joinDescs;
+        boolean hasTopN = hasTopNMeasure(cube.getDescriptor());
 
         // match dimensions & aggregations & joins
 
@@ -62,6 +64,13 @@ public class CubeCapabilityChecker {
             }
         }
 
+        // for topn, the group column can come from measure
+        if (hasTopN & matchJoin && !matchDimensions && functions.size() == 1) {
+            boolean matchedTopN = isMatchedWithTopN(dimensionColumns, cube, digest);
+            matchDimensions = matchedTopN;
+            matchAggregation = matchedTopN;
+        }
+
         if (!isOnline || !matchDimensions || !matchAggregation || !matchJoin) {
             logger.info("Exclude cube " + cube.getName() + " because " + " isOnlne=" + isOnline + ",matchDimensions=" + matchDimensions + ",matchAggregation=" + matchAggregation + ",matchJoin=" + matchJoin);
             return false;
@@ -70,6 +79,47 @@ public class CubeCapabilityChecker {
         return true;
     }
 
+    private static boolean isMatchedWithTopN(Collection<TblColRef> dimensionColumns, CubeInstance cube, SQLDigest digest) {
+
+        CubeDesc cubeDesc = cube.getDescriptor();
+        List<FunctionDesc> cubeFunctions = cubeDesc.listAllFunctions();
+        Collection<FunctionDesc> functions = digest.aggregations;
+        Collection<MeasureDesc> sortMeasures = digest.sortMeasures;
+        Collection<SQLDigest.OrderEnum> sortOrders = digest.sortOrders;
+
+        FunctionDesc onlyFunction = functions.iterator().next();
+        if (onlyFunction.isSum() == false) {
+            // topN only support SUM expression
+            return false;
+        }
+
+        Collection<TblColRef> dimensionColumnsCopy = new ArrayList<TblColRef>(dimensionColumns);
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            if (measure.getFunction().isTopN()) {
+                List<TblColRef> cols = measure.getFunction().getParameter().getColRefs();
+                TblColRef displayCol = cols.get(cols.size() - 1);
+                dimensionColumnsCopy.remove(displayCol);
+                if(isMatchedWithDimensions(dimensionColumnsCopy, cube)) {
+                    if (measure.getFunction().isCompatible(onlyFunction)) {
+                        return true;
+                    }
+                }
+                dimensionColumnsCopy.add(displayCol);
+            }
+        }
+
+        return false;
+    }
+
+    private static boolean hasTopNMeasure(CubeDesc cubeDesc) {
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            if (measureDesc.getFunction().isTopN())
+                return true;
+        }
+
+        return false;
+    }
+
     private static boolean isMatchedWithDimensions(Collection<TblColRef> dimensionColumns, CubeInstance cube) {
         CubeDesc cubeDesc = cube.getDescriptor();
         boolean matchAgg = cubeDesc.listDimensionColumnsIncludingDerived().containsAll(dimensionColumns);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
index a746c99..138d01e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
@@ -21,7 +21,9 @@ package org.apache.kylin.cube;
 import java.util.Collection;
 import java.util.HashSet;
 
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
 
 /**
  *
@@ -29,7 +31,13 @@ import org.apache.kylin.metadata.model.TblColRef;
  */
 public class CubeDimensionDeriver {
 
-    public static Collection<TblColRef> getDimensionColumns(Collection<TblColRef> groupByColumns, Collection<TblColRef> filterColumns) {
+    public static Collection<TblColRef> getDimensionColumns(SQLDigest sqlDigest) {
+        Collection<TblColRef> groupByColumns = sqlDigest.groupbyColumns;
+        Collection<TblColRef> filterColumns = sqlDigest.filterColumns;
+
+        Collection<MeasureDesc> sortMeasures = sqlDigest.sortMeasures;
+        Collection<SQLDigest.OrderEnum> sortOrders = sqlDigest.sortOrders;
+                
         Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>();
         dimensionColumns.addAll(groupByColumns);
         dimensionColumns.addAll(filterColumns);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 90b5474..9cbdfae 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -129,6 +129,7 @@ public class CubeDesc extends RootPersistentEntity {
     private Map<String, Map<String, TblColRef>> columnMap = new HashMap<String, Map<String, TblColRef>>();
     private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
     private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<TblColRef>();
+
     private LinkedHashSet<TblColRef> measureDisplayColumns = new LinkedHashSet<TblColRef>();
     private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
     private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap();
@@ -827,5 +828,8 @@ public class CubeDesc extends RootPersistentEntity {
         }
         return result;
     }
-    
+
+    public LinkedHashSet<TblColRef> getMeasureDisplayColumns() {
+        return measureDisplayColumns;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index b87d50c..d10f395 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -60,7 +60,7 @@ public class FunctionDesc {
     }
 
     public boolean needRewrite() {
-        return !isSum() && !isDimensionAsMetric();
+        return !isSum() && !isDimensionAsMetric() && !isTopN();
     }
 
     public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
@@ -225,4 +225,16 @@ public class FunctionDesc {
         return "FunctionDesc [expression=" + expression + ", parameter=" + parameter + ", returnType=" + returnType + "]";
     }
 
+    public boolean isCompatible(FunctionDesc another) {
+        if (another == null) {
+            return false;
+        }
+
+        if (this.isTopN() && another.isSum()) {
+            if (this.getParameter().getColRefs().get(0).equals(another.getParameter().getColRefs().get(0)))
+                return true;
+        }
+
+        return false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
index 7811858..e48cebe 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
@@ -19,15 +19,22 @@
 package org.apache.kylin.metadata.realization;
 
 import java.util.Collection;
+import java.util.List;
 
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  */
 public class SQLDigest {
+
+    public enum OrderEnum {
+        ASCENDING, DESCENDING
+    }
+
     public String factTable;
     public TupleFilter filter;
     public Collection<JoinDesc> joinDescs;
@@ -36,9 +43,11 @@ public class SQLDigest {
     public Collection<TblColRef> filterColumns;
     public Collection<TblColRef> metricColumns;
     public Collection<FunctionDesc> aggregations;
+    public Collection<MeasureDesc> sortMeasures;
+    public Collection<OrderEnum> sortOrders;
 
     public SQLDigest(String factTable, TupleFilter filter, Collection<JoinDesc> joinDescs, Collection<TblColRef> allColumns, //
-            Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc) {
+            Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc, Collection<MeasureDesc> sortMeasures, Collection<OrderEnum> sortOrders) {
         this.factTable = factTable;
         this.filter = filter;
         this.joinDescs = joinDescs;
@@ -47,6 +56,8 @@ public class SQLDigest {
         this.filterColumns = filterColumns;
         this.metricColumns = aggregatedColumns;
         this.aggregations = aggregateFunnc;
+        this.sortMeasures = sortMeasures;
+        this.sortOrders = sortOrders;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 8b1b706..1643aa4 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -24,6 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.realization.SQLDigest;
 
 /**
  * @author xjiang
@@ -32,17 +33,12 @@ public class StorageContext {
 
     public static final int DEFAULT_THRESHOLD = 1000000;
 
-    public enum OrderEnum {
-        ASCENDING, DESCENDING
-    }
 
     private String connUrl;
     private int threshold;
     private int limit;
     private int offset;
     private boolean hasSort;
-    private List<MeasureDesc> sortMeasures;
-    private List<OrderEnum> sortOrders;
     private boolean acceptPartialResult;
 
     private boolean exactAggregation;
@@ -59,8 +55,6 @@ public class StorageContext {
         this.totalScanCount = new AtomicLong();
         this.cuboid = null;
         this.hasSort = false;
-        this.sortOrders = new ArrayList<OrderEnum>();
-        this.sortMeasures = new ArrayList<MeasureDesc>();
 
         this.exactAggregation = false;
         this.enableLimit = false;
@@ -110,13 +104,6 @@ public class StorageContext {
         return this.enableLimit;
     }
 
-    public void addSort(MeasureDesc measure, OrderEnum order) {
-        if (measure != null) {
-            sortMeasures.add(measure);
-            sortOrders.add(order);
-        }
-    }
-
     public void markSort() {
         this.hasSort = true;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 6ad27d5..0c30a3c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -180,7 +180,9 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
 
     @Override
     public DataModelDesc getDataModelDesc() {
-        return this.getLatestRealization().getDataModelDesc();
+        if (this.getLatestRealization() != null)
+            return this.getLatestRealization().getDataModelDesc();
+        return null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 0ffae69..11b03bd 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -25,6 +25,7 @@ import java.util.List;
 import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
 
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.metadata.measure.DoubleMutable;
 import org.apache.kylin.metadata.measure.LongMutable;
@@ -65,6 +66,8 @@ public class Tuple implements ITuple {
                 ret.values[i] = null;
             } else if (this.values[i] instanceof HyperLogLogPlusCounter) {
                 ret.values[i] = new HyperLogLogPlusCounter((HyperLogLogPlusCounter) this.values[i]);
+            } else if (this.values[i] instanceof TopNCounter) {
+                ret.values[i] = null;
             } else {
                 ret.values[i] = this.values[i];
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
index 309d67f..161cad6 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/DynamicCacheTest.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.storage.cache;
 
+import java.util.ArrayList;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -8,6 +9,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.IdentityUtils;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITuple;
@@ -76,7 +78,7 @@ public class DynamicCacheTest {
         final List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
         final TupleInfo tupleInfo = StorageMockUtils.newTupleInfo(groups, aggregations);
 
-        SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", null, null, Lists.<TblColRef> newArrayList(), groups, Lists.newArrayList(partitionCol), Lists.<TblColRef> newArrayList(), aggregations);
+        SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", null, null, Lists.<TblColRef> newArrayList(), groups, Lists.newArrayList(partitionCol), Lists.<TblColRef> newArrayList(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
 
         ITuple aTuple = new TsOnlyTuple(partitionCol, "2011-02-01");
         ITuple bTuple = new TsOnlyTuple(partitionCol, "2012-02-01");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
index e54e3e0..48b0b1d 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/StaticCacheTest.java
@@ -1,5 +1,6 @@
 package org.apache.kylin.storage.cache;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.List;
@@ -8,6 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kylin.common.util.IdentityUtils;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITuple;
@@ -34,7 +36,7 @@ public class StaticCacheTest {
         final List<TblColRef> groups = StorageMockUtils.buildGroups();
         final List<FunctionDesc> aggregations = StorageMockUtils.buildAggregations();
         final TupleFilter filter = StorageMockUtils.buildFilter1(groups.get(0));
-        final SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
+        final SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
         final TupleInfo tupleInfo = StorageMockUtils.newTupleInfo(groups, aggregations);
 
         final List<ITuple> ret = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
index 2e0e376..96c3ace 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
@@ -111,7 +111,7 @@
     ]
   },
   "last_modified": 1422435345330,
-  "model_name": "test_kylin_inner_join_model_desc",
+  "model_name": "test_kylin_left_join_model_desc",
   "null_string": null,
   "hbase_mapping": {
     "column_family": [

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv b/examples/test_case_data/localmeta/data/DEFAULT.STREAMING_TABLE.csv
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
index 86f8169..a28684f 100644
--- a/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_kylin_inner_join_model_desc.json
@@ -36,7 +36,8 @@
       "columns": [
         "lstg_format_name",
         "LSTG_SITE_ID",
-        "SLR_SEGMENT_CD"
+        "SLR_SEGMENT_CD",
+        "seller_id"
       ]
     },
     {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json b/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
index d05a08f..c26ffc5 100644
--- a/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
+++ b/examples/test_case_data/localmeta/model_desc/test_kylin_left_join_model_desc.json
@@ -47,7 +47,8 @@
       "columns": [
         "lstg_format_name",
         "LSTG_SITE_ID",
-        "SLR_SEGMENT_CD"
+        "SLR_SEGMENT_CD",
+        "seller_id"
       ]
     },
     {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 378221c..6865457 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -26,11 +26,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.Lists;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.SQLDigest;
@@ -81,6 +83,8 @@ public class OLAPContext {
     public OLAPContext(int seq) {
         this.id = seq;
         this.storageContext = new StorageContext();
+        this.sortMeasures = Lists.newArrayList();
+        this.sortOrders = Lists.newArrayList();
         Map<String, String> parameters = _localPrarameters.get();
         if (parameters != null) {
             String acceptPartialResult = parameters.get(PRM_ACCEPT_PARTIAL_RESULT);
@@ -111,10 +115,14 @@ public class OLAPContext {
     public Collection<TblColRef> filterColumns = new HashSet<TblColRef>();
     public TupleFilter filter;
     public List<JoinDesc> joins = new LinkedList<JoinDesc>();
+    private List<MeasureDesc> sortMeasures;
+    private List<SQLDigest.OrderEnum> sortOrders;
 
     // rewrite info
     public Map<String, RelDataType> rewriteFields = new HashMap<String, RelDataType>();
 
+    public int limit;
+
     // hive query
     public String sql = "";
 
@@ -126,7 +134,7 @@ public class OLAPContext {
 
     public SQLDigest getSQLDigest() {
         if (sqlDigest == null)
-            sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations);
+            sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations, sortMeasures, sortOrders);
         return sqlDigest;
     }
 
@@ -144,4 +152,12 @@ public class OLAPContext {
         }
         this.returnTupleInfo = info;
     }
+
+    public void addSort(MeasureDesc measure, SQLDigest.OrderEnum order) {
+        if (measure != null) {
+            sortMeasures.add(measure);
+            sortOrders.add(order);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
index 60acd40..74d5de0 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPLimitRel.java
@@ -78,6 +78,7 @@ public class OLAPLimitRel extends SingleRel implements OLAPRel {
         Number limitValue = (Number) (((RexLiteral) localFetch).getValue());
         int limit = limitValue.intValue();
         this.context.storageContext.setLimit(limit);
+        this.context.limit = limit;
         if(localOffset != null) {
             Number offsetValue = (Number) (((RexLiteral) localOffset).getValue());
             int offset = offsetValue.intValue();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
index fa5dc1d..b023dfd 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPSortRel.java
@@ -35,6 +35,7 @@ import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rex.RexNode;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.storage.StorageContext;
 
 import com.google.common.base.Preconditions;
@@ -82,12 +83,12 @@ public class OLAPSortRel extends Sort implements OLAPRel {
 
         for (RelFieldCollation fieldCollation : this.collation.getFieldCollations()) {
             int index = fieldCollation.getFieldIndex();
-            StorageContext.OrderEnum order = getOrderEnum(fieldCollation.getDirection());
+            SQLDigest.OrderEnum order = getOrderEnum(fieldCollation.getDirection());
             OLAPRel olapChild = (OLAPRel) this.getInput();
             TblColRef orderCol = olapChild.getColumnRowType().getAllColumns().get(index);
             MeasureDesc measure = findMeasure(orderCol);
             if (measure != null) {
-                this.context.storageContext.addSort(measure, order);
+                this.context.addSort(measure, order);
             }
             this.context.storageContext.markSort();
         }
@@ -96,11 +97,11 @@ public class OLAPSortRel extends Sort implements OLAPRel {
         this.columnRowType = buildColumnRowType();
     }
 
-    private StorageContext.OrderEnum getOrderEnum(RelFieldCollation.Direction direction) {
+    private SQLDigest.OrderEnum getOrderEnum(RelFieldCollation.Direction direction) {
         if (direction == RelFieldCollation.Direction.DESCENDING) {
-            return StorageContext.OrderEnum.DESCENDING;
+            return SQLDigest.OrderEnum.DESCENDING;
         } else {
-            return StorageContext.OrderEnum.ASCENDING;
+            return SQLDigest.OrderEnum.ASCENDING;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
index 72d7c4a..4821ce9 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITKylinQueryTest.java
@@ -95,7 +95,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleRunQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/sql/query44.sql";
+        String queryFileName = "src/test/resources/query/sql/query82.sql";
 
         File sqlFile = new File(queryFileName);
         if (sqlFile.exists()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/test/resources/query/sql/query81.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query81.sql b/query/src/test/resources/query/sql/query81.sql
new file mode 100644
index 0000000..78e30c5
--- /dev/null
+++ b/query/src/test/resources/query/sql/query81.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT 
+ seller_id 
+ ,sum(test_kylin_fact.price) as GMV 
+ FROM test_kylin_fact inner join edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ where test_kylin_fact.cal_dt < DATE '2013-02-01' 
+ group by 
+ test_kylin_fact.seller_id order by gmv desc limit 100

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/query/src/test/resources/query/sql/query82.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql/query82.sql b/query/src/test/resources/query/sql/query82.sql
new file mode 100644
index 0000000..6b62753
--- /dev/null
+++ b/query/src/test/resources/query/sql/query82.sql
@@ -0,0 +1,26 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT 
+ test_kylin_fact.cal_dt, seller_id 
+ ,sum(test_kylin_fact.price) as GMV 
+ FROM test_kylin_fact 
+left JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ group by 
+ test_kylin_fact.cal_dt, test_kylin_fact.seller_id order by gmv desc limit 100

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
new file mode 100644
index 0000000..a8b1d02
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1;
+
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
+import org.apache.kylin.storage.translate.HBaseKeyRange;
+import org.apache.kylin.storage.tuple.Tuple;
+import org.apache.kylin.storage.tuple.TupleInfo;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ */
+public class CubeSegmentTopNTupleIterator extends CubeSegmentTupleIterator{
+
+    private Iterator<Tuple> innerResultIterator;
+    
+    public CubeSegmentTopNTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
+                                    Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, //
+                                    List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
+        super(cubeSeg, keyRanges, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+        this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, topNCol);
+    }
+    
+    @Override
+    public boolean hasNext() {
+        if (next != null)
+            return true;
+
+
+        if (innerResultIterator == null) {
+            if (resultIterator == null) {
+                if (rangeIterator.hasNext() == false)
+                    return false;
+
+                resultIterator = doScan(rangeIterator.next());
+            }
+
+            if (resultIterator.hasNext() == false) {
+                closeScanner();
+                resultIterator = null;
+                innerResultIterator = null;
+                return hasNext();
+            }
+
+            Result result = resultIterator.next();
+            scanCount++;
+            if (++scanCountDelta >= 1000)
+                flushScanCountDelta();
+            innerResultIterator = tupleConverter.translateTopNResult(result, oneTuple);
+        }
+
+        if (innerResultIterator.hasNext()) {
+            next = innerResultIterator.next();
+            return true;
+        } else {
+            innerResultIterator = null;
+            return hasNext();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 0110fbe..9b2cf66 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -64,25 +64,26 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
     public static final int SCAN_CACHE = 1024;
 
-    private final CubeSegment cubeSeg;
+    protected final CubeSegment cubeSeg;
     private final TupleFilter filter;
     private final Collection<TblColRef> groupBy;
-    private final Collection<RowValueDecoder> rowValueDecoders;
+    protected final List<RowValueDecoder> rowValueDecoders;
     private final StorageContext context;
     private final String tableName;
     private final HTableInterface table;
 
-    private final CubeTupleConverter tupleConverter;
-    private final Iterator<HBaseKeyRange> rangeIterator;
-    private final Tuple oneTuple; // avoid new instance
+    protected CubeTupleConverter tupleConverter;
+    protected final Iterator<HBaseKeyRange> rangeIterator;
+    protected final Tuple oneTuple; // avoid new instance
 
     private Scan scan;
     private ResultScanner scanner;
-    private Iterator<Result> resultIterator;
-    private int scanCount;
-    private int scanCountDelta;
-    private Tuple next;
-
+    protected Iterator<Result> resultIterator;
+    protected int scanCount;
+    protected int scanCountDelta;
+    protected Tuple next;
+    protected final Cuboid cuboid;
+    
     public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
             Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, //
             List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
@@ -93,12 +94,12 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         this.context = context;
         this.tableName = cubeSeg.getStorageLocationIdentifier();
 
-        Cuboid cuboid = keyRanges.get(0).getCuboid();
+        cuboid = keyRanges.get(0).getCuboid();
         for (HBaseKeyRange range : keyRanges) {
             assert cuboid.equals(range.getCuboid());
         }
 
-        this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo);
+        this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, null);
         this.oneTuple = new Tuple(returnTupleInfo);
         this.rangeIterator = keyRanges.iterator();
 
@@ -108,9 +109,10 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
             throw new StorageException("Error when open connection to table " + tableName, t);
         }
     }
-
+    
     @Override
     public boolean hasNext() {
+
         if (next != null)
             return true;
 
@@ -136,6 +138,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         return true;
     }
 
+    
     @Override
     public Tuple next() {
         if (next == null) {
@@ -153,7 +156,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         throw new UnsupportedOperationException();
     }
 
-    private final Iterator<Result> doScan(HBaseKeyRange keyRange) {
+    protected final Iterator<Result> doScan(HBaseKeyRange keyRange) {
         Iterator<Result> iter = null;
         try {
             scan = buildScan(keyRange);
@@ -247,7 +250,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         return result;
     }
 
-    private void closeScanner() {
+    protected void closeScanner() {
         flushScanCountDelta();
 
         if (logger.isDebugEnabled() && scan != null) {
@@ -286,7 +289,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         closeTable();
     }
 
-    private void flushScanCountDelta() {
+    protected void flushScanCountDelta() {
         context.increaseTotalScanCount(scanCountDelta);
         scanCountDelta = 0;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index aca3ca9..58c589b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -18,20 +18,10 @@
 
 package org.apache.kylin.storage.hbase.cube.v1;
 
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Pair;
@@ -68,10 +58,7 @@ import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
+import java.util.*;
 
 //v1
 @SuppressWarnings("unused")
@@ -85,17 +72,32 @@ public class CubeStorageQuery implements ICachableStorageQuery {
     private final CubeInstance cubeInstance;
     private final CubeDesc cubeDesc;
     private final String uuid;
+    private Collection<TblColRef> topNColumns;
 
     public CubeStorageQuery(CubeInstance cube) {
         this.cubeInstance = cube;
         this.cubeDesc = cube.getDescriptor();
         this.uuid = cube.getUuid();
+        this.topNColumns = Lists.newArrayList();
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            if (measureDesc.getFunction().isTopN()) {
+                List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
+                topNColumns.add(colRefs.get(colRefs.size() - 1));
+            }
+        }
     }
-
+    
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
 
+        // check whether this is a TopN query;
+        checkAndRewriteTopN(context, sqlDigest, returnTupleInfo);
+
         Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+        TblColRef topNCol = extractTopNCol(groups);
+        if (topNCol != null)
+            groups.remove(topNCol);
+
         TupleFilter filter = sqlDigest.filter;
 
         // build dimension & metrics
@@ -148,7 +150,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         setLimit(filter, context);
 
         HConnection conn = HBaseConnection.get(context.getConnUrl());
-        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
+        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, topNCol, valueDecoders, context, returnTupleInfo);
     }
 
     @Override
@@ -179,6 +181,12 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             if (sqlDigest.metricColumns.contains(column)) {
                 continue;
             }
+
+            // skip topN display col
+            if (topNColumns.contains(column)) {
+                continue;
+            }
+
             dimensions.add(column);
         }
     }
@@ -700,4 +708,48 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
     }
 
+    private void checkAndRewriteTopN(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+        TblColRef topNDisplayCol = extractTopNCol(groups);
+        boolean hasTopN = topNDisplayCol != null;
+
+        if (hasTopN == false)
+            return;
+
+        if (sqlDigest.aggregations.size() != 1) {
+            throw new IllegalStateException("When query with topN, only one metrics is allowed.");
+        }
+
+        FunctionDesc functionDesc = sqlDigest.aggregations.iterator().next();
+        if (functionDesc.isSum() == false) {
+            throw new IllegalStateException("When query with topN, only SUM function is allowed.");
+        }
+
+        FunctionDesc rewriteFunction = null;
+        // replace the SUM to the TopN function
+        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
+            if (measureDesc.getFunction().isCompatible(functionDesc) && topNDisplayCol.getName().equalsIgnoreCase(measureDesc.getFunction().getParameter().getDisplayColumn())) {
+                rewriteFunction = measureDesc.getFunction();
+                break;
+            }
+        }
+
+        if (rewriteFunction == null) {
+            throw new IllegalStateException("Didn't find topN measure for " + functionDesc);
+        }
+
+        sqlDigest.aggregations = Lists.newArrayList(rewriteFunction);
+        logger.info("Rewrite function " + functionDesc + " to " + rewriteFunction);
+    }
+
+    private TblColRef extractTopNCol(Collection<TblColRef> colRefs) {
+        for (TblColRef colRef : colRefs) {
+            if (topNColumns.contains(colRef)) {
+                return colRef;
+            }
+        }
+
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index e569cbd..8813901 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -2,21 +2,31 @@ package org.apache.kylin.storage.hbase.cube.v1;
 
 import java.io.IOException;
 import java.util.BitSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowKeyDecoder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
 import org.apache.kylin.storage.tuple.Tuple;
 import org.apache.kylin.storage.tuple.TupleInfo;
@@ -30,20 +40,24 @@ public class CubeTupleConverter {
     final TupleInfo tupleInfo;
     final RowKeyDecoder rowKeyDecoder;
     final List<RowValueDecoder> rowValueDecoders;
-    final List<IDerivedColumnFiller> derivedColFillers;
-
+    final List<IDerivedColumnFiller> derivedColFillers; 
     final int[] dimensionTupleIdx;
     final int[][] metricsMeasureIdx;
     final int[][] metricsTupleIdx;
+    final TblColRef topNCol;
+    int topNColTupleIdx;
+    int topNMeasureTupleIdx;
+    Dictionary<String> topNColDict;
 
-    public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) {
+    public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo, TblColRef topNCol) {
         this.cubeSeg = cubeSeg;
         this.cuboid = cuboid;
         this.tupleInfo = tupleInfo;
         this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
         this.rowValueDecoders = rowValueDecoders;
         this.derivedColFillers = Lists.newArrayList();
-
+        this.topNCol = topNCol;
+        
         List<TblColRef> dimCols = cuboid.getColumns();
 
         // pre-calculate dimension index mapping to tuple
@@ -52,6 +66,7 @@ public class CubeTupleConverter {
             TblColRef col = dimCols.get(i);
             dimensionTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
         }
+        
 
         // pre-calculate metrics index mapping to tuple
         metricsMeasureIdx = new int[rowValueDecoders.size()][];
@@ -64,6 +79,7 @@ public class CubeTupleConverter {
             metricsTupleIdx[i] = new int[selectedMeasures.cardinality()];
             for (int j = 0, mi = selectedMeasures.nextSetBit(0); j < metricsMeasureIdx[i].length; j++, mi = selectedMeasures.nextSetBit(mi + 1)) {
                 FunctionDesc aggrFunc = measures[mi].getFunction();
+                
                 int tupleIdx;
                 // a rewrite metrics is identified by its rewrite field name
                 if (aggrFunc.needRewrite()) {
@@ -80,6 +96,13 @@ public class CubeTupleConverter {
             }
         }
 
+        if (this.topNCol != null) {
+            this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1;
+            this.topNMeasureTupleIdx = metricsTupleIdx[0][0];
+            
+            this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol);
+        }
+        
         // prepare derived columns and filler
         Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCube().getHostToDerivedInfo(dimCols, null);
         for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
@@ -93,6 +116,46 @@ public class CubeTupleConverter {
         }
     }
 
+    public Iterator<Tuple> translateTopNResult(Result hbaseRow, Tuple tuple) {
+        translateResult(hbaseRow, tuple);
+        Object topNCounterObj = tuple.getAllValues()[topNMeasureTupleIdx];
+        assert (topNCounterObj instanceof TopNCounter);
+        return new TopNCounterTupleIterator(tuple, (TopNCounter) topNCounterObj);
+    }
+
+    private class TopNCounterTupleIterator implements Iterator {
+
+        private Tuple tuple;
+        private Iterator<Counter> topNCounterIterator;
+        private Counter<ByteArray> counter;
+        
+        private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) {
+            this.tuple = tuple;
+            this.topNCounterIterator = topNCounter.iterator();
+        }
+        
+        @Override
+        public boolean hasNext() {
+            return topNCounterIterator.hasNext();
+        }
+
+        @Override
+        public Tuple next() {
+            counter = topNCounterIterator.next();
+            int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, counter.getItem().array().length);
+            String colValue = topNColDict.getValueFromId(key);
+            tuple.setDimensionValue(topNColTupleIdx, colValue);
+            tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount());
+            
+            return tuple;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+    
     public void translateResult(Result hbaseRow, Tuple tuple) {
         try {
             byte[] rowkey = hbaseRow.getRow();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index e433b78..831cadb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -58,7 +58,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
     private ITuple next;
 
     public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
-            Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
+            Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, List<RowValueDecoder> rowValueDecoders, //
             StorageContext context, TupleInfo returnTupleInfo) {
 
         this.context = context;
@@ -67,8 +67,13 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
 
         this.segmentIteratorList = new ArrayList<CubeSegmentTupleIterator>(segmentKeyRanges.size());
         Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges);
+        boolean useTopN = topNCol != null;
         for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) {
-            CubeSegmentTupleIterator segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+            CubeSegmentTupleIterator segIter;
+            if (useTopN)
+                segIter = new CubeSegmentTopNTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo);
+            else
+                segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
             this.segmentIteratorList.add(segIter);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/988c2e76/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
index 0b4fd07..df52664 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/common/ITStorageTest.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.common;
 
 import static org.junit.Assert.*;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -28,6 +29,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.metadata.tuple.ITuple;
@@ -142,7 +144,7 @@ public class ITStorageTest extends HBaseMetadataTestCase {
         int count = 0;
         ITupleIterator iterator = null;
         try {
-            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations);
+            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
             iterator = storageEngine.search(context, sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations));
             while (iterator.hasNext()) {
                 ITuple tuple = iterator.next();


Mime
View raw message