kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [31/50] [abbrv] incubator-kylin git commit: KYLIN-728
Date Fri, 15 May 2015 03:06:43 GMT
KYLIN-728


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0d87e8f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0d87e8f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0d87e8f6

Branch: refs/heads/streaming-localdict
Commit: 0d87e8f689e42d5a21330815eca6b2f83afb3f02
Parents: 876ac60
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Tue May 12 13:59:54 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Tue May 12 13:59:54 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/util/BytesSerializer.java      |   6 +-
 .../org/apache/kylin/common/util/SSHClient.java |   5 +-
 .../kylin/cube/cuboid/CuboidScheduler.java      |  14 +-
 .../apache/kylin/dict/DictionarySerializer.java |  54 +++++
 .../invertedindex/model/IIKeyValueCodec.java    |  31 +--
 .../job/hadoop/cubev2/InMemCubeBuilder.java     | 197 +++++++++++++------
 .../kylin/job/hadoop/cubev2/InMemCuboidJob.java |   2 +-
 .../hadoop/cubev2/MapContextGTRecordWriter.java |   2 +-
 .../metadata/serializer/DataTypeSerializer.java |  32 +--
 .../kylin/storage/cube/CubeCodeSystem.java      |   1 +
 .../storage/cube/CubeHBaseReadonlyStore.java    |  14 +-
 .../kylin/storage/gridtable/GTBuilder.java      |   1 -
 .../kylin/storage/gridtable/GTComboStore.java   | 112 +++++++++++
 .../apache/kylin/storage/gridtable/GTInfo.java  |  10 +-
 .../storage/gridtable/GTInvertedIndex.java      |   4 +-
 .../kylin/storage/gridtable/GTRawScanner.java   |  24 +--
 .../kylin/storage/gridtable/GTRecord.java       |   4 +-
 .../kylin/storage/gridtable/GTRowBlock.java     |  19 +-
 .../storage/gridtable/GTSampleCodeSystem.java   |   5 +-
 .../kylin/storage/gridtable/IGTScanner.java     |   6 +-
 .../kylin/storage/gridtable/IGTStore.java       |  18 +-
 .../apache/kylin/storage/gridtable/ScanKey.java |  34 ++++
 .../storage/gridtable/diskstore/FileSystem.java |  22 +++
 .../gridtable/diskstore/GTDiskStore.java        | 160 +++++++++++++++
 .../gridtable/diskstore/HadoopFileSystem.java   |  88 +++++++++
 .../gridtable/diskstore/LocalFileSystem.java    |  60 ++++++
 .../gridtable/memstore/GTSimpleMemStore.java    |  49 +++--
 .../apache/kylin/storage/util/SizeOfUtil.java   |  21 ++
 .../invertedindex/IIStreamBuilder.java          |   6 +-
 29 files changed, 803 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java b/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java
