kylin-commits mailing list archives

From shaofeng...@apache.org
Subject [27/34] incubator-kylin git commit: KYLIN-960 organize existing storage module
Date Wed, 26 Aug 2015 14:08:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
new file mode 100644
index 0000000..bc74ac1
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/AggregationScanner.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class AggregationScanner implements RegionScanner {
+
+    private RegionScanner outerScanner;
+    private ObserverBehavior behavior;
+
+    public AggregationScanner(CoprocessorRowType type, CoprocessorFilter filter, CoprocessorProjector groupBy, ObserverAggregators aggrs, RegionScanner innerScanner, ObserverBehavior behavior) throws IOException {
+
+        AggregateRegionObserver.LOG.info("Kylin Coprocessor start");
+
+        this.behavior = behavior;
+
+        ObserverAggregationCache aggCache;
+        Stats stats = new Stats();
+
+        aggCache = buildAggrCache(innerScanner, type, groupBy, aggrs, filter, stats);
+        stats.countOutputRow(aggCache.getSize());
+        this.outerScanner = aggCache.getScanner(innerScanner);
+
+        AggregateRegionObserver.LOG.info("Kylin Coprocessor aggregation done: " + stats);
+    }
+
+    @SuppressWarnings("rawtypes")
+    ObserverAggregationCache buildAggrCache(final RegionScanner innerScanner, CoprocessorRowType type, CoprocessorProjector projector, ObserverAggregators aggregators, CoprocessorFilter filter, Stats stats) throws IOException {
+
+        ObserverAggregationCache aggCache = new ObserverAggregationCache(aggregators);
+
+        ObserverTuple tuple = new ObserverTuple(type);
+        boolean hasMore = true;
+        List<Cell> results = new ArrayList<Cell>();
+        while (hasMore) {
+            results.clear();
+            hasMore = innerScanner.nextRaw(results);
+            if (results.isEmpty())
+                continue;
+
+            if (stats != null)
+                stats.countInputRow(results);
+
+            Cell cell = results.get(0);
+            tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+
+            if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER.ordinal()) {
+                if (filter != null && !filter.evaluate(tuple))
+                    continue;
+
+                if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR.ordinal()) {
+                    AggrKey aggKey = projector.getAggrKey(results);
+                    MeasureAggregator[] bufs = aggCache.getBuffer(aggKey);
+                    aggregators.aggregate(bufs, results);
+
+                    if (behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
+                        aggCache.checkMemoryUsage();
+                    }
+                }
+            }
+        }
+        return aggCache;
+    }
+
+    @Override
+    public boolean next(List<Cell> results) throws IOException {
+        return outerScanner.next(results);
+    }
+
+    @Override
+    public boolean next(List<Cell> result, int limit) throws IOException {
+        return outerScanner.next(result, limit);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> result) throws IOException {
+        return outerScanner.nextRaw(result);
+    }
+
+    @Override
+    public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+        return outerScanner.nextRaw(result, limit);
+    }
+
+    @Override
+    public void close() throws IOException {
+        outerScanner.close();
+    }
+
+    @Override
+    public HRegionInfo getRegionInfo() {
+        return outerScanner.getRegionInfo();
+    }
+
+    @Override
+    public boolean isFilterDone() throws IOException {
+        return outerScanner.isFilterDone();
+    }
+
+    @Override
+    public boolean reseek(byte[] row) throws IOException {
+        return outerScanner.reseek(row);
+    }
+
+    @Override
+    public long getMaxResultSize() {
+        return outerScanner.getMaxResultSize();
+    }
+
+    @Override
+    public long getMvccReadPoint() {
+        return outerScanner.getMvccReadPoint();
+    }
+
+    private static class Stats {
+        long inputRows = 0;
+        long inputBytes = 0;
+        long outputRows = 0;
+
+        // outputBytes is not tracked because that would require actually
+        // serializing all the aggregator buffers
+
+        public void countInputRow(List<Cell> row) {
+            inputRows++;
+            inputBytes += row.get(0).getRowLength();
+            for (int i = 0, n = row.size(); i < n; i++) {
+                inputBytes += row.get(i).getValueLength();
+            }
+        }
+
+        public void countOutputRow(long rowCount) {
+            outputRows += rowCount;
+        }
+
+        public String toString() {
+            double percent = inputRows == 0 ? 0 : (double) outputRows / inputRows * 100;
+            return Math.round(percent) + "% = " + outputRows + " (out rows) / " + inputRows + " (in rows); in bytes = " + inputBytes + "; est. out bytes = " + Math.round(inputBytes * percent / 100);
+        }
+    }
+}
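
A rough sketch of how the pieces above compose; every input is assumed to be
pre-built by the caller, as ObserverEnabler does further below, and the helper
name is hypothetical:

    // Wrap a region's raw scanner with the full scan + filter + aggregate +
    // memory-check pipeline (the default ObserverBehavior).
    static RegionScanner wrapWithAggregation(CoprocessorRowType type, CoprocessorFilter filter,
            CoprocessorProjector projector, ObserverAggregators aggrs,
            RegionScanner raw) throws IOException {
        return new AggregationScanner(type, filter, projector, aggrs, raw,
                ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM);
    }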

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
new file mode 100644
index 0000000..b7b12d1
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregationCache.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.storage.hbase.common.coprocessor.AggregationCache;
+import org.apache.kylin.storage.hbase.common.coprocessor.AggrKey;
+
+/**
+ * @author yangli9
+ */
+@SuppressWarnings("rawtypes")
+public class ObserverAggregationCache extends AggregationCache {
+
+    private final ObserverAggregators aggregators;
+
+    public ObserverAggregationCache(ObserverAggregators aggregators) {
+        this.aggregators = aggregators;
+    }
+
+    public RegionScanner getScanner(RegionScanner innerScanner) {
+        return new AggregationRegionScanner(innerScanner);
+    }
+
+    @Override
+    public MeasureAggregator[] createBuffer() {
+        return aggregators.createBuffer();
+    }
+
+    private class AggregationRegionScanner implements RegionScanner {
+
+        private final RegionScanner innerScanner;
+        private final Iterator<Entry<AggrKey, MeasureAggregator[]>> iterator;
+
+        public AggregationRegionScanner(RegionScanner innerScanner) {
+            this.innerScanner = innerScanner;
+            this.iterator = aggBufMap.entrySet().iterator();
+        }
+
+        @Override
+        public boolean next(List<Cell> results) throws IOException {
+            // AggregateRegionObserver.LOG.info("Kylin Scanner next()");
+            boolean hasMore = false;
+            if (iterator.hasNext()) {
+                Entry<AggrKey, MeasureAggregator[]> entry = iterator.next();
+                makeCells(entry, results);
+                hasMore = iterator.hasNext();
+            }
+            // AggregateRegionObserver.LOG.info("Kylin Scanner next() done");
+            return hasMore;
+        }
+
+        private void makeCells(Entry<AggrKey, MeasureAggregator[]> entry, List<Cell> results) {
+            byte[][] families = aggregators.getHColFamilies();
+            byte[][] qualifiers = aggregators.getHColQualifiers();
+            int nHCols = aggregators.getHColsNum();
+
+            AggrKey rowKey = entry.getKey();
+            MeasureAggregator[] aggBuf = entry.getValue();
+            ByteBuffer[] rowValues = aggregators.getHColValues(aggBuf);
+
+            if (nHCols == 0) {
+                Cell keyValue = new KeyValue(rowKey.get(), rowKey.offset(), rowKey.length(), //
+                        null, 0, 0, //
+                        null, 0, 0, //
+                        HConstants.LATEST_TIMESTAMP, Type.Put, //
+                        null, 0, 0);
+                results.add(keyValue);
+            } else {
+                for (int i = 0; i < nHCols; i++) {
+                    Cell keyValue = new KeyValue(rowKey.get(), rowKey.offset(), rowKey.length(), //
+                            families[i], 0, families[i].length, //
+                            qualifiers[i], 0, qualifiers[i].length, //
+                            HConstants.LATEST_TIMESTAMP, Type.Put, //
+                            rowValues[i].array(), 0, rowValues[i].position());
+                    results.add(keyValue);
+                }
+            }
+        }
+
+        @Override
+        public boolean next(List<Cell> result, int limit) throws IOException {
+            return next(result);
+        }
+
+        @Override
+        public boolean nextRaw(List<Cell> result) throws IOException {
+            return next(result);
+        }
+
+        @Override
+        public boolean nextRaw(List<Cell> result, int limit) throws IOException {
+            return next(result);
+        }
+
+        @Override
+        public void close() throws IOException {
+            // AggregateRegionObserver.LOG.info("Kylin Scanner close()");
+            innerScanner.close();
+            // AggregateRegionObserver.LOG.info("Kylin Scanner close() done");
+        }
+
+        @Override
+        public HRegionInfo getRegionInfo() {
+            // AggregateRegionObserver.LOG.info("Kylin Scanner getRegionInfo()");
+            return innerScanner.getRegionInfo();
+        }
+
+        @Override
+        public long getMaxResultSize() {
+            // AggregateRegionObserver.LOG.info("Kylin Scanner getMaxResultSize()");
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public boolean isFilterDone() throws IOException {
+            // AggregateRegionObserver.LOG.info("Kylin Scanner isFilterDone()");
+            return false;
+        }
+
+        @Override
+        public boolean reseek(byte[] row) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getMvccReadPoint() {
+            // AggregateRegionObserver.LOG.info("Kylin Scanner getMvccReadPoint()");
+            return Long.MAX_VALUE;
+        }
+    }
+
+}
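
One subtlety of the replay scanner above: next() reports hasMore by looking
ahead after producing a row, so the last aggregated row arrives together with
hasMore == false. A minimal drain loop, assuming `scanner` came from
getScanner():

    List<Cell> batch = new ArrayList<Cell>();
    boolean hasMore = true;
    while (hasMore) {
        batch.clear();
        hasMore = scanner.next(batch); // fills one aggregated row, or nothing when done
        // consume `batch` here; it may be non-empty even when hasMore is false
    }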

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
new file mode 100644
index 0000000..634e7d3
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorConstants;
+import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
+
+/**
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class ObserverAggregators {
+
+    public static ObserverAggregators fromValueDecoders(Collection<RowValueDecoder> rowValueDecoders) {
+
+        // each decoder represents one HBase column
+        HCol[] hcols = new HCol[rowValueDecoders.size()];
+        int i = 0;
+        for (RowValueDecoder rowValueDecoder : rowValueDecoders) {
+            hcols[i++] = buildHCol(rowValueDecoder.getHBaseColumn());
+        }
+
+        ObserverAggregators aggrs = new ObserverAggregators(hcols);
+        return aggrs;
+
+    }
+
+    private static HCol buildHCol(HBaseColumnDesc desc) {
+        byte[] family = Bytes.toBytes(desc.getColumnFamilyName());
+        byte[] qualifier = Bytes.toBytes(desc.getQualifier());
+        MeasureDesc[] measures = desc.getMeasures();
+
+        String[] funcNames = new String[measures.length];
+        String[] dataTypes = new String[measures.length];
+
+        for (int i = 0; i < measures.length; i++) {
+            funcNames[i] = measures[i].getFunction().getExpression();
+            dataTypes[i] = measures[i].getFunction().getReturnType();
+        }
+
+        return new HCol(family, qualifier, funcNames, dataTypes);
+    }
+
+    public static byte[] serialize(ObserverAggregators o) {
+        ByteBuffer buf = ByteBuffer.allocate(CoprocessorConstants.SERIALIZE_BUFFER_SIZE);
+        serializer.serialize(o, buf);
+        byte[] result = new byte[buf.position()];
+        System.arraycopy(buf.array(), 0, result, 0, buf.position());
+        return result;
+    }
+
+    public static ObserverAggregators deserialize(byte[] bytes) {
+        return serializer.deserialize(ByteBuffer.wrap(bytes));
+    }
+
+    private static final Serializer serializer = new Serializer();
+
+    private static class Serializer implements BytesSerializer<ObserverAggregators> {
+
+        @Override
+        public void serialize(ObserverAggregators value, ByteBuffer out) {
+            BytesUtil.writeVInt(value.nHCols, out);
+            for (int i = 0; i < value.nHCols; i++) {
+                HCol col = value.hcols[i];
+                BytesUtil.writeByteArray(col.family, out);
+                BytesUtil.writeByteArray(col.qualifier, out);
+                BytesUtil.writeAsciiStringArray(col.funcNames, out);
+                BytesUtil.writeAsciiStringArray(col.dataTypes, out);
+            }
+        }
+
+        @Override
+        public ObserverAggregators deserialize(ByteBuffer in) {
+            int nHCols = BytesUtil.readVInt(in);
+            HCol[] hcols = new HCol[nHCols];
+            for (int i = 0; i < nHCols; i++) {
+                byte[] family = BytesUtil.readByteArray(in);
+                byte[] qualifier = BytesUtil.readByteArray(in);
+                String[] funcNames = BytesUtil.readAsciiStringArray(in);
+                String[] dataTypes = BytesUtil.readAsciiStringArray(in);
+                hcols[i] = new HCol(family, qualifier, funcNames, dataTypes);
+            }
+            return new ObserverAggregators(hcols);
+        }
+
+    }
+
+    // ============================================================================
+
+    final HCol[] hcols;
+    final int nHCols;
+    final ByteBuffer[] hColValues;
+    final int nTotalMeasures;
+
+    public ObserverAggregators(HCol[] _hcols) {
+        this.hcols = sort(_hcols);
+        this.nHCols = hcols.length;
+        this.hColValues = new ByteBuffer[nHCols];
+
+        int nTotalMeasures = 0;
+        for (HCol col : hcols)
+            nTotalMeasures += col.nMeasures;
+        this.nTotalMeasures = nTotalMeasures;
+    }
+
+    private HCol[] sort(HCol[] hcols) {
+        HCol[] copy = Arrays.copyOf(hcols, hcols.length);
+        Arrays.sort(copy, new Comparator<HCol>() {
+            @Override
+            public int compare(HCol o1, HCol o2) {
+                int comp = Bytes.compareTo(o1.family, o2.family);
+                if (comp != 0)
+                    return comp;
+                comp = Bytes.compareTo(o1.qualifier, o2.qualifier);
+                return comp;
+            }
+        });
+        return copy;
+    }
+
+    public MeasureAggregator[] createBuffer() {
+        MeasureAggregator[] aggrs = new MeasureAggregator[nTotalMeasures];
+        int i = 0;
+        for (HCol col : hcols) {
+            for (int j = 0; j < col.nMeasures; j++)
+                aggrs[i++] = MeasureAggregator.create(col.funcNames[j], col.dataTypes[j]);
+        }
+        return aggrs;
+    }
+
+    public void aggregate(MeasureAggregator[] measureAggrs, List<Cell> rowCells) {
+        int i = 0;
+        for (int ci = 0; ci < nHCols; ci++) {
+            HCol col = hcols[ci];
+            Cell cell = findCell(col, rowCells);
+
+            if (cell == null) {
+                i += col.nMeasures;
+                continue;
+            }
+
+            ByteBuffer input = ByteBuffer.wrap(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+
+            col.measureCodec.decode(input, col.measureValues);
+            for (int j = 0; j < col.nMeasures; j++)
+                measureAggrs[i++].aggregate(col.measureValues[j]);
+        }
+    }
+
+    private Cell findCell(HCol col, List<Cell> cells) {
+        // cells are ordered by timestamp ascending, so search from the back;
+        // the first hit is the latest version
+        for (int i = cells.size() - 1; i >= 0; i--) {
+            Cell cell = cells.get(i);
+            if (match(col, cell)) {
+                return cell;
+            }
+        }
+        return null;
+    }
+
+    public static boolean match(HCol col, Cell cell) {
+        return Bytes.compareTo(col.family, 0, col.family.length, cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) == 0 && Bytes.compareTo(col.qualifier, 0, col.qualifier.length, cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) == 0;
+    }
+
+    public int getHColsNum() {
+        return nHCols;
+    }
+
+    public byte[][] getHColFamilies() {
+        byte[][] result = new byte[nHCols][];
+        for (int i = 0; i < nHCols; i++)
+            result[i] = hcols[i].family;
+        return result;
+    }
+
+    public byte[][] getHColQualifiers() {
+        byte[][] result = new byte[nHCols][];
+        for (int i = 0; i < nHCols; i++)
+            result[i] = hcols[i].qualifier;
+        return result;
+    }
+
+    public ByteBuffer[] getHColValues(MeasureAggregator[] aggrs) {
+        int i = 0;
+        for (int ci = 0; ci < nHCols; ci++) {
+            HCol col = hcols[ci];
+            for (int j = 0; j < col.nMeasures; j++)
+                col.measureValues[j] = aggrs[i++].getState();
+
+            col.measureBuf.clear();
+            col.measureCodec.encode(col.measureValues, col.measureBuf);
+            hColValues[ci] = col.measureBuf;
+        }
+        return hColValues;
+    }
+
+    // ============================================================================
+
+    public static class HCol {
+        final byte[] family;
+        final byte[] qualifier;
+        final String[] funcNames;
+        final String[] dataTypes;
+        final int nMeasures;
+
+        final MeasureCodec measureCodec;
+        final Object[] measureValues;
+        final ByteBuffer measureBuf;
+
+        public HCol(byte[] bFamily, byte[] bQualifier, String[] funcNames, String[] dataTypes) {
+            this.family = bFamily;
+            this.qualifier = bQualifier;
+            this.funcNames = funcNames;
+            this.dataTypes = dataTypes;
+            this.nMeasures = funcNames.length;
+            assert funcNames.length == dataTypes.length;
+
+            this.measureCodec = new MeasureCodec(dataTypes);
+            this.measureValues = new Object[nMeasures];
+            this.measureBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        }
+
+        @Override
+        public String toString() {
+            return "HCol [bFamily=" + Bytes.toString(family) + ", bQualifier=" + Bytes.toString(qualifier) + ", nMeasures=" + nMeasures + "]";
+        }
+    }
+
+}
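
The Serializer above is what carries aggregator metadata from the client to
the region server; a round-trip sketch, assuming `rowValueDecoders` comes from
the query engine:

    ObserverAggregators aggrs = ObserverAggregators.fromValueDecoders(rowValueDecoders);
    byte[] wire = ObserverAggregators.serialize(aggrs);                 // client side
    ObserverAggregators remote = ObserverAggregators.deserialize(wire); // region-server side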

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverBehavior.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverBehavior.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverBehavior.java
new file mode 100644
index 0000000..7630d57
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverBehavior.java
@@ -0,0 +1,10 @@
+package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
+
+/**
+ */
+public enum ObserverBehavior {
+    SCAN, // only scan data; used for profiling tuple scan speed
+    SCAN_FILTER, // scan + filter; used for profiling filter speed
+    SCAN_FILTER_AGGR, // scan + filter + aggregate the result
+    SCAN_FILTER_AGGR_CHECKMEM, // default: full operations, including memory checks
+}
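
The constants are declared in increasing order of work, so callers gate each
stage by comparing ordinals, as AggregationScanner does above. For
illustration:

    ObserverBehavior behavior = ObserverBehavior.SCAN_FILTER;
    boolean doFilter = behavior.ordinal() >= ObserverBehavior.SCAN_FILTER.ordinal();      // true
    boolean doAggr = behavior.ordinal() >= ObserverBehavior.SCAN_FILTER_AGGR.ordinal();   // false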

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
new file mode 100644
index 0000000..e22cc00
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverEnabler.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorFilter;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorProjector;
+import org.apache.kylin.storage.hbase.common.coprocessor.FilterDecorator;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
+import org.apache.kylin.storage.hbase.cube.v1.RegionScannerAdapter;
+import org.apache.kylin.storage.hbase.cube.v1.ResultScannerAdapter;
+import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * @author yangli9
+ */
+public class ObserverEnabler {
+
+    private static final Logger logger = LoggerFactory.getLogger(ObserverEnabler.class);
+
+    static final String FORCE_COPROCESSOR = "forceObserver";
+    static final Map<String, Boolean> CUBE_OVERRIDES = Maps.newConcurrentMap();
+
+    public static ResultScanner scanWithCoprocessorIfBeneficial(CubeSegment segment, Cuboid cuboid, TupleFilter tupleFilter, //
+            Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context, HTableInterface table, Scan scan) throws IOException {
+
+        if (!context.isCoprocessorEnabled()) {
+            return table.getScanner(scan);
+        }
+
+        CoprocessorRowType type = CoprocessorRowType.fromCuboid(segment, cuboid);
+        CoprocessorFilter filter = CoprocessorFilter.fromFilter(segment, tupleFilter, FilterDecorator.FilterConstantsTreatment.REPLACE_WITH_GLOBAL_DICT);
+        CoprocessorProjector projector = CoprocessorProjector.makeForObserver(segment, cuboid, groupBy);
+        ObserverAggregators aggrs = ObserverAggregators.fromValueDecoders(rowValueDecoders);
+
+        boolean localCoprocessor = KylinConfig.getInstanceFromEnv().getQueryRunLocalCoprocessor() || BackdoorToggles.getRunLocalCoprocessor();
+
+        if (localCoprocessor) {
+            RegionScanner innerScanner = new RegionScannerAdapter(table.getScanner(scan));
+            AggregationScanner aggrScanner = new AggregationScanner(type, filter, projector, aggrs, innerScanner, ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM);
+            return new ResultScannerAdapter(aggrScanner);
+        } else {
+
+            // debug/profiling purpose
+            String toggle = BackdoorToggles.getObserverBehavior();
+            if (toggle == null) {
+                toggle = ObserverBehavior.SCAN_FILTER_AGGR_CHECKMEM.toString(); //default behavior
+            } else {
+                logger.info("The execution of this query will use " + toggle + " as observer's behavior");
+            }
+
+            scan.setAttribute(AggregateRegionObserver.COPROCESSOR_ENABLE, new byte[] { 0x01 });
+            scan.setAttribute(AggregateRegionObserver.BEHAVIOR, toggle.getBytes());
+            scan.setAttribute(AggregateRegionObserver.TYPE, CoprocessorRowType.serialize(type));
+            scan.setAttribute(AggregateRegionObserver.PROJECTOR, CoprocessorProjector.serialize(projector));
+            scan.setAttribute(AggregateRegionObserver.AGGREGATORS, ObserverAggregators.serialize(aggrs));
+            scan.setAttribute(AggregateRegionObserver.FILTER, CoprocessorFilter.serialize(filter));
+            return table.getScanner(scan);
+        }
+    }
+
+    public static void enableCoprocessorIfBeneficial(CubeInstance cube, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+        if (isCoprocessorBeneficial(cube, groupBy, rowValueDecoders, context)) {
+            context.enableCoprocessor();
+        }
+    }
+
+    private static boolean isCoprocessorBeneficial(CubeInstance cube, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+
+        String forceFlag = System.getProperty(FORCE_COPROCESSOR);
+        if (forceFlag != null) {
+            return Boolean.parseBoolean(forceFlag);
+        }
+
+        Boolean cubeOverride = CUBE_OVERRIDES.get(cube.getName());
+        if (cubeOverride != null) {
+            return cubeOverride.booleanValue();
+        }
+
+        //        if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) {
+        //            logger.info("Coprocessor is disabled because there is memory hungry count distinct");
+        //            return false;
+        //        }
+
+        if (context.isExactAggregation()) {
+            logger.info("Coprocessor is disabled because exactAggregation is true");
+            return false;
+        }
+
+        Cuboid cuboid = context.getCuboid();
+        Set<TblColRef> toAggr = Sets.newHashSet(cuboid.getAggregationColumns());
+        toAggr.removeAll(groupBy);
+        if (toAggr.isEmpty()) {
+            logger.info("Coprocessor is disabled because no additional columns to aggregate");
+            return false;
+        }
+
+        logger.info("Coprocessor is enabled to aggregate " + toAggr + ", returning " + groupBy);
+        return true;
+    }
+
+    @SuppressWarnings("unused")
+    private static int getBitsToScan(byte[] startKey, byte[] stopKey) {
+        // find the first bit difference from the beginning
+        int totalBits = startKey.length * 8;
+        int bitsToScan = totalBits;
+        for (int i = 0; i < totalBits; i++) {
+            int byteIdx = i / 8;
+            int bitIdx = 7 - i % 8;
+            byte bitMask = (byte) (1 << bitIdx);
+            if ((startKey[byteIdx] & bitMask) == (stopKey[byteIdx] & bitMask))
+                bitsToScan--;
+            else
+                break;
+        }
+        return bitsToScan;
+    }
+
+    public static void forceCoprocessorOn() {
+        System.setProperty(FORCE_COPROCESSOR, "true");
+    }
+
+    public static void forceCoprocessorOff() {
+        System.setProperty(FORCE_COPROCESSOR, "false");
+    }
+
+    public static String getForceCoprocessor() {
+        return System.getProperty(FORCE_COPROCESSOR);
+    }
+
+    public static void forceCoprocessorUnset() {
+        System.clearProperty(FORCE_COPROCESSOR);
+    }
+
+    public static void updateCubeOverride(String cubeName, String force) {
+        if ("null".equalsIgnoreCase(force) || "default".equalsIgnoreCase(force)) {
+            CUBE_OVERRIDES.remove(cubeName);
+        } else if ("true".equalsIgnoreCase(force)) {
+            CUBE_OVERRIDES.put(cubeName, Boolean.TRUE);
+        } else if ("false".equalsIgnoreCase(force)) {
+            CUBE_OVERRIDES.put(cubeName, Boolean.FALSE);
+        }
+    }
+
+    public static Map<String, Boolean> getCubeOverrides() {
+        return CUBE_OVERRIDES;
+    }
+
+}
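
For debugging, coprocessor usage can be overridden per JVM or per cube via the
helpers above ("my_cube" is a placeholder name):

    ObserverEnabler.forceCoprocessorOn();                     // JVM-wide; same as -DforceObserver=true
    ObserverEnabler.updateCubeOverride("my_cube", "false");   // disable for one cube
    ObserverEnabler.updateCubeOverride("my_cube", "default"); // remove the per-cube override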

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java
new file mode 100644
index 0000000..4952931
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverTuple.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
+import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorRowType;
+
+/**
+ * A special kind of tuple that exposes column values (dictionary IDs) directly
+ * on top of the row key.
+ *
+ * @author yangli9
+ */
+public class ObserverTuple implements IEvaluatableTuple {
+
+    final CoprocessorRowType type;
+
+    ImmutableBytesWritable rowkey;
+    String[] values;
+
+    public ObserverTuple(CoprocessorRowType type) {
+        this.type = type;
+        this.rowkey = new ImmutableBytesWritable();
+        this.values = new String[type.getColumnCount()];
+    }
+
+    public void setUnderlying(byte[] array, int offset, int length) {
+        rowkey.set(array, offset, length);
+        for (int i = 0; i < values.length; i++) {
+            values[i] = null;
+        }
+    }
+
+    private String getValueAt(int i) {
+        int n = type.getColumnCount();
+        if (i < 0 || i >= n)
+            return null;
+
+        if (values[i] == null) {
+            values[i] = Dictionary.dictIdToString(rowkey.get(), rowkey.getOffset() + type.columnOffsets[i], type.columnSizes[i]);
+        }
+
+        return values[i];
+    }
+
+    @Override
+    public Object getValue(TblColRef col) {
+        int i = type.getColIndexByTblColRef(col);
+        return getValueAt(i);
+    }
+
+}
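
The intended access pattern, sketched below, reuses one tuple across rows;
values are decoded from dictionary IDs lazily and cached until the next
setUnderlying() call (`cell` and `someColumn` are placeholders):

    ObserverTuple tuple = new ObserverTuple(type);
    tuple.setUnderlying(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
    Object value = tuple.getValue(someColumn); // decoded on first access, then cached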

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CellListIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CellListIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CellListIterator.java
new file mode 100644
index 0000000..89b58fd
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CellListIterator.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+
+public interface CellListIterator extends Iterator<List<Cell>>, Closeable {
+}
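
Implementations typically hold an open HBase scanner, so callers must close
the iterator when done; a usage sketch:

    static long countRows(CellListIterator it) throws IOException {
        long n = 0;
        try {
            while (it.hasNext()) {
                it.next(); // one row's worth of cells
                n++;
            }
        } finally {
            it.close(); // releases the underlying HBase scanner/table
        }
        return n;
    }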

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
new file mode 100644
index 0000000..7d1d833
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.DataFormatException;
+
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.ServerRpcController;
+import org.apache.kylin.common.util.CompressionUtils;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.util.KryoUtils;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Iterators;
+import com.google.protobuf.ByteString;
+import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.CubeVisitProtos;
+
+public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
+
+    static class EndpointResultsAsGTScanner implements IGTScanner {
+        private GTInfo info;
+        private Iterator<byte[]> blocks;
+
+        public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks) {
+            this.info = info;
+            this.blocks = blocks;
+        }
+
+        @Override
+        public GTInfo getInfo() {
+            return info;
+        }
+
+        @Override
+        public int getScannedRowCount() {
+            return 0;
+        }
+
+        @Override
+        public void close() throws IOException {
+            //do nothing
+        }
+
+        @Override
+        public Iterator<GTRecord> iterator() {
+            return Iterators.concat(Iterators.transform(blocks, new Function<byte[], Iterator<GTRecord>>() {
+                @Nullable
+                @Override
+                public Iterator<GTRecord> apply(@Nullable final byte[] input) {
+
+                    logger.info("Reassembling a raw block returned from Endpoint with byte length: " + input.length);
+                    return new Iterator<GTRecord>() {
+                        private ByteBuffer inputBuffer = null;
+                        private GTRecord oneRecord = null;
+
+                        @Override
+                        public boolean hasNext() {
+                            if (inputBuffer == null) {
+                                inputBuffer = ByteBuffer.wrap(input);
+                                oneRecord = new GTRecord(info);
+                            }
+
+                            return inputBuffer.position() < inputBuffer.limit();
+                        }
+
+                        @Override
+                        public GTRecord next() {
+                            oneRecord.loadAllColumns(inputBuffer);
+                            return oneRecord;
+                        }
+
+                        @Override
+                        public void remove() {
+                            throw new UnsupportedOperationException();
+                        }
+                    };
+                }
+            }));
+        }
+    }
+
+    public CubeHBaseEndpointRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
+        super(cubeSeg, cuboid, fullGTInfo);
+    }
+
+    @Override
+    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
+
+        try {
+            // primary key (also the 0th column block) is always selected
+            final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
+            // globally shared connection, does not require close
+            HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+            final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
+            final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks);
+
+            RawScan rawScan = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns);
+
+            byte[] scanRequestBytes = KryoUtils.serialize(scanRequest);
+            byte[] rawScanBytes = KryoUtils.serialize(rawScan);
+            CubeVisitProtos.CubeVisitRequest.Builder builder = CubeVisitProtos.CubeVisitRequest.newBuilder();
+            builder.setGtScanRequest(ByteString.copyFrom(scanRequestBytes)).setHbaseRawScan(ByteString.copyFrom(rawScanBytes));
+
+            Collection<CubeVisitProtos.CubeVisitResponse> results = getResults(builder.build(), hbaseTable, rawScan.startKey, rawScan.endKey);
+            final Collection<byte[]> rowBlocks = Collections2.transform(results, new Function<CubeVisitProtos.CubeVisitResponse, byte[]>() {
+                @Nullable
+                @Override
+                public byte[] apply(CubeVisitProtos.CubeVisitResponse input) {
+                    try {
+                        return CompressionUtils.decompress(input.getCompressedRows().toByteArray());
+                    } catch (IOException | DataFormatException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
+
+            return new EndpointResultsAsGTScanner(fullGTInfo, rowBlocks.iterator());
+
+        } catch (Throwable throwable) {
+            // propagate coprocessor failures instead of printing and returning null
+            throw new IOException(throwable);
+        }
+    }
+
+    // TODO: async callback
+    private Collection<CubeVisitProtos.CubeVisitResponse> getResults(final CubeVisitProtos.CubeVisitRequest request, HTableInterface table, byte[] startKey, byte[] endKey) throws Throwable {
+        Map<byte[], CubeVisitProtos.CubeVisitResponse> results = table.coprocessorService(CubeVisitProtos.CubeVisitService.class, startKey, endKey, new Batch.Call<CubeVisitProtos.CubeVisitService, CubeVisitProtos.CubeVisitResponse>() {
+            public CubeVisitProtos.CubeVisitResponse call(CubeVisitProtos.CubeVisitService rowsService) throws IOException {
+                ServerRpcController controller = new ServerRpcController();
+                BlockingRpcCallback<CubeVisitProtos.CubeVisitResponse> rpcCallback = new BlockingRpcCallback<>();
+                rowsService.visitCube(controller, request, rpcCallback);
+                CubeVisitProtos.CubeVisitResponse response = rpcCallback.get();
+                if (controller.failedOnException()) {
+                    throw controller.getFailedOn();
+                }
+                return response;
+            }
+        });
+
+        logger.info("{} regions returned results ", results.values().size());
+
+        return results.values();
+    }
+}
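
Note that the record iterator above reloads a single shared GTRecord on every
next() call, so consumers should process each record before advancing rather
than retaining the reference. A consumption sketch, with `rpc` and `request`
assumed, and IGTScanner taken to be iterable over GTRecord as the iterator()
override suggests:

    IGTScanner scanner = rpc.getGTScanner(request);
    for (GTRecord r : scanner) {
        // `r` is a shared, mutable buffer -- consume it before the next iteration
    }
    scanner.close();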

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
new file mode 100644
index 0000000..09bef0f
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -0,0 +1,160 @@
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.cube.model.HBaseMappingDesc;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public abstract class CubeHBaseRPC {
+
+    public static final Logger logger = LoggerFactory.getLogger(CubeHBaseRPC.class);
+
+    public static final int SCAN_CACHE = 1024;
+
+    final protected CubeSegment cubeSeg;
+    final protected Cuboid cuboid;
+    final protected GTInfo fullGTInfo;
+
+    public CubeHBaseRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
+        this.cubeSeg = cubeSeg;
+        this.cuboid = cuboid;
+        this.fullGTInfo = fullGTInfo;
+    }
+
+    abstract IGTScanner getGTScanner(GTScanRequest scanRequest) throws IOException;
+
+    public static Scan buildScan(RawScan rawScan) {
+        Scan scan = new Scan();
+        scan.setCaching(SCAN_CACHE);
+        scan.setCacheBlocks(true);
+        scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE));
+
+        if (rawScan.startKey != null) {
+            scan.setStartRow(rawScan.startKey);
+        }
+        if (rawScan.endKey != null) {
+            scan.setStopRow(rawScan.endKey);
+        }
+        if (rawScan.fuzzyKey != null) {
+            applyFuzzyFilter(scan, rawScan.fuzzyKey);
+        }
+        if (rawScan.hbaseColumns != null) {
+            applyHBaseColums(scan, rawScan.hbaseColumns);
+        }
+
+        return scan;
+    }
+
+    protected RawScan prepareRawScan(GTRecord pkStart, GTRecord pkEnd, List<Pair<byte[], byte[]>> selectedColumns) {
+        byte[] start = makeRowKeyToScan(pkStart, (byte) 0x00);
+        byte[] end = makeRowKeyToScan(pkEnd, (byte) 0xff);
+
+        //TODO fuzzy match
+
+        return new RawScan(start, end, selectedColumns, null);
+    }
+
+    private byte[] makeRowKeyToScan(GTRecord pkRec, byte fill) {
+        ByteArray pk = GTRecord.exportScanKey(pkRec);
+        int pkMaxLen = pkRec.getInfo().getMaxColumnLength(pkRec.getInfo().getPrimaryKey());
+
+        byte[] buf = new byte[pkMaxLen + RowConstants.ROWKEY_CUBOIDID_LEN];
+        Arrays.fill(buf, fill);
+
+        System.arraycopy(cuboid.getBytes(), 0, buf, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+        if (pk != null && pk.array() != null) {
+            System.arraycopy(pk.array(), pk.offset(), buf, RowConstants.ROWKEY_CUBOIDID_LEN, pk.length());
+        }
+        return buf;
+    }
+
+    protected List<Pair<byte[], byte[]>> makeHBaseColumns(ImmutableBitSet selectedColBlocks) {
+        List<Pair<byte[], byte[]>> result = Lists.newArrayList();
+
+        int colBlockIdx = 1; // start from 1; the 0th column block is primary key which maps to rowkey
+        HBaseMappingDesc hbaseMapping = cubeSeg.getCubeDesc().getHbaseMapping();
+        for (HBaseColumnFamilyDesc familyDesc : hbaseMapping.getColumnFamily()) {
+            byte[] byteFamily = Bytes.toBytes(familyDesc.getName());
+            for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
+                if (selectedColBlocks.get(colBlockIdx)) {
+                    byte[] byteQualifier = Bytes.toBytes(hbaseColDesc.getQualifier());
+                    result.add(new Pair<byte[], byte[]>(byteFamily, byteQualifier));
+                }
+                colBlockIdx++;
+            }
+        }
+
+        return result;
+    }
+
+    // binary search could be used here since cells might be sorted
+    public static Cell findCell(List<Cell> cells, byte[] familyName, byte[] columnName) {
+        for (Cell c : cells) {
+            if (BytesUtil.compareBytes(familyName, 0, c.getFamilyArray(), c.getFamilyOffset(), familyName.length) == 0 && //
+                    BytesUtil.compareBytes(columnName, 0, c.getQualifierArray(), c.getQualifierOffset(), columnName.length) == 0) {
+                return c;
+            }
+        }
+        return null;
+    }
+
+    public static void applyHBaseColums(Scan scan, List<Pair<byte[], byte[]>> hbaseColumns) {
+        for (Pair<byte[], byte[]> hbaseColumn : hbaseColumns) {
+            byte[] byteFamily = hbaseColumn.getFirst();
+            byte[] byteQualifier = hbaseColumn.getSecond();
+            scan.addColumn(byteFamily, byteQualifier);
+        }
+    }
+
+    public static void applyFuzzyFilter(Scan scan, List<org.apache.kylin.common.util.Pair<byte[], byte[]>> fuzzyKeys) {
+        if (fuzzyKeys != null && fuzzyKeys.size() > 0) {
+            FuzzyRowFilter rowFilter = new FuzzyRowFilter(convertToHBasePair(fuzzyKeys));
+
+            Filter filter = scan.getFilter();
+            if (filter != null) {
+                // may have existed InclusiveStopFilter, see buildScan
+                FilterList filterList = new FilterList();
+                filterList.addFilter(filter);
+                filterList.addFilter(rowFilter);
+                scan.setFilter(filterList);
+            } else {
+                scan.setFilter(rowFilter);
+            }
+        }
+    }
+
+    private static List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> convertToHBasePair(List<org.apache.kylin.common.util.Pair<byte[], byte[]>> pairList) {
+        List<org.apache.hadoop.hbase.util.Pair<byte[], byte[]>> result = Lists.newArrayList();
+        for (org.apache.kylin.common.util.Pair<byte[], byte[]> pair : pairList) {
+            org.apache.hadoop.hbase.util.Pair<byte[], byte[]> element = new org.apache.hadoop.hbase.util.Pair<byte[], byte[]>(pair.getFirst(), pair.getSecond());
+            result.add(element);
+        }
+
+        return result;
+    }
+
+}
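
Putting the static helpers together, a subclass turns key ranges and selected
columns into an HBase Scan roughly as follows (inputs assumed to come from
prepareRawScan and makeHBaseColumns above):

    RawScan rawScan = new RawScan(startKey, endKey, hbaseColumns, null); // null: no fuzzy keys
    Scan scan = CubeHBaseRPC.buildScan(rawScan); // presets caching (SCAN_CACHE) and block cache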

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
new file mode 100644
index 0000000..e673f32
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java
@@ -0,0 +1,77 @@
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.IGTStore;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+
+/**
+ * For test use only.
+ */
+public class CubeHBaseScanRPC extends CubeHBaseRPC {
+
+    public CubeHBaseScanRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
+        super(cubeSeg, cuboid, fullGTInfo);
+    }
+
+    @Override
+    public IGTScanner getGTScanner(final GTScanRequest scanRequest) throws IOException {
+
+        // primary key (also the 0th column block) is always selected
+        final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0);
+
+        // globally shared connection, does not require close
+        HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl());
+
+        final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier());
+        final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks);
+
+        RawScan rawScan = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns);
+        Scan hbaseScan = buildScan(rawScan);
+
+        final ResultScanner scanner = hbaseTable.getScanner(hbaseScan);
+        final Iterator<Result> iterator = scanner.iterator();
+
+        CellListIterator cellListIterator = new CellListIterator() {
+            @Override
+            public void close() throws IOException {
+                scanner.close();
+                hbaseTable.close();
+            }
+
+            @Override
+            public boolean hasNext() {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public List<Cell> next() {
+                return iterator.next().listCells();
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+
+        IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, hbaseColumns);
+        IGTScanner rawScanner = store.scan(scanRequest);
+        return scanRequest.decorateScanner(rawScanner);
+    }
+}
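
The anonymous CellListIterator above is the glue between the raw HBase client and the grid table layer: it exposes each Result as its list of cells and ties close() to both the ResultScanner and the table handle. A standalone sketch of the same adapter follows; RowCellIterator and its constructor are illustrative names, not Kylin classes:

    import java.io.Closeable;
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;

    // expose each Result as its cell list and release both resources on close()
    class RowCellIterator implements Iterator<List<Cell>>, Closeable {
        private final ResultScanner scanner;
        private final Closeable table; // e.g. the HTableInterface
        private final Iterator<Result> it;

        RowCellIterator(ResultScanner scanner, Closeable table) {
            this.scanner = scanner;
            this.table = table;
            this.it = scanner.iterator();
        }

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public List<Cell> next() {
            return it.next().listCells();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public void close() throws IOException {
            scanner.close(); // release the server-side scanner first
            table.close();   // then the table handle
        }
    }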

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java
new file mode 100644
index 0000000..9359934
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeScanner.java
@@ -0,0 +1,266 @@
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CubeGridTable;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRange;
+import org.apache.kylin.gridtable.GTScanRangePlanner;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTUtil;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class CubeScanner implements IGTScanner {
+
+    private static final int MAX_SCAN_RANGES = 200;
+
+    final CubeSegment cubeSeg;
+    final GTInfo info;
+    final byte[] trimmedInfoBytes;
+    final List<GTScanRequest> scanRequests;
+    final Scanner scanner;
+    final Cuboid cuboid;
+
+    public CubeScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
+            Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) {
+        this.cuboid = cuboid;
+        this.cubeSeg = cubeSeg;
+        this.info = CubeGridTable.newGTInfo(cubeSeg, cuboid.getId());
+
+        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+
+        // replace the constant values in the filter with dictionary codes
+        TupleFilter gtFilter = GTUtil.convertFilterColumnsAndConstants(filter, info, mapping.getCuboidDimensionsInGTOrder(), groups);
+
+        ImmutableBitSet gtDimensions = makeGridTableColumns(mapping, dimensions);
+        ImmutableBitSet gtAggrGroups = makeGridTableColumns(mapping, replaceDerivedColumns(groups, cubeSeg.getCubeDesc()));
+        ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics);
+        String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics);
+
+        //TODO: should remove this in endpoint scenario
+        GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(info);
+        List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES);
+
+        scanRequests = Lists.newArrayListWithCapacity(scanRanges.size());
+
+        trimmedInfoBytes = GTInfo.serialize(info);
+        GTInfo trimmedInfo = GTInfo.deserialize(trimmedInfoBytes);
+
+        for (GTScanRange range : scanRanges) {
+            scanRequests.add(new GTScanRequest(trimmedInfo, range, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate));
+        }
+
+        scanner = new Scanner();
+    }
+
+    private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) {
+        Set<TblColRef> ret = Sets.newHashSet();
+        for (TblColRef col : input) {
+            if (cubeDesc.isDerived(col)) {
+                for (TblColRef host : cubeDesc.getHostInfo(col).columns) {
+                    ret.add(host);
+                }
+            } else {
+                ret.add(col);
+            }
+        }
+        return ret;
+    }
+
+    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Set<TblColRef> dimensions) {
+        BitSet result = new BitSet();
+        for (TblColRef dim : dimensions) {
+            int idx = mapping.getIndexOf(dim);
+            if (idx >= 0)
+                result.set(idx);
+        }
+        return new ImmutableBitSet(result);
+    }
+
+    private ImmutableBitSet makeGridTableColumns(CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
+        BitSet result = new BitSet();
+        for (FunctionDesc metric : metrics) {
+            int idx = mapping.getIndexOf(metric);
+            if (idx < 0)
+                throw new IllegalStateException(metric + " not found in " + mapping);
+            result.set(idx);
+        }
+        return new ImmutableBitSet(result);
+    }
+
+    private String[] makeAggrFuncs(final CuboidToGridTableMapping mapping, Collection<FunctionDesc> metrics) {
+
+        // metrics are represented as an ImmutableBitSet, which loses order information;
+        // sort the aggr funcs to align with the metrics' natural (bit index) order
+        List<FunctionDesc> metricList = Lists.newArrayList(metrics);
+        Collections.sort(metricList, new Comparator<FunctionDesc>() {
+            @Override
+            public int compare(FunctionDesc o1, FunctionDesc o2) {
+                int a = mapping.getIndexOf(o1);
+                int b = mapping.getIndexOf(o2);
+                return Integer.compare(a, b);
+            }
+        });
+
+        String[] result = new String[metricList.size()];
+        int i = 0;
+        for (FunctionDesc metric : metricList) {
+            result[i++] = metric.getExpression();
+        }
+        return result;
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return scanner.iterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        scanner.close();
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public int getScannedRowCount() {
+        return scanner.getScannedRowCount();
+    }
+
+    static class RemoteGTRecordAdapter implements Iterable<GTRecord> {
+
+        private final GTInfo info;
+        private final Iterator<GTRecord> input;
+
+        public RemoteGTRecordAdapter(GTInfo info, Iterator<GTRecord> input) {
+            this.info = info;
+            this.input = input;
+        }
+
+        @Override
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+                @Override
+                public boolean hasNext() {
+                    return input.hasNext();
+                }
+
+                @Override
+                public GTRecord next() {
+                    GTRecord x = input.next();
+                    return new GTRecord(info, x.getInternal());
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+
+    private class Scanner {
+        final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()];
+        int cur = 0;
+        Iterator<GTRecord> curIterator = null;
+        GTRecord next = null;
+
+        public Iterator<GTRecord> iterator() {
+            return new Iterator<GTRecord>() {
+
+                @Override
+                public boolean hasNext() {
+                    if (next != null)
+                        return true;
+
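+                    // lazily issue the RPC for the next scan request only when the previous one is exhausted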
+                    if (curIterator == null) {
+                        if (cur >= scanRequests.size())
+                            return false;
+
+                        try {
+                            CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info);
+                            inputScanners[cur] = rpc.getGTScanner(scanRequests.get(cur));
+                            curIterator = inputScanners[cur].iterator();
+                            //curIterator = new RemoteGTRecordAdapter(info, inputScanners[cur].iterator()).iterator();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+
+                    if (curIterator.hasNext() == false) {
+                        curIterator = null;
+                        cur++;
+                        return hasNext();
+                    }
+
+                    next = curIterator.next();
+                    return true;
+                }
+
+                @Override
+                public GTRecord next() {
+                    // make sure the next record has been fetched
+                    if (next == null) {
+                        hasNext();
+                        if (next == null)
+                            throw new NoSuchElementException();
+                    }
+
+                    GTRecord result = next;
+                    next = null;
+                    return result;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+
+        public void close() throws IOException {
+            for (int i = 0; i < inputScanners.length; i++) {
+                if (inputScanners[i] != null) {
+                    inputScanners[i].close();
+                }
+            }
+        }
+
+        public int getScannedRowCount() {
+            int result = 0;
+            for (int i = 0; i < inputScanners.length; i++) {
+                if (inputScanners[i] == null)
+                    break;
+
+                result += inputScanners[i].getScannedRowCount();
+            }
+            return result;
+        }
+
+    }
+
+}
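
The private Scanner class above implements a common pattern: concatenating several iterators whose creation is expensive (each getGTScanner call issues an RPC), opening each source only when the previous one runs dry. A generic sketch of that pattern, detached from the HBase specifics (class and method names are illustrative):

    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;
    import java.util.concurrent.Callable;

    // concatenate iterators produced on demand; each Callable opens one source
    class LazyConcatIterator<T> implements Iterator<T> {
        private final List<Callable<Iterator<T>>> sources;
        private int cur = 0;
        private Iterator<T> current = null;

        LazyConcatIterator(List<Callable<Iterator<T>>> sources) {
            this.sources = sources;
        }

        @Override
        public boolean hasNext() {
            while (current == null || !current.hasNext()) {
                current = null;
                if (cur >= sources.size())
                    return false;
                try {
                    current = sources.get(cur++).call(); // open the next source only now
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            return true;
        }

        @Override
        public T next() {
            if (!hasNext())
                throw new NoSuchElementException();
            return current.next();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

In CubeScanner the role of the Callable is played by constructing a CubeHBaseEndpointRPC and calling getGTScanner on the current scan request.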

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/11611cc2/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
new file mode 100644
index 0000000..50743f6
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -0,0 +1,372 @@
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.translate.DerivedFilterTranslator;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+public class CubeStorageQuery implements ICachableStorageQuery {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
+
+    private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
+
+    private final CubeInstance cubeInstance;
+    private final CubeDesc cubeDesc;
+
+    public CubeStorageQuery(CubeInstance cube) {
+        this.cubeInstance = cube;
+        this.cubeDesc = cube.getDescriptor();
+    }
+
+    @Override
+    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+        TupleFilter filter = sqlDigest.filter;
+
+        // build dimension & metrics
+        Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
+        Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
+        buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
+
+        // all dimensions = groups + filter dimensions
+        Set<TblColRef> filterDims = Sets.newHashSet(dimensions);
+        filterDims.removeAll(groups);
+
+        // expand derived columns ("xxxD" names mean the set contains host columns only; derived columns have been translated)
+        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
+        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
+        Set<TblColRef> filterDimsD = expandDerived(filterDims, derivedPostAggregation);
+        filterDimsD.removeAll(groupsD);
+        derivedPostAggregation.removeAll(groups);
+
+        // identify cuboid
+        Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
+        dimensionsD.addAll(groupsD);
+        dimensionsD.addAll(filterDimsD);
+        Cuboid cuboid = identifyCuboid(dimensionsD);
+        context.setCuboid(cuboid);
+
+        // isExactAggregation? meaning: tuples returned from storage require no further aggregation in the query engine
+        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
+        boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
+        context.setExactAggregation(isExactAggregation);
+
+        if (isExactAggregation) {
+            metrics = replaceHolisticCountDistinct(metrics);
+        }
+
+        // replace derived columns in the filter with host columns; columns from loosened conditions must be added to the group by
+        TupleFilter filterD = translateDerived(filter, groupsD);
+
+        setThreshold(dimensionsD, metrics, context); // set a cautious threshold to prevent running out of memory
+        // TODO enable coprocessor
+        //        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
+        setLimit(filter, context);
+
+        List<CubeScanner> scanners = Lists.newArrayList();
+        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+            scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation));
+        }
+
+        if (scanners.isEmpty())
+            return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+
+        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo);
+    }
+
+    private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
+        for (FunctionDesc func : sqlDigest.aggregations) {
+            if (!func.isDimensionAsMetric()) {
+                // prefer the FunctionDesc from the cube desc where possible, as it carries more info such as HLLC precision
+                metrics.add(findAggrFuncFromCubeDesc(func));
+            }
+        }
+
+        for (TblColRef column : sqlDigest.allColumns) {
+            // skip measure columns
+            if (sqlDigest.metricColumns.contains(column)) {
+                continue;
+            }
+            dimensions.add(column);
+        }
+    }
+
+    private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            if (measure.getFunction().equals(aggrFunc))
+                return measure.getFunction();
+        }
+        return aggrFunc;
+    }
+
+    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
+        Set<TblColRef> expanded = Sets.newHashSet();
+        for (TblColRef col : cols) {
+            if (cubeDesc.isDerived(col)) {
+                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+                for (TblColRef hostCol : hostInfo.columns) {
+                    expanded.add(hostCol);
+                    if (hostInfo.isOneToOne == false)
+                        derivedPostAggregation.add(hostCol);
+                }
+            } else {
+                expanded.add(col);
+            }
+        }
+        return expanded;
+    }
+
+    private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
+        long cuboidID = 0;
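+        // set one bit per dimension at its rowkey position; the resulting bitmask is the cuboid ID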
+        for (TblColRef column : dimensions) {
+            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
+            cuboidID |= 1L << index;
+        }
+        return Cuboid.findById(cubeDesc, cuboidID);
+    }
+
+    @SuppressWarnings("unchecked")
+    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
+        Collection<? extends TupleFilter> toCheck;
+        if (filter instanceof CompareTupleFilter) {
+            toCheck = Collections.singleton(filter);
+        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
+            toCheck = filter.getChildren();
+        } else {
+            return Collections.emptySet();
+        }
+
+        Set<TblColRef> result = Sets.newHashSet();
+        for (TupleFilter f : toCheck) {
+            if (f instanceof CompareTupleFilter) {
+                CompareTupleFilter compFilter = (CompareTupleFilter) f;
+                // is COL=const ?
+                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
+                    result.add(compFilter.getColumn());
+                }
+            }
+        }
+
+        // expand derived
+        Set<TblColRef> resultD = Sets.newHashSet();
+        for (TblColRef col : result) {
+            if (cubeDesc.isDerived(col)) {
+                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+                if (hostInfo.isOneToOne) {
+                    for (TblColRef hostCol : hostInfo.columns) {
+                        resultD.add(hostCol);
+                    }
+                }
+                // if not one-to-one, the derived column is pruned
+            } else {
+                resultD.add(col);
+            }
+        }
+        return resultD;
+    }
+
+    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
+        boolean exact = true;
+
+        if (cuboid.requirePostAggregation()) {
+            exact = false;
+            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+        }
+
+        // derived aggregation is bad, unless expanded columns are already in group by
+        if (groups.containsAll(derivedPostAggregation) == false) {
+            exact = false;
+            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+        }
+
+        // other columns (from the filter) are bad, unless they are guaranteed to hold a single value
+        if (singleValuesD.containsAll(othersD) == false) {
+            exact = false;
+            logger.info("exactAggregation is false because some column not on group by: " + othersD //
+                    + " (single value column: " + singleValuesD + ")");
+        }
+
+        if (exact) {
+            logger.info("exactAggregation is true");
+        }
+        return exact;
+    }
+
+    private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) {
+        // for count distinct, try to use its holistic version if possible
+        Set<FunctionDesc> result = new LinkedHashSet<FunctionDesc>();
+        for (FunctionDesc metric : metrics) {
+            if (metric.isCountDistinct() == false) {
+                result.add(metric);
+                continue;
+            }
+
+            FunctionDesc holisticVersion = null;
+            for (MeasureDesc measure : cubeDesc.getMeasures()) {
+                FunctionDesc measureFunc = measure.getFunction();
+                if (measureFunc.equals(metric) && measureFunc.isHolisticCountDistinct()) {
+                    holisticVersion = measureFunc;
+                }
+            }
+            result.add(holisticVersion == null ? metric : holisticVersion);
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
+        if (filter == null)
+            return filter;
+
+        if (filter instanceof CompareTupleFilter) {
+            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
+        }
+
+        List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
+        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
+        boolean modified = false;
+        for (TupleFilter child : children) {
+            TupleFilter translated = translateDerived(child, collector);
+            newChildren.add(translated);
+            if (child != translated)
+                modified = true;
+        }
+        if (modified) {
+            filter = replaceChildren(filter, newChildren);
+        }
+        return filter;
+    }
+
+    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
+        if (filter instanceof LogicalTupleFilter) {
+            LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
+            r.addChildren(newChildren);
+            return r;
+        } else
+            throw new IllegalStateException("Cannot replaceChildren on " + filter);
+    }
+
+    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
+        if (compf.getColumn() == null || compf.getValues().isEmpty())
+            return compf;
+
+        TblColRef derived = compf.getColumn();
+        if (cubeDesc.isDerived(derived) == false)
+            return compf;
+
+        DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
+        CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
+        CubeSegment seg = cubeInstance.getLatestReadySegment();
+        LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
+        Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+        TupleFilter translatedFilter = translated.getFirst();
+        boolean loosened = translated.getSecond();
+        if (loosened) {
+            collectColumnsRecursively(translatedFilter, collector);
+        }
+        return translatedFilter;
+    }
+
+    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
+        if (filter instanceof ColumnTupleFilter) {
+            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
+        }
+        for (TupleFilter child : filter.getChildren()) {
+            collectColumnsRecursively(child, collector);
+        }
+    }
+
+    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
+        if (cubeDesc.isDerived(col)) {
+            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+            for (TblColRef h : hostInfo.columns)
+                collector.add(h);
+        } else {
+            collector.add(col);
+        }
+    }
+
+    private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
+        boolean hasMemHungryCountDistinct = false;
+        for (FunctionDesc func : metrics) {
+            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
+                hasMemHungryCountDistinct = true;
+            }
+        }
+
+        // need to limit memory usage for memory-hungry count distinct
+        if (hasMemHungryCountDistinct == false) {
+            return;
+        }
+
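+        // rough row size estimate: ~3 bytes per dimension code plus each metric's declared space estimate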
+        int rowSizeEst = dimensions.size() * 3;
+        for (FunctionDesc func : metrics) {
+            rowSizeEst += func.getReturnDataType().getSpaceEstimate();
+        }
+
+        long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
+        context.setThreshold((int) rowEst);
+    }
+
+    private void setLimit(TupleFilter filter, StorageContext context) {
+        boolean goodAggr = context.isExactAggregation();
+        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
+        boolean goodSort = context.hasSort() == false;
+        if (goodAggr && goodFilter && goodSort) {
+            logger.info("Enable limit " + context.getLimit());
+            context.enableLimit();
+        }
+    }
+
+    // ============================================================================
+
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return null;
+    }
+
+    @Override
+    public String getStorageUUID() {
+        return cubeInstance.getUuid();
+    }
+
+    @Override
+    public boolean isDynamic() {
+        return false;
+    }
+
+}
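
For reference, identifyCuboid's bitmask arithmetic in a tiny standalone form; the column names and rowkey bit indexes below are made up for illustration:

    import java.util.Arrays;
    import java.util.List;

    public class CuboidIdSketch {
        // mirrors identifyCuboid: OR together one bit per dimension's rowkey bit index
        static long cuboidId(List<Integer> bitIndexes) {
            long id = 0;
            for (int idx : bitIndexes)
                id |= 1L << idx;
            return id;
        }

        public static void main(String[] args) {
            // hypothetical rowkey: REGION -> bit 0, CATEGORY -> bit 1, YEAR -> bit 2
            System.out.println(cuboidId(Arrays.asList(2, 0))); // 0b101 = 5: the cuboid over YEAR and REGION
        }
    }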