index 1d1f5ae..26342f5 100644
--- a/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java
+++ b/common/src/main/java/org/apache/kylin/common/util/BytesSerializer.java
@@ -26,10 +26,10 @@ import java.nio.ByteBuffer;
  */
 public interface BytesSerializer<T> {
 
-    public static final int SERIALIZE_BUFFER_SIZE = 65536;
+    int SERIALIZE_BUFFER_SIZE = 65536;
 
-    abstract public void serialize(T value, ByteBuffer out);
+    void serialize(T value, ByteBuffer out);
 
-    abstract public T deserialize(ByteBuffer in);
+    T deserialize(ByteBuffer in);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/common/src/main/java/org/apache/kylin/common/util/SSHClient.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/SSHClient.java b/common/src/main/java/org/apache/kylin/common/util/SSHClient.java
index ccaabf0..32eb72a 100644
--- a/common/src/main/java/org/apache/kylin/common/util/SSHClient.java
+++ b/common/src/main/java/org/apache/kylin/common/util/SSHClient.java
@@ -49,6 +49,7 @@ public class SSHClient {
     public SSHClient(String hostname, int port, String username, String password) {
         this.hostname = hostname;
         this.username = username;
+        this.port = port;
         if (password != null && new File(password).exists()) {
             this.identityPath = new File(password).getAbsolutePath();
             this.password = null;
@@ -324,7 +325,7 @@ public class SSHClient {
                     throw ee;
                 }
                 if (timeout < 0)
-                    throw new Exception("Remote commmand not finished within " + timeoutSeconds + " seconds.");
+                    throw new Exception("Remote command not finished within " + timeoutSeconds + " seconds.");
             }
             channel.disconnect();
             session.disconnect();
@@ -340,7 +341,7 @@ public class SSHClient {
             jsch.addIdentity(identityPath);
         }
 
-        Session session = jsch.getSession(username, hostname, 22);
+        Session session = jsch.getSession(username, hostname, port);
         if (password != null) {
             session.setPassword(password);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
index 6f64116..07be092 100644
--- a/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
+++ b/cube/src/main/java/org/apache/kylin/cube/cuboid/CuboidScheduler.java
@@ -22,11 +22,7 @@ package org.apache.kylin.cube.cuboid;
  * @author George Song (ysong1)
  * 
  */
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.cube.model.CubeDesc;
@@ -38,21 +34,21 @@ public class CuboidScheduler {
     private final CubeDesc cubeDef;
     private final int size;
     private final long max;
-    private final Map<Long, Collection<Long>> cache;
+    private final Map<Long, List<Long>> cache;
 
     public CuboidScheduler(CubeDesc cube) {
         this.cubeDef = cube;
         this.size = cube.getRowkey().getRowKeyColumns().length;
         this.max = (long) Math.pow(2, size) - 1;
-        this.cache = new ConcurrentHashMap<Long, Collection<Long>>();
+        this.cache = new ConcurrentHashMap<Long, List<Long>>();
     }
 
-    public Collection<Long> getSpanningCuboid(long cuboid) {
+    public List<Long> getSpanningCuboid(long cuboid) {
         if (cuboid > max || cuboid < 0) {
             throw new IllegalArgumentException("Cuboid " + cuboid + " is out of scope 0-" + max);
         }
 
-        Collection<Long> result = cache.get(cuboid);
+        List<Long> result = cache.get(cuboid);
         if (result != null) {
             return result;
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
----------------------------------------------------------------------
diff --git a/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java b/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
new file mode 100644
index 0000000..4b61591
--- /dev/null
+++ b/dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.dict;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.kylin.common.util.ClassUtil;
+
+import java.io.*;
+
+/**
+ * Created by qianzhou on 5/5/15.
+ */
+public final class DictionarySerializer {
+
+    private DictionarySerializer() {}
+
+    public static Dictionary<?> deserialize(InputStream inputStream) {
+        try {
+            final DataInputStream dataInputStream = new DataInputStream(inputStream);
+            final String type = dataInputStream.readUTF();
+            final Dictionary dictionary = ClassUtil.forName(type, Dictionary.class).newInstance();
+            dictionary.readFields(dataInputStream);
+            return dictionary;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static Dictionary<?> deserialize(ImmutableBytesWritable dictBytes) {
+        return deserialize(new ByteArrayInputStream(dictBytes.get(), dictBytes.getOffset(), dictBytes.getLength()));
+    }
+
+    public static void serialize(Dictionary<?> dict, OutputStream outputStream) {
+        try {
+            DataOutputStream out = new DataOutputStream(outputStream);
+            out.writeUTF(dict.getClass().getName());
+            dict.write(out);
+            out.flush();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static ImmutableBytesWritable serialize(Dictionary<?> dict) {
+        try {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            DataOutputStream out = new DataOutputStream(baos);
+            out.writeUTF(dict.getClass().getName());
+            dict.write(out);
+            return new ImmutableBytesWritable(baos.toByteArray());
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 8dbaed7..b236879 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -23,8 +23,8 @@ import com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionarySerializer;
 import org.apache.kylin.invertedindex.index.ColumnValueContainer;
 import org.apache.kylin.invertedindex.index.CompressedValueContainer;
 import org.apache.kylin.invertedindex.index.Slice;
@@ -32,7 +32,6 @@ import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
 import org.apache.kylin.metadata.model.DataType;
 
-import java.io.*;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -73,31 +72,7 @@ public class IIKeyValueCodec implements KeyValueCodec {
         if (dictionary == null) {
             return new IIRow(key, value, new ImmutableBytesWritable(BytesUtil.EMPTY_BYTE_ARRAY));
         } else {
-            return new IIRow(key, value, serialize(dictionary));
-        }
-    }
-
-    private static Dictionary<?> deserialize(ImmutableBytesWritable dictBytes) {
-        try {
-            final DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(dictBytes.get(), dictBytes.getOffset(), dictBytes.getLength()));
-            final String type = dataInputStream.readUTF();
-            final Dictionary dictionary = ClassUtil.forName(type, Dictionary.class).newInstance();
-            dictionary.readFields(dataInputStream);
-            return dictionary;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private static ImmutableBytesWritable serialize(Dictionary<?> dict) {
-        try {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutputStream out = new DataOutputStream(baos);
-            out.writeUTF(dict.getClass().getName());
-            dict.write(out);
-            return new ImmutableBytesWritable(baos.toByteArray());
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+            return new IIRow(key, value, DictionarySerializer.serialize(dictionary));
         }
     }
 
@@ -221,7 +196,7 @@ public class IIKeyValueCodec implements KeyValueCodec {
                         } else {
                             final ImmutableBytesWritable dictBytes = row.getDictionary();
                             if (dictBytes.getLength() != 0) {
-                                final Dictionary<?> dictionary = deserialize(dictBytes);
+                                final Dictionary<?> dictionary = DictionarySerializer.deserialize(dictBytes);
                                 CompressedValueContainer c = new CompressedValueContainer(dictionary.getSizeOfId(), dictionary.getMaxId() - dictionary.getMinId() + 1, 0);
                                 c.fromBytes(row.getValue());
                                 valueContainers[curCol] = c;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
index 29cdc9a..87ad2d3 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCubeBuilder.java
@@ -38,6 +38,7 @@ import com.google.common.collect.Maps;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.Pair;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
@@ -51,10 +52,9 @@ import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.serializer.DataTypeSerializer;
 import org.apache.kylin.storage.cube.CubeGridTable;
 import org.apache.kylin.storage.gridtable.*;
-import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.storage.util.SizeOfUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,6 +69,7 @@ import java.util.concurrent.BlockingQueue;
 @SuppressWarnings("rawtypes")
 public class InMemCubeBuilder implements Runnable {
 
+    private static final double AGGREGATION_CACHE_FACTOR = 3;
     private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
 
     private BlockingQueue<List<String>> queue;
@@ -78,14 +79,11 @@ public class InMemCubeBuilder implements Runnable {
     private Map<TblColRef, Dictionary<?>> dictionaryMap = null;
     private CubeJoinedFlatTableDesc intermediateTableDesc;
     private MeasureCodec measureCodec;
-    private int measureNumber;
     private String[] metricsAggrFuncs = null;
     private Map<Integer, Integer> dependentMeasures = null; // key: index of Measure which depends on another measure; value: index of Measure which is depended on;
     public static final LongWritable ONE = new LongWritable(1l);
 
     protected IGTRecordWriter gtRecordWriter;
-    private GridTable baseCuboidGT;
-    private DataTypeSerializer[] serializers;
 
 
     /**
@@ -95,30 +93,31 @@ public class InMemCubeBuilder implements Runnable {
      * @param gtRecordWriter
      */
     public InMemCubeBuilder(BlockingQueue<List<String>> queue, CubeInstance cube, Map<TblColRef, Dictionary<?>> dictionaryMap, IGTRecordWriter gtRecordWriter) {
+        if (dictionaryMap == null || dictionaryMap.isEmpty()) {
+            throw new IllegalArgumentException();
+        }
         this.queue = queue;
         this.desc = cube.getDescriptor();
         this.cuboidScheduler = new CuboidScheduler(desc);
         this.dictionaryMap = dictionaryMap;
         this.gtRecordWriter = gtRecordWriter;
-        baseCuboidId = Cuboid.getBaseCuboidId(desc);
-
-        intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
-        measureCodec = new MeasureCodec(desc.getMeasures());
-        measureNumber = desc.getMeasures().size();
-
-        dependentMeasures = Maps.newHashMap();
+        this.baseCuboidId = Cuboid.getBaseCuboidId(desc);
+        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(desc, null);
+        this.measureCodec = new MeasureCodec(desc.getMeasures());
 
-        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
+        Map<String, Integer> measureIndexMap = Maps.newHashMap();
         List<String> metricsAggrFuncsList = Lists.newArrayList();
-        for (int i = 0, n = desc.getMeasures().size(); i < n; i++) {
+        final int measureCount = desc.getMeasures().size();
+        for (int i = 0; i < measureCount; i++) {
             MeasureDesc measureDesc = desc.getMeasures().get(i);
             metricsAggrFuncsList.add(measureDesc.getFunction().getExpression());
 
             measureIndexMap.put(desc.getMeasures().get(i).getName(), i);
         }
-        metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
+        this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
 
-        for (int i = 0; i < measureNumber; i++) {
+        this.dependentMeasures = Maps.newHashMap();
+        for (int i = 0; i < measureCount; i++) {
             String depMsrRef = desc.getMeasures().get(i).getDependentMeasureRef();
             if (depMsrRef != null) {
                 int index = measureIndexMap.get(depMsrRef);
@@ -126,21 +125,19 @@ public class InMemCubeBuilder implements Runnable {
             }
         }
 
-        if (dictionaryMap == null || dictionaryMap.isEmpty())
-            throw new IllegalArgumentException();
     }
 
 
-    private GridTable newGridTableByCuboidID(long cuboidID) {
+    private GridTable newGridTableByCuboidID(long cuboidID, boolean memStore) {
         GTInfo info = CubeGridTable.newGTInfo(desc, cuboidID, dictionaryMap);
-        GTSimpleMemStore store = new GTSimpleMemStore(info);
+        GTComboStore store = new GTComboStore(info, memStore);
         GridTable gridTable = new GridTable(info, store);
         return gridTable;
     }
 
-    private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
+    private GridTable aggregateCuboid(GridTable parentCuboid, long parentCuboidId, long cuboidId, boolean inMem) throws IOException {
         logger.info("Calculating cuboid " + cuboidId + " from parent " + parentCuboidId);
-        Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumBitSet(parentCuboidId);
+        Pair<BitSet, BitSet> columnBitSets = getDimensionAndMetricColumnBitSet(parentCuboidId);
         BitSet parentDimensions = columnBitSets.getFirst();
         BitSet measureColumns = columnBitSets.getSecond();
         BitSet childDimensions = (BitSet) parentDimensions.clone();
@@ -160,14 +157,14 @@ public class InMemCubeBuilder implements Runnable {
             mask = mask >> 1;
         }
 
-        return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns);
+        return scanAndAggregateGridTable(parentCuboid, cuboidId, childDimensions, measureColumns, inMem);
 
     }
 
-    private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns) throws IOException {
+    private GridTable scanAndAggregateGridTable(GridTable gridTable, long cuboidId, BitSet aggregationColumns, BitSet measureColumns, boolean inMem) throws IOException {
         GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
         IGTScanner scanner = gridTable.scan(req);
-        GridTable newGridTable = newGridTableByCuboidID(cuboidId);
+        GridTable newGridTable = newGridTableByCuboidID(cuboidId, inMem);
         GTBuilder builder = newGridTable.rebuild();
 
         BitSet allNeededColumns = new BitSet();
@@ -181,7 +178,7 @@ public class InMemCubeBuilder implements Runnable {
         try {
             BitSet dependentMetrics = new BitSet(allNeededColumns.cardinality());
             for (Integer i : dependentMeasures.keySet()) {
-                dependentMetrics.set((allNeededColumns.cardinality() - measureNumber + dependentMeasures.get(i)));
+                dependentMetrics.set((allNeededColumns.cardinality() - desc.getMeasures().size() + dependentMeasures.get(i)));
             }
 
             Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
@@ -197,13 +194,13 @@ public class InMemCubeBuilder implements Runnable {
 
                 for (Integer i : dependentMeasures.keySet()) {
                     for (int index = 0, c = dependentMetrics.nextSetBit(0); c >= 0; index++, c = dependentMetrics.nextSetBit(c + 1)) {
-                        if (c == allNeededColumns.cardinality() - measureNumber + dependentMeasures.get(i)) {
+                        if (c == allNeededColumns.cardinality() - desc.getMeasures().size() + dependentMeasures.get(i)) {
                             assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
 
                             byteBuffer.clear();
                             BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
                             byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
-                            newRecord.set(allNeededColumns.cardinality() - measureNumber + i, byteArray);
+                            newRecord.set(allNeededColumns.cardinality() - desc.getMeasures().size() + i, byteArray);
                         }
                     }
 
@@ -219,16 +216,16 @@ public class InMemCubeBuilder implements Runnable {
         return newGridTable;
     }
 
-    private Pair<BitSet, BitSet> getDimensionAndMetricColumBitSet(long cuboidId) {
+    private Pair<BitSet, BitSet> getDimensionAndMetricColumnBitSet(long cuboidId) {
         BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
         BitSet dimension = new BitSet();
         dimension.set(0, bitSet.cardinality());
         BitSet metrics = new BitSet();
-        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.measureNumber);
+        metrics.set(bitSet.cardinality(), bitSet.cardinality() + this.desc.getMeasures().size());
         return new Pair<BitSet, BitSet>(dimension, metrics);
     }
 
-    private Object[] buildKey(List<String> row, DataTypeSerializer[] serializers) {
+    private Object[] buildKey(List<String> row) {
         int keySize = intermediateTableDesc.getRowKeyColumnIndexes().length;
         Object[] key = new Object[keySize];
 
@@ -280,7 +277,8 @@ public class InMemCubeBuilder implements Runnable {
     @Override
     public void run() {
         try {
-            createBaseCuboidGT();
+            logger.info("Create base cuboid " + baseCuboidId);
+            final GridTable baseCuboidGT = newGridTableByCuboidID(baseCuboidId, true);
 
             GTBuilder baseGTBuilder = baseCuboidGT.rebuild();
             final GTRecord baseGTRecord = new GTRecord(baseCuboidGT.getInfo());
@@ -339,7 +337,7 @@ public class InMemCubeBuilder implements Runnable {
                 }
             };
 
-            Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumBitSet(baseCuboidId);
+            Pair<BitSet, BitSet> dimensionMetricsBitSet = getDimensionAndMetricColumnBitSet(baseCuboidId);
             GTScanRequest req = new GTScanRequest(baseCuboidGT.getInfo(), null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
             IGTScanner aggregationScanner = new GTAggregateScanner(queueScanner, req);
 
@@ -352,8 +350,18 @@ public class InMemCubeBuilder implements Runnable {
             aggregationScanner.close();
 
             logger.info("Base cuboid has " + counter + " rows;");
-            if (counter > 0)
-                createNDCuboidGT(null, -1l, baseCuboidId);
+            SimpleGridTableTree tree = new SimpleGridTableTree();
+            tree.data = baseCuboidGT;
+            tree.id = baseCuboidId;
+            tree.parent = null;
+            if (counter > 0) {
+                List<Long> children = cuboidScheduler.getSpanningCuboid(baseCuboidId);
+                Collections.sort(children);
+                for (Long childId : children) {
+                    createNDCuboidGT(tree, baseCuboidId, childId);
+                }
+            }
+            baseCuboidGT.getStore().drop();
 
         } catch (IOException e) {
             logger.error("Fail to build cube", e);
@@ -364,7 +372,7 @@ public class InMemCubeBuilder implements Runnable {
 
     private void buildGTRecord(List<String> row, GTRecord record) {
 
-        Object[] dimensions = buildKey(row, serializers);
+        Object[] dimensions = buildKey(row);
         Object[] metricsValues = buildValue(row);
         Object[] recordValues = new Object[dimensions.length + metricsValues.length];
         System.arraycopy(dimensions, 0, recordValues, 0, dimensions.length);
@@ -372,43 +380,84 @@ public class InMemCubeBuilder implements Runnable {
         record.setValues(recordValues);
     }
 
-    private void createBaseCuboidGT() throws IOException {
-
-        logger.info("Create base cuboid " + baseCuboidId);
-        Cuboid baseCuboid = Cuboid.findById(this.desc, baseCuboidId);
-        serializers = new DataTypeSerializer[baseCuboid.getColumns().size()];
-
-        for (int i = 0; i < baseCuboid.getColumns().size(); i++) {
-            serializers[i] = DataTypeSerializer.create(baseCuboid.getColumns().get(i).getType());
+    private boolean checkMemory(long threshold) {
+        final long freeMemory = Runtime.getRuntime().freeMemory();
+        logger.info("available memory:" + (freeMemory>>10) + " KB");
+        if (freeMemory >= threshold) {
+            logger.info("no need to flush to disk");
+            return true;
+        } else {
+            return false;
         }
-
-        this.baseCuboidGT = newGridTableByCuboidID(baseCuboidId);
     }
 
+    private boolean gc(TreeNode<GridTable> parentNode) {
+        final long parentCuboidMem = SizeOfUtil.deepSizeOf(parentNode.data.getStore());
+        long threshold = (long) (parentCuboidMem * (AGGREGATION_CACHE_FACTOR + 1));
+        logger.info((threshold >> 10) + " KB is needed to create " + parentNode.id + "'s child");
+        if (checkMemory(threshold)) {
+            return true;
+        }
+        final List<TreeNode<GridTable>> gridTables = parentNode.getAncestorList();
+        for (TreeNode<GridTable> gridTable : gridTables) {
+            logger.info("wait 10 seconds for gc");
+            try {
+                Thread.sleep(10 * 1000);
+            } catch (InterruptedException e) {
+                logger.error("this should not happen", e);
+            }
+            if (checkMemory(threshold)) {
+                return true;
+            } else {
+                logger.info("memory is low, try to select one node to flush to disk from:" + StringUtils.join(",", gridTables));
+                final IGTStore store = gridTable.data.getStore();
+                assert store instanceof GTComboStore;
+                if (store.memoryUsage() > 0) {
+                    logger.info("cuboid id:" + gridTable.id + " selected, memory used:" + (SizeOfUtil.deepSizeOf(store)>>10) + " KB");
+                    long t = System.currentTimeMillis();
+                    ((GTComboStore) store).switchToDiskStore();
+                    logger.info("switch to disk store cost:" + (System.currentTimeMillis() - t) + "ms");
+                }
+            }
+        }
+        logger.info("no store has been flushed to disk");
+        return true;
+    }
 
-    private void createNDCuboidGT(GridTable parentCuboid, long parentCuboidId, long cuboidId) throws IOException {
+    private void createNDCuboidGT(SimpleGridTableTree parentNode, long parentCuboidId, long cuboidId) throws IOException {
 
-        GridTable thisCuboid;
         long startTime = System.currentTimeMillis();
-        if (parentCuboidId < 0) {
-            thisCuboid = this.baseCuboidGT;
-        } else {
-            thisCuboid = aggregateCuboid(parentCuboid, parentCuboidId, cuboidId);
+        assert parentNode.data.getStore() instanceof GTComboStore;
+        if (parentNode.data.getStore().memoryUsage() <= 0) {
+            long t = System.currentTimeMillis();
+            ((GTComboStore) parentNode.data.getStore()).switchToMemStore();
+            logger.info("switch to mem store cost:" + (System.currentTimeMillis() - t) + "ms");
         }
 
-        logger.info("Cuboid " + cuboidId + " build takes (second): " + (System.currentTimeMillis() - startTime) / 1000);
-
-        ArrayList<Long> children = (ArrayList<Long>) cuboidScheduler.getSpanningCuboid(cuboidId);
-        Collections.sort(children); // sort cuboids
-        for (Long childId : children) {
-            createNDCuboidGT(thisCuboid, cuboidId, childId);
+        boolean inMem = gc(parentNode);
+        GridTable currentCuboid = aggregateCuboid(parentNode.data, parentCuboidId, cuboidId, inMem);
+        SimpleGridTableTree node = new SimpleGridTableTree();
+        node.parent = parentNode;
+        node.data = currentCuboid;
+        node.id = cuboidId;
+        parentNode.children.add(node);
+
+        logger.info("Cuboid " + cuboidId + " build takes " + (System.currentTimeMillis() - startTime) + "ms");
+
+        List<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+        if (!children.isEmpty()) {
+            Collections.sort(children); // sort cuboids
+            for (Long childId : children) {
+                createNDCuboidGT(node, cuboidId, childId);
+            }
         }
 
-
         startTime = System.currentTimeMillis();
         //output the grid table
-        outputGT(cuboidId, thisCuboid);
-        logger.info("Cuboid" + cuboidId + " output takes (second) " + (System.currentTimeMillis() - startTime) / 1000);
+        outputGT(cuboidId, currentCuboid);
+        currentCuboid.getStore().drop();
+        parentNode.children.remove(node);
+        logger.info("Cuboid" + cuboidId + " output takes " + (System.currentTimeMillis() - startTime) + "ms");
 
     }
 
@@ -419,4 +468,30 @@ public class InMemCubeBuilder implements Runnable {
             this.gtRecordWriter.write(cuboidId, record);
         }
     }
+
+    private static class TreeNode<T> {
+        T data;
+        long id;
+        TreeNode<T> parent;
+        List<TreeNode<T>> children = Lists.newArrayList();
+
+        List<TreeNode<T>> getAncestorList() {
+            ArrayList<TreeNode<T>> result = Lists.newArrayList();
+            TreeNode<T> parent = this.parent;
+            while (parent != null) {
+                result.add(parent);
+                parent = parent.parent;
+            }
+            return Lists.reverse(result);
+        }
+
+        @Override
+        public String toString() {
+            return id + "";
+        }
+    }
+
+    private static class SimpleGridTableTree extends TreeNode<GridTable> {}
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
index fc165ea..db690b9 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidJob.java
@@ -104,7 +104,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             // set job configuration
             job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
             job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_NAME, segmentName);
-            long timeout = 1000*60*60l; // 1 hour
+            long timeout = 1000*60*60L; // 1 hour
             job.getConfiguration().set("mapred.task.timeout", String.valueOf(timeout));
             Configuration conf = HBaseConfiguration.create(getConf());
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
index 41237d7..3ba80d1 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/MapContextGTRecordWriter.java
@@ -72,7 +72,7 @@ public class MapContextGTRecordWriter implements IGTRecordWriter {
         try {
             mapContext.write(outputKey, outputValue);
         } catch (InterruptedException e) {
-            throw new IOException(e);
+            throw new RuntimeException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
index ac6f409..739cde4 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/serializer/DataTypeSerializer.java
@@ -20,8 +20,11 @@ package org.apache.kylin.metadata.serializer;
 
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.metadata.model.DataType;
 
@@ -31,20 +34,23 @@ import org.apache.kylin.metadata.model.DataType;
  */
 abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
 
-    final static HashMap<String, Class<?>> implementations = new HashMap<String, Class<?>>();
+    final static Map<String, Class<?>> implementations;
     static {
-        implementations.put("varchar", StringSerializer.class);
-        implementations.put("decimal", BigDecimalSerializer.class);
-        implementations.put("double", DoubleSerializer.class);
-        implementations.put("float", DoubleSerializer.class);
-        implementations.put("bigint", LongSerializer.class);
-        implementations.put("long", LongSerializer.class);
-        implementations.put("integer", LongSerializer.class);
-        implementations.put("int", LongSerializer.class);
-        implementations.put("smallint", LongSerializer.class);
-        implementations.put("date", DateTimeSerializer.class);
-        implementations.put("datetime", DateTimeSerializer.class);
-        implementations.put("timestamp", DateTimeSerializer.class);
+        HashMap<String, Class<?>> impl = Maps.newHashMap();
+        impl.put("varchar", StringSerializer.class);
+        impl.put("decimal", BigDecimalSerializer.class);
+        impl.put("double", DoubleSerializer.class);
+        impl.put("float", DoubleSerializer.class);
+        impl.put("bigint", LongSerializer.class);
+        impl.put("long", LongSerializer.class);
+        impl.put("integer", LongSerializer.class);
+        impl.put("int", LongSerializer.class);
+        impl.put("smallint", LongSerializer.class);
+        impl.put("date", DateTimeSerializer.class);
+        impl.put("datetime", DateTimeSerializer.class);
+        impl.put("timestamp", DateTimeSerializer.class);
+        implementations = Collections.unmodifiableMap(impl);
+
     }
 
     public static DataTypeSerializer<?> create(String dataType) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
index 70567e5..9b7550f 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
@@ -8,6 +8,7 @@ import java.util.Map;
 
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.dict.Dictionary;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
index f721148..0784587 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeHBaseReadonlyStore.java
@@ -49,13 +49,8 @@ public class CubeHBaseReadonlyStore implements IGTStore {
     }
 
     @Override
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    @Override
-    public String getStorageDescription() {
-        return cubeSeg.toString();
+    public long memoryUsage() {
+        return 0;
     }
 
     @Override
@@ -127,6 +122,11 @@ public class CubeHBaseReadonlyStore implements IGTStore {
         };
     }
 
+    @Override
+    public void drop() throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
     private Scan buildScan(ByteArray pkStart, ByteArray pkEnd, List<Pair<byte[], byte[]>> selectedColumns) {
         Scan scan = new Scan();
         scan.setCaching(SCAN_CACHE);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
index 7195e7f..7552ab3 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTBuilder.java
@@ -51,7 +51,6 @@ public class GTBuilder implements Closeable, Flushable {
         blockWriter.readyForFlush();
         storeWriter.write(block);
         writtenRowBlockCount++;
-
         if (block.isFull()) {
             blockWriter.clearForNext();
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
new file mode 100644
index 0000000..c7d0c2b
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTComboStore.java
@@ -0,0 +1,112 @@
+package org.apache.kylin.storage.gridtable;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.storage.gridtable.diskstore.GTDiskStore;
+import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+/**
+ * Created by qianzhou on 5/6/15.
+ */
+public class GTComboStore implements IGTStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(GTComboStore.class);
+
+    private final GTInfo gtInfo;
+
+    private void convert(IGTStore input, IGTStore output) throws IOException {
+        final IGTStoreScanner scanner = input.scan(ScanKey.makeScanKey(gtInfo, new GTRecord(gtInfo)), ScanKey.makeScanKey(gtInfo, new GTRecord(gtInfo)), null, null);
+        final IGTStoreWriter writer = output.rebuild(-1);
+        while (scanner.hasNext()) {
+            writer.write(scanner.next());
+        }
+    }
+
+    private GTDiskStore gtDiskStore;
+    private GTSimpleMemStore gtSimpleMemStore;
+
+    public GTComboStore(GTInfo gtInfo) {
+        this(gtInfo, true);
+    }
+
+    public GTComboStore(GTInfo gtInfo, boolean useMemStore) {
+        this.gtInfo = gtInfo;
+        if (useMemStore) {
+            this.gtSimpleMemStore = new GTSimpleMemStore(gtInfo);
+        } else {
+            this.gtDiskStore = new GTDiskStore(gtInfo);
+        }
+    }
+
+    private IGTStore getCurrent() {
+        if (gtSimpleMemStore != null) {
+            return gtSimpleMemStore;
+        } else {
+            return gtDiskStore;
+        }
+    }
+
+    public void switchToMemStore() {
+        try {
+            if (gtSimpleMemStore == null) {
+                gtSimpleMemStore = new GTSimpleMemStore(gtInfo);
+                convert(gtDiskStore, gtSimpleMemStore);
+                gtDiskStore.drop();
+                gtDiskStore = null;
+            }
+        } catch (IOException e) {
+            logger.error("fail to switch to mem store", e);
+            throw new RuntimeException(e);
+        }
+        logger.info("switch to mem store");
+    }
+
+    public void switchToDiskStore() {
+        try {
+            if (gtDiskStore == null) {
+                gtDiskStore = new GTDiskStore(gtInfo);
+                convert(gtSimpleMemStore, gtDiskStore);
+                gtSimpleMemStore.drop();
+                gtSimpleMemStore = null;
+            }
+        } catch (IOException e) {
+            logger.error("fail to switch to disk store", e);
+            throw new RuntimeException(e);
+        }
+        logger.info("switch to disk store");
+    }
+
+    @Override
+    public long memoryUsage() {
+        return getCurrent().memoryUsage();
+    }
+
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return getCurrent().rebuild(shard);
+    }
+
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        return getCurrent().append(shard, fillLast);
+    }
+
+    @Override
+    public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        return getCurrent().scan(pkStart, pkEnd, selectedColBlocks, additionalPushDown);
+    }
+
+    @Override
+    public void drop() throws IOException {
+        if (gtSimpleMemStore != null) {
+            gtSimpleMemStore.drop();
+        }
+        if (gtDiskStore != null) {
+            gtDiskStore.drop();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
index 954e464..fdabb60 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
@@ -17,7 +17,6 @@ public class GTInfo {
     IGTCodeSystem codeSystem;
 
     // column schema
-    int nColumns;
     DataType[] colTypes;
     BitSet colAll;
     BitSet colPreferIndex;
@@ -37,7 +36,7 @@ public class GTInfo {
     }
 
     public int getColumnCount() {
-        return nColumns;
+        return colTypes.length;
     }
     
     public DataType getColumnType(int i) {
@@ -74,7 +73,7 @@ public class GTInfo {
     
     public int getMaxColumnLength() {
         int max = 0;
-        for (int i = 0; i < nColumns; i++)
+        for (int i = 0; i < colTypes.length; i++)
             max = Math.max(max, codeSystem.maxCodeLength(i));
         return max;
     }
@@ -95,7 +94,7 @@ public class GTInfo {
 
     public TblColRef colRef(int i) {
         if (colRefs == null) {
-            colRefs = new TblColRef[nColumns];
+            colRefs = new TblColRef[colTypes.length];
         }
         if (colRefs[i] == null) {
             colRefs[i] = GTUtil.tblColRef(i, colTypes[i].toString());
@@ -124,7 +123,7 @@ public class GTInfo {
 
     private void validateColumnBlocks() {
         colAll = new BitSet();
-        colAll.flip(0, nColumns);
+        colAll.flip(0, colTypes.length);
         
         if (colBlocks == null) {
             colBlocks = new BitSet[2];
@@ -185,7 +184,6 @@ public class GTInfo {
 
         /** required */
         public Builder setColumns(DataType... colTypes) {
-            info.nColumns = colTypes.length;
             info.colTypes = colTypes;
             return this;
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
index 7c6abec..2756659 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInvertedIndex.java
@@ -34,7 +34,7 @@ public class GTInvertedIndex {
         this.colPreferIndex = info.colPreferIndex;
         this.colBlocks = info.selectColumnBlocks(colPreferIndex);
 
-        index = new GTInvertedIndexOfColumn[info.nColumns];
+        index = new GTInvertedIndexOfColumn[info.getColumnCount()];
         for (int i = colPreferIndex.nextSetBit(0); i >= 0; i = colPreferIndex.nextSetBit(i + 1)) {
             index[i] = new GTInvertedIndexOfColumn(info.codeSystem.getFilterCodeSystem());
         }
@@ -43,7 +43,7 @@ public class GTInvertedIndex {
     public void add(GTRowBlock block) {
 
         @SuppressWarnings("unchecked")
-        Set<ByteArray>[] distinctValues = new Set[info.nColumns];
+        Set<ByteArray>[] distinctValues = new Set[info.getColumnCount()];
         for (int i = colPreferIndex.nextSetBit(0); i >= 0; i = colPreferIndex.nextSetBit(i + 1)) {
             distinctValues[i] = new HashSet<ByteArray>();
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
index 895ccf3..ff97cd5 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRawScanner.java
@@ -24,34 +24,14 @@ public class GTRawScanner implements IGTScanner {
     public GTRawScanner(GTInfo info, IGTStore store, GTScanRequest req) throws IOException {
         this.info = info;
 
-        ByteArray start = makeScanKey(req.getPkStart());
-        ByteArray end = makeScanKey(req.getPkEnd());
+        ByteArray start = ScanKey.makeScanKey(info, req.getPkStart());
+        ByteArray end = ScanKey.makeScanKey(info, req.getPkEnd());
         this.selectedColBlocks = info.selectColumnBlocks(req.getColumns());
 
         this.storeScanner = store.scan(start, end, selectedColBlocks, req);
         this.oneRecord = new GTRecord(info);
     }
 
-    private ByteArray makeScanKey(GTRecord rec) {
-        int firstPKCol = info.primaryKey.nextSetBit(0);
-        if (rec == null || rec.cols[firstPKCol].array() == null)
-            return null;
-
-        BitSet selectedColumns = new BitSet();
-        int len = 0;
-        for (int i = info.primaryKey.nextSetBit(0); i >= 0; i = info.primaryKey.nextSetBit(i + 1)) {
-            if (rec.cols[i].array() == null) {
-                break;
-            }
-            selectedColumns.set(i);
-            len += rec.cols[i].length();
-        }
-
-        ByteArray buf = ByteArray.allocate(len);
-        rec.exportColumns(selectedColumns, buf);
-        return buf;
-    }
-
     @Override
     public GTInfo getInfo() {
         return info;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
index 8516f05..2a38731 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
@@ -16,7 +16,7 @@ public class GTRecord implements Comparable<GTRecord> {
 
     public GTRecord(GTInfo info) {
         this.info = info;
-        this.cols = new ByteArray[info.nColumns];
+        this.cols = new ByteArray[info.getColumnCount()];
         for (int i = 0; i < cols.length; i++)
             this.cols[i] = new ByteArray();
         this.maskForEqualHashComp = info.colAll;
@@ -55,7 +55,7 @@ public class GTRecord implements Comparable<GTRecord> {
 
     /** decode and return the values of this record */
     public Object[] getValues() {
-        return getValues(info.colAll, new Object[info.nColumns]);
+        return getValues(info.colAll, new Object[info.getColumnCount()]);
     }
 
     /** decode and return the values of this record */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
index 7f79948..ec24da6 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTRowBlock.java
@@ -1,5 +1,7 @@
 package org.apache.kylin.storage.gridtable;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.BitSet;
 
@@ -54,7 +56,7 @@ public class GTRowBlock {
     public Writer getWriter() {
         return new Writer();
     }
-    
+
     public class Writer {
         ByteBuffer[] cellBlockBuffers;
         
@@ -181,6 +183,21 @@ public class GTRowBlock {
         return len;
     }
 
+    public void export(DataOutputStream dataOutputStream) throws IOException {
+        dataOutputStream.writeInt(seqId);
+        dataOutputStream.writeInt(nRows);
+        export(dataOutputStream, primaryKey);
+        for (ByteArray cb : cellBlocks) {
+            export(dataOutputStream, cb);
+        }
+    }
+
+    public void export(DataOutputStream dataOutputStream, ByteArray array) throws IOException {
+        dataOutputStream.writeInt(array.length());
+        dataOutputStream.write(array.array(), array.offset(), array.length());
+    }
+
+
     /** write data to given buffer, like serialize */
     public void export(ByteBuffer buf) {
         buf.putInt(seqId);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
index cb8698c..9c758fa 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
@@ -3,6 +3,7 @@ package org.apache.kylin.storage.gridtable;
 import java.nio.ByteBuffer;
 
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.metadata.filter.IFilterCodeSystem;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
@@ -30,8 +31,8 @@ public class GTSampleCodeSystem implements IGTCodeSystem {
     public void init(GTInfo info) {
         this.info = info;
 
-        this.serializers = new DataTypeSerializer[info.nColumns];
-        for (int i = 0; i < info.nColumns; i++) {
+        this.serializers = new DataTypeSerializer[info.getColumnCount()];
+        for (int i = 0; i < info.getColumnCount(); i++) {
             this.serializers[i] = DataTypeSerializer.create(info.colTypes[i]);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
index 285a301..3d3c3c8 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTScanner.java
@@ -4,10 +4,10 @@ import java.io.Closeable;
 
 public interface IGTScanner extends Iterable<GTRecord>, Closeable {
     
-    public GTInfo getInfo();
+    GTInfo getInfo();
     
-    public int getScannedRowCount();
+    int getScannedRowCount();
     
-    public int getScannedRowBlockCount();
+    int getScannedRowBlockCount();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
index f5eb077..0152571 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTStore.java
@@ -8,22 +8,22 @@ import java.util.Iterator;
 import org.apache.kylin.common.util.ByteArray;
 
 public interface IGTStore {
+
+    long memoryUsage();
     
-    public GTInfo getInfo();
-    
-    public String getStorageDescription();
-    
-    public IGTStoreWriter rebuild(int shard) throws IOException;
+    IGTStoreWriter rebuild(int shard) throws IOException;
     
-    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException;
+    IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException;
     
-    public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException;
+    IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException;
+
+    void drop() throws IOException;
     
-    public interface IGTStoreWriter extends Closeable {
+    interface IGTStoreWriter extends Closeable {
         void write(GTRowBlock block) throws IOException;
     }
     
-    public interface IGTStoreScanner extends Iterator<GTRowBlock>, Closeable {
+    interface IGTStoreScanner extends Iterator<GTRowBlock>, Closeable {
     }
     
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java
new file mode 100644
index 0000000..5c0c436
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/ScanKey.java
@@ -0,0 +1,34 @@
+package org.apache.kylin.storage.gridtable;
+
+import org.apache.kylin.common.util.ByteArray;
+
+import java.util.BitSet;
+
+/**
+ * Created by qianzhou on 5/6/15.
+ */
+public final class ScanKey {
+
+    private ScanKey() {
+    }
+
+    static ByteArray makeScanKey(GTInfo info, GTRecord rec) {
+        int firstPKCol = info.primaryKey.nextSetBit(0);
+        if (rec == null || rec.cols[firstPKCol].array() == null)
+            return null;
+
+        BitSet selectedColumns = new BitSet();
+        int len = 0;
+        for (int i = info.primaryKey.nextSetBit(0); i >= 0; i = info.primaryKey.nextSetBit(i + 1)) {
+            if (rec.cols[i].array() == null) {
+                break;
+            }
+            selectedColumns.set(i);
+            len += rec.cols[i].length();
+        }
+
+        ByteArray buf = ByteArray.allocate(len);
+        rec.exportColumns(selectedColumns, buf);
+        return buf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java
new file mode 100644
index 0000000..2ab2c7e
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/FileSystem.java
@@ -0,0 +1,22 @@
+package org.apache.kylin.storage.gridtable.diskstore;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Created by qianzhou on 5/4/15.
+ */
+interface FileSystem {
+
+    boolean checkExistence(String path);
+
+    boolean delete(String path);
+
+    boolean createDirectory(String path);
+
+    boolean createFile(String path);
+
+    OutputStream getWriter(String path);
+
+    InputStream getReader(String path);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
new file mode 100644
index 0000000..f48fce3
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/GTDiskStore.java
@@ -0,0 +1,160 @@
+package org.apache.kylin.storage.gridtable.diskstore;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRowBlock;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.IGTStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.UUID;
+
+/**
+ * Created by qianzhou on 5/4/15.
+ */
+public class GTDiskStore implements IGTStore {
+
+    private static final Logger logger = LoggerFactory.getLogger(GTDiskStore.class);
+
+    private final String identifier;
+    private final FileSystem fileSystem;
+    private final DiskStoreWriter writer;
+    private final GTInfo gtInfo;
+
+    public GTDiskStore(GTInfo gtInfo) {
+        this.gtInfo = gtInfo;
+        this.fileSystem = new LocalFileSystem();
+        this.identifier = generateIdentifier(fileSystem);
+        logger.info("disk store created, identifier:" + identifier);
+        this.writer = new DiskStoreWriter(fileSystem.getWriter(getRowBlockFile(identifier)));
+    }
+
+    private String generateIdentifier(FileSystem fs) {
+        while (true) {
+            String identifier = UUID.randomUUID().toString();
+            final String path = getRootDirectory(identifier);
+            if (fs.createDirectory(path)) {
+                return identifier;
+            }
+        }
+    }
+
+    private String getRootDirectory(String identifier) {
+        return "/tmp/kylin/gtdiskstore/" + identifier;
+    }
+
+    private String getRowBlockFile(String identifier) {
+        return getRootDirectory(identifier) + "/rowblock";
+    }
+
+    private class DiskStoreWriter implements IGTStoreWriter {
+
+        private final DataOutputStream outputStream;
+
+        DiskStoreWriter(OutputStream outputStream) {
+            this.outputStream = new DataOutputStream(outputStream);
+        }
+
+        @Override
+        public void write(GTRowBlock block) throws IOException {
+            final int blockSize = block.exportLength();
+            outputStream.writeInt(blockSize);
+            block.export(outputStream);
+            outputStream.flush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            outputStream.close();
+        }
+    }
+
+    @Override
+    public long memoryUsage() {
+        return 0;
+    }
+
+    @Override
+    public IGTStoreWriter rebuild(int shard) throws IOException {
+        return writer;
+    }
+
+    @Override
+    public IGTStoreWriter append(int shard, GTRowBlock.Writer fillLast) throws IOException {
+        return writer;
+    }
+
+    private class DiskStoreScanner implements IGTStoreScanner {
+
+        private final DataInputStream inputStream;
+        private int blockSize = 0;
+
+        DiskStoreScanner(InputStream inputStream) {
+            this.inputStream = new DataInputStream(inputStream);
+        }
+
+        @Override
+        public void close() throws IOException {
+            inputStream.close();
+        }
+
+        @Override
+        public boolean hasNext() {
+            try {
+                blockSize = inputStream.readInt();
+                return blockSize > 0;
+            } catch (EOFException e) {
+                return false;
+            } catch (IOException e) {
+                logger.error("input stream fail", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+        @Override
+        public GTRowBlock next() {
+            GTRowBlock block = new GTRowBlock(gtInfo);
+            ByteBuffer buffer = ByteBuffer.allocate(blockSize);
+            int count = blockSize;
+            while (count > 0) {
+                try {
+                    count -= inputStream.read(buffer.array(), buffer.position(), buffer.remaining());
+                } catch (IOException e) {
+                    logger.error("input stream fail", e);
+                    throw new RuntimeException(e);
+                }
+            }
+            Preconditions.checkArgument(count == 0, "invalid read count:" + count + " block size:" + blockSize);
+            block.load(buffer);
+            return block;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    @Override
+    public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) throws IOException {
+        return new DiskStoreScanner(fileSystem.getReader(getRowBlockFile(identifier)));
+    }
+
+    @Override
+    public void drop() throws IOException {
+        try {
+            writer.close();
+        } catch (Exception e) {
+            logger.error("error to close writer", e);
+        }
+        fileSystem.delete(getRowBlockFile(identifier));
+        fileSystem.delete(getRootDirectory(identifier));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java
new file mode 100644
index 0000000..e1efd1b
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/HadoopFileSystem.java
@@ -0,0 +1,88 @@
+package org.apache.kylin.storage.gridtable.diskstore;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Created by qianzhou on 5/6/15.
+ */
+public class HadoopFileSystem implements FileSystem {
+
+    private static final Logger logger = LoggerFactory.getLogger(HadoopFileSystem.class);
+
+    final org.apache.hadoop.fs.FileSystem fileSystem;
+
+    public HadoopFileSystem() {
+        try {
+            fileSystem = org.apache.hadoop.fs.FileSystem.get(HadoopUtil.getCurrentConfiguration());
+        } catch (IOException e) {
+            logger.error("error construct HadoopFileSystem", e);
+            throw new RuntimeException(e);
+        }
+    }
+    @Override
+    public boolean checkExistence(String path) {
+        try {
+            return fileSystem.exists(new Path(path));
+        } catch (IOException e) {
+            logger.error("error checkExistence, path:" + path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean delete(String path) {
+        try {
+            return fileSystem.delete(new Path(path), true);
+        } catch (IOException e) {
+            logger.error("error delete, path:" + path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean createDirectory(String path) {
+        try {
+            return fileSystem.mkdirs(new Path(path));
+        } catch (IOException e) {
+            logger.error("error createDirectory, path:" + path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean createFile(String path) {
+        try {
+            return fileSystem.createNewFile(new Path(path));
+        } catch (IOException e) {
+            logger.error("error createFile, path:" + path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public OutputStream getWriter(String path) {
+        try {
+            return fileSystem.create(new Path(path));
+        } catch (IOException e) {
+            logger.error("error getWriter, path:" + path, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public InputStream getReader(String path) {
+        try {
+            return fileSystem.open(new Path(path));
+        } catch (IOException e) {
+            logger.error("error getReader, path:" + path, e);
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java
new file mode 100644
index 0000000..1c14e3f
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/diskstore/LocalFileSystem.java
@@ -0,0 +1,60 @@
+package org.apache.kylin.storage.gridtable.diskstore;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+
+/**
+ * Created by qianzhou on 5/4/15.
+ */
+class LocalFileSystem implements FileSystem {
+
+    private static Logger logger = LoggerFactory.getLogger(LocalFileSystem.class);
+    @Override
+    public boolean checkExistence(String path) {
+        return new File(path).exists();
+    }
+
+    @Override
+    public boolean delete(String path) {
+        return new File(path).delete();
+    }
+
+    @Override
+    public boolean createDirectory(String path) {
+        return new File(path).mkdirs();
+    }
+
+    @Override
+    public boolean createFile(String path) {
+        try {
+            return new File(path).createNewFile();
+        } catch (IOException e) {
+            logger.warn("create file failed:" + path, e);
+            return false;
+        }
+    }
+
+    @Override
+    public OutputStream getWriter(String path) {
+        try {
+            return new FileOutputStream(path);
+        } catch (FileNotFoundException e) {
+            //should not happen
+            logger.error("path:" + path + " nout found");
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public InputStream getReader(String path) {
+        try {
+            return new FileInputStream(path);
+        } catch (FileNotFoundException e) {
+            //should not happen
+            logger.error("path:" + path + " nout found");
+            throw new RuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
index 32c7f36..329c048 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/memstore/GTSimpleMemStore.java
@@ -1,44 +1,41 @@
 package org.apache.kylin.storage.gridtable.memstore;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.collect.Lists;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.storage.gridtable.GTInfo;
 import org.apache.kylin.storage.gridtable.GTRowBlock;
 import org.apache.kylin.storage.gridtable.GTScanRequest;
 import org.apache.kylin.storage.gridtable.IGTStore;
 
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+
 public class GTSimpleMemStore implements IGTStore {
 
-    final GTInfo info;
     final List<GTRowBlock> rowBlockList;
 
     public GTSimpleMemStore(GTInfo info) {
-        this.info = info;
-        this.rowBlockList = new ArrayList<GTRowBlock>();
+        this.rowBlockList = Lists.newLinkedList();
 
         if (info.isShardingEnabled())
             throw new UnsupportedOperationException();
     }
 
     @Override
-    public GTInfo getInfo() {
-        return info;
-    }
-
-    @Override
-    public String getStorageDescription() {
-        return this.toString();
+    public long memoryUsage() {
+        if (rowBlockList.size() == 0) {
+            return 0;
+        } else {
+            return rowBlockList.get(0).exportLength() * Long.valueOf(rowBlockList.size());
+        }
     }
 
     @Override
     public IGTStoreWriter rebuild(int shard) {
         rowBlockList.clear();
-        return new Writer();
+        return new Writer(rowBlockList);
     }
 
     @Override
@@ -47,10 +44,16 @@ public class GTSimpleMemStore implements IGTStore {
             GTRowBlock last = rowBlockList.get(rowBlockList.size() - 1);
             fillLast.copyFrom(last);
         }
-        return new Writer();
+        return new Writer(rowBlockList);
     }
 
-    private class Writer implements IGTStoreWriter {
+    private static class Writer implements IGTStoreWriter {
+
+        private final List<GTRowBlock> rowBlockList;
+
+        Writer(List<GTRowBlock> rowBlockList) {
+            this.rowBlockList = rowBlockList;
+        }
         @Override
         public void close() throws IOException {
         }
@@ -66,7 +69,7 @@ public class GTSimpleMemStore implements IGTStore {
                 rowBlockList.add(copy);
             }
         }
-    };
+    }
 
     @Override
     public IGTStoreScanner scan(ByteArray pkStart, ByteArray pkEnd, BitSet selectedColBlocks, GTScanRequest additionalPushDown) {
@@ -95,4 +98,10 @@ public class GTSimpleMemStore implements IGTStore {
         };
     }
 
+    @Override
+    public void drop() throws IOException {
+        //will there be any concurrent issue? If yes, ArrayList should be replaced
+        rowBlockList.clear();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java b/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java
new file mode 100644
index 0000000..d390415
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/util/SizeOfUtil.java
@@ -0,0 +1,21 @@
+package org.apache.kylin.storage.util;
+
+import net.sf.ehcache.pool.sizeof.ReflectionSizeOf;
+
+/**
+ * Created by qianzhou on 5/11/15.
+ */
+public final class SizeOfUtil {
+
+    private SizeOfUtil(){}
+
+    private static final ReflectionSizeOf DEFAULT_SIZE_OF = new ReflectionSizeOf();
+
+    public static final long deepSizeOf(Object obj) {
+        return DEFAULT_SIZE_OF.deepSizeOf(Integer.MAX_VALUE, true, obj).getCalculated();
+    }
+
+    public static final long sizeOf(Object obj) {
+        return DEFAULT_SIZE_OF.sizeOf(obj);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0d87e8f6/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index 773425b..7c1d435 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -71,10 +71,6 @@ public class IIStreamBuilder extends StreamBuilder {
     private StreamingManager streamingManager;
 
     public IIStreamBuilder(BlockingQueue<StreamMessage> queue, String streaming, String hTableName, IIDesc iiDesc, int shard) {
-        this(queue, streaming, hTableName, iiDesc, shard, true);
-    }
-
-    public IIStreamBuilder(BlockingQueue<StreamMessage> queue, String streaming, String hTableName, IIDesc iiDesc, int shard, boolean useLocalDict) {
         super(queue, iiDesc.getSliceSize());
         this.streaming = streaming;
         this.desc = iiDesc;
@@ -86,7 +82,7 @@ public class IIStreamBuilder extends StreamBuilder {
             logger.error("cannot open htable name:" + hTableName, e);
             throw new RuntimeException("cannot open htable name:" + hTableName, e);
         }
-        this.sliceBuilder = new SliceBuilder(desc, (short) shard, useLocalDict);
+        this.sliceBuilder = new SliceBuilder(desc, (short) shard, iiDesc.isUseLocalDictionary());
         this.streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
     }
 


Mime
View raw message