kylin-commits mailing list archives

From: qhz...@apache.org
Subject: incubator-kylin git commit: KYLIN-941 1. make MeasureAggregators implement Serializable to avoid HBase Connection inside the closure 2. another implementation to write HFile to minimize shuffle between partitions
Date: Fri, 18 Sep 2015 03:34:37 GMT
Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging 0d5f65572 -> df03c53fb


KYLIN-941
1. Make MeasureAggregators implement Serializable to avoid an HBase Connection inside the closure.
2. Add another implementation to write HFiles that minimizes shuffle between partitions.
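
For illustration, a minimal sketch of why point 1 matters (not part of the commit; it condenses the writeToHFile reducer from the SparkCubing diff below). Spark Java-serializes a closure together with everything it captures, so a MeasureAggregators built once on the driver can ride along to the executors only if its whole object graph is Serializable; the old code instead rebuilt cube metadata through CubeManager inside the closure, which drags an HBase Connection onto every executor. The aggs.reset() call is an addition of this sketch, not of the commit:

import java.nio.ByteBuffer;

import org.apache.kylin.metadata.measure.MeasureAggregators;
import org.apache.kylin.metadata.measure.MeasureCodec;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;

public class ClosureSketch {
    // dataTypes, measureSize and aggs are computed on the driver and captured by the closure.
    static JavaPairRDD<byte[], byte[]> reduceMeasures(JavaPairRDD<byte[], byte[]> rdd,
            final String[] dataTypes, final int measureSize, final MeasureAggregators aggs) {
        return rdd.reduceByKey(new Function2<byte[], byte[], byte[]>() {
            @Override
            public byte[] call(byte[] v1, byte[] v2) throws Exception {
                // Executor side: aggs arrives via closure serialization, no HBase lookup.
                MeasureCodec codec = new MeasureCodec(dataTypes);
                Object[] input = new Object[measureSize];
                Object[] result = new Object[measureSize];
                aggs.reset(); // start clean for this key (added in this sketch)
                codec.decode(ByteBuffer.wrap(v1), input);
                aggs.aggregate(input);
                codec.decode(ByteBuffer.wrap(v2), input);
                aggs.aggregate(input);
                aggs.collectStates(result);
                ByteBuffer buffer = ByteBuffer.allocate(Math.max(v1.length, v2.length));
                codec.encode(result, buffer);
                byte[] bytes = new byte[buffer.position()];
                System.arraycopy(buffer.array(), 0, bytes, 0, buffer.position());
                return bytes;
            }
        });
    }
}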


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/df03c53f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/df03c53f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/df03c53f

Branch: refs/heads/2.x-staging
Commit: df03c53fbfaa9b721755906bab5659807c7dec4d
Parents: 0d5f655
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Sun Sep 6 19:47:22 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Fri Sep 18 11:24:45 2015 +0800

----------------------------------------------------------------------
 .../cube/inmemcubing/DoggedCubeBuilder.java     |   1 +
 .../kylin/cube/inmemcubing/ICuboidWriter.java   |   6 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      |  12 +-
 .../DoggedCubeBuilderStressTest.java            |   5 +
 .../cube/inmemcubing/DoggedCubeBuilderTest.java |   1 +
 .../cube/inmemcubing/InMemCubeBuilderTest.java  |   9 +
 .../java/org/apache/kylin/dict/Dictionary.java  |   3 +-
 .../apache/kylin/dict/TimeStrDictionary.java    |   8 +-
 .../org/apache/kylin/dict/TrieDictionary.java   |  45 ++-
 .../kylin/metadata/measure/DoubleMutable.java   |   4 +-
 .../kylin/metadata/measure/LongMutable.java     |   4 +-
 .../metadata/measure/MeasureAggregator.java     |   4 +-
 .../metadata/measure/MeasureAggregators.java    |  29 +-
 .../apache/kylin/metadata/model/ColumnDesc.java |   4 +-
 .../apache/kylin/metadata/model/DataType.java   |   3 +-
 .../kylin/metadata/model/DatabaseDesc.java      |   3 +-
 .../apache/kylin/metadata/model/TblColRef.java  |   4 +-
 .../mr/steps/MapContextGTRecordWriter.java      |   5 +
 .../apache/kylin/engine/spark/SparkCubing.java  | 332 ++++++++++---------
 .../kylin/engine/spark/SparkCuboidWriter.java   |  28 ++
 .../engine/spark/cube/BufferedCuboidWriter.java | 149 +++++++++
 .../spark/cube/DefaultTupleConverter.java       |  94 ++++++
 .../spark/cube/ListBackedCuboidWriter.java      |  58 ++++
 .../kylin/engine/spark/cube/TupleConverter.java |  28 ++
 .../kylin/engine/spark/util/IteratorUtils.java  |  79 +++++
 .../spark/cube/BufferedCuboidWriterTest.java    |  68 ++++
 .../engine/spark/util/IteratorUtilsTest.java    |  86 +++++
 .../storage/hbase/steps/HBaseCuboidWriter.java  |   5 +
 28 files changed, 873 insertions(+), 204 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index d565ab5..6882eb9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -117,6 +117,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                 else
                     throw new IOException(e);
             } finally {
+                output.close();
                 closeGirdTables(splits);
                 logger.info("Dogged Cube Build end, totally took " + (System.currentTimeMillis() - start) + " ms");
                 ensureExit(splits);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
index c6bd248..9e26e5e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -17,10 +17,10 @@
 
 package org.apache.kylin.cube.inmemcubing;
 
-import java.io.IOException;
-
 import org.apache.kylin.gridtable.GTRecord;
 
+import java.io.IOException;
+
 /**
  */
 public interface ICuboidWriter {
@@ -28,4 +28,6 @@ public interface ICuboidWriter {
     void write(long cuboidId, GTRecord record) throws IOException;
 
     void flush();
+    
+    void close();
 }
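
As a usage note, a tiny hypothetical implementation of the widened interface (a sketch, not a class from this commit); the builders above now call close() in a finally block, so any buffered state should be released there:

import java.io.IOException;

import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
import org.apache.kylin.gridtable.GTRecord;

public class NoopCuboidWriter implements ICuboidWriter {
    @Override
    public void write(long cuboidId, GTRecord record) throws IOException {
        // buffer or emit the record for this cuboid
    }

    @Override
    public void flush() {
        // push any buffered records downstream
    }

    @Override
    public void close() {
        // release buffers/connections; invoked by the builder in a finally block
    }
}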

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index 662416e..2536250 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -132,9 +132,13 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     @Override
     public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
         ConcurrentNavigableMap<Long, CuboidResult> result = build(input);
-        for (CuboidResult cuboidResult : result.values()) {
-            outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
-            cuboidResult.table.close();
+        try {
+            for (CuboidResult cuboidResult : result.values()) {
+                outputCuboid(cuboidResult.cuboidId, cuboidResult.table, output);
+                cuboidResult.table.close();
+            }
+        } finally {
+            output.close();
         }
     }
 
@@ -143,9 +147,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         build(input, new ICuboidCollector() {
             @Override
             public void collect(CuboidResult cuboidResult) {
+                logger.info("collecting CuboidResult cuboid id:" + cuboidResult.cuboidId);
                 result.put(cuboidResult.cuboidId, cuboidResult);
             }
         });
+        logger.info("total CuboidResult count:" + result.size());
         return result;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
index a737b9c..935e840 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
@@ -96,5 +96,10 @@ public class DoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
         public void flush() {
 
         }
+
+        @Override
+        public void close() {
+            
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
index 50ba688..ed0a166 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
@@ -152,6 +152,7 @@ public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
 
         }
 
+        @Override
         public void close() {
             writer.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
index 1c303e5..9f2e7d2 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
@@ -207,7 +207,16 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
 
         @Override
         public void flush() {
+            if (verbose) {
+                System.out.println("flush");
+            }
+        }
 
+        @Override
+        public void close() {
+            if (verbose) {
+                System.out.println("close");
+            }
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
index 7c484ff..a163f52 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.dict;
 
 import java.io.PrintStream;
+import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
 
 import org.apache.kylin.common.persistence.Writable;
@@ -39,7 +40,7 @@ import org.apache.kylin.common.util.BytesUtil;
  * 
  * @author yangli9
  */
-abstract public class Dictionary<T> implements Writable {
+abstract public class Dictionary<T> implements Writable, Serializable {
 
     public static final byte NULL = (byte) 0xff;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
index 66ccfe2..9608265 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
@@ -12,8 +12,8 @@ import org.apache.kylin.common.util.DateFormat;
  */
 public class TimeStrDictionary extends Dictionary<String> {
 
-    private long maxid = Integer.MAX_VALUE;
-    private int maxLenghOfPositiveLong = 19;
+    private static final long MAX_ID = Integer.MAX_VALUE;
+    private static final int MAX_LENGTH_OF_POSITIVE_LONG = 19;
 
     @Override
     public int getMinId() {
@@ -32,7 +32,7 @@ public class TimeStrDictionary extends Dictionary<String> {
 
     @Override
     public int getSizeOfValue() {
-        return maxLenghOfPositiveLong;
+        return MAX_LENGTH_OF_POSITIVE_LONG;
     }
 
     @Override
@@ -40,7 +40,7 @@ public class TimeStrDictionary extends Dictionary<String> {
         long millis = DateFormat.stringToMillis(value);
         long seconds = millis / 1000;
 
-        if (seconds > maxid) {
+        if (seconds > MAX_ID) {
             return nullId();
         } else if (seconds < 0) {
             throw new IllegalArgumentException("Illegal value: " + value + ", parsed seconds: " + seconds);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index 39a8e7e..3a05d0a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@ -18,22 +18,19 @@
 
 package org.apache.kylin.dict;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInput;
-import java.io.DataInputStream;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.lang.ref.SoftReference;
-import java.util.Arrays;
-import java.util.HashMap;
-
+import com.google.common.base.Preconditions;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ClassUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.*;
+import java.lang.ref.SoftReference;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.UUID;
+
 /**
  * A dictionary based on Trie data structure that maps enumerations of byte[] to
  * int IDs.
@@ -457,6 +454,24 @@ public class TrieDictionary<T> extends Dictionary<T> {
         init(all);
     }
 
+    private void writeObject(java.io.ObjectOutputStream stream)
+            throws IOException {
+        stream.writeInt(trieBytes.length);
+        stream.write(trieBytes);
+    }
+
+    private void readObject(java.io.ObjectInputStream stream)
+            throws IOException, ClassNotFoundException {
+        int length = stream.readInt();
+        byte[] trieBytes = new byte[length];
+        int currentCount;
+        int idx = 0;
+        while ((currentCount = stream.read(trieBytes, idx, length - idx)) > 0) {
+            idx += currentCount;
+        }
+        init(trieBytes);
+    }
+
     @Override
     public void dump(PrintStream out) {
         out.println("Total " + nValues + " values");
@@ -515,7 +530,15 @@ public class TrieDictionary<T> extends Dictionary<T> {
         b.print();
         TrieDictionary<String> dict = b.build(0);
 
-        dict.enableIdToValueBytesCache();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        new ObjectOutputStream(baos).writeObject(dict);
+        
+        TrieDictionary<String> dict2 = (TrieDictionary<String>) new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray())).readObject();
+        Preconditions.checkArgument(dict.contains(dict2));
+        Preconditions.checkArgument(dict2.contains(dict));
+        Preconditions.checkArgument(dict.equals(dict2));
+        
+        dict2.enableIdToValueBytesCache();
         for (int i = 0; i <= dict.getMaxId(); i++) {
             System.out.println(Bytes.toString(dict.getValueBytesFromId(i)));
         }
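
A note on the writeObject/readObject pair above: the entire trie lives in trieBytes, so custom serialization only needs the length plus the raw bytes, and readObject rebuilds all derived fields through init(). The manual read loop guards against short reads from the stream; since ObjectInputStream implements java.io.DataInput, an equivalent and stricter form (a sketch, not what the commit uses) is:

private void readObject(java.io.ObjectInputStream stream)
        throws IOException, ClassNotFoundException {
    int length = stream.readInt();
    byte[] trieBytes = new byte[length];
    stream.readFully(trieBytes); // loops internally; throws EOFException on a truncated stream
    init(trieBytes);
}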

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMutable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMutable.java
index 2d1b8bb..b4d781c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMutable.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMutable.java
@@ -18,7 +18,9 @@
 
 package org.apache.kylin.metadata.measure;
 
-public class DoubleMutable implements Comparable<DoubleMutable> {
+import java.io.Serializable;
+
+public class DoubleMutable implements Comparable<DoubleMutable>, Serializable {
 
     private double v;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMutable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMutable.java
index 711669f..fd49876 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMutable.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMutable.java
@@ -18,7 +18,9 @@
 
 package org.apache.kylin.metadata.measure;
 
-public class LongMutable implements Comparable<LongMutable> {
+import java.io.Serializable;
+
+public class LongMutable implements Comparable<LongMutable>, Serializable {
 
     private long v;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
index 4153cbd..ddf0782 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
@@ -21,11 +21,13 @@ package org.apache.kylin.metadata.measure;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 
+import java.io.Serializable;
+
 /**
  * @author yangli9
  * 
  */
-abstract public class MeasureAggregator<V> {
+abstract public class MeasureAggregator<V> implements Serializable {
 
     public static MeasureAggregator<?> create(String funcName, String returnType) {
         if (FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName) || FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName)) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
index 6d8658d..71ed41c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.metadata.measure;
 
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,28 +31,28 @@ import org.apache.kylin.metadata.model.MeasureDesc;
  * 
  */
 @SuppressWarnings({ "rawtypes", "unchecked" })
-public class MeasureAggregators {
+public class MeasureAggregators implements Serializable {
 
-    private MeasureDesc[] descs;
-    private MeasureAggregator[] aggs;
+    private final MeasureAggregator[] aggs;
+    private final int descLength;
 
     public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
         this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
     }
 
     public MeasureAggregators(MeasureDesc... measureDescs) {
-        descs = measureDescs;
-        aggs = new MeasureAggregator[descs.length];
+        descLength = measureDescs.length;
+        aggs = new MeasureAggregator[descLength];
 
         Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
-        for (int i = 0; i < descs.length; i++) {
-            FunctionDesc func = descs[i].getFunction();
+        for (int i = 0; i < descLength; i++) {
+            FunctionDesc func = measureDescs[i].getFunction();
             aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType());
-            measureIndexMap.put(descs[i].getName(), i);
+            measureIndexMap.put(measureDescs[i].getName(), i);
         }
         // fill back dependent aggregator
-        for (int i = 0; i < descs.length; i++) {
-            String depMsrRef = descs[i].getDependentMeasureRef();
+        for (int i = 0; i < descLength; i++) {
+            String depMsrRef = measureDescs[i].getDependentMeasureRef();
             if (depMsrRef != null) {
                 int index = measureIndexMap.get(depMsrRef);
                 aggs[i].setDependentAggregator(aggs[index]);
@@ -64,17 +65,17 @@ public class MeasureAggregators {
             aggs[i].reset();
         }
     }
-
+    
     public void aggregate(Object[] values) {
-        assert values.length == descs.length;
+        assert values.length == descLength;
 
-        for (int i = 0; i < descs.length; i++) {
+        for (int i = 0; i < descLength; i++) {
             aggs[i].aggregate(values[i]);
         }
     }
 
     public void collectStates(Object[] states) {
-        for (int i = 0; i < descs.length; i++) {
+        for (int i = 0; i < descLength; i++) {
             states[i] = aggs[i].getState();
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index 6392eca..12371ce 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -24,12 +24,14 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import java.io.Serializable;
+
 /**
  * Column Metadata from Source. All name should be uppercase.
  * <p/>
  */
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class ColumnDesc {
+public class ColumnDesc implements Serializable {
 
     @JsonProperty("id")
     private String id;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
index 9a89499..2b2cbed 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -31,7 +32,7 @@ import java.util.regex.Pattern;
  * @author yangli9
  * 
  */
-public class DataType {
+public class DataType implements Serializable {
 
     public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
             + "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
index 3269e43..215e86c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -26,7 +27,7 @@ import java.util.Set;
 /**
  * @author xjiang
  */
-public class DatabaseDesc {
+public class DatabaseDesc implements Serializable {
     private String name;
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index caac52a..bce4996 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -20,9 +20,11 @@ package org.apache.kylin.metadata.model;
 
 import org.apache.commons.lang.StringUtils;
 
+import java.io.Serializable;
+
 /**
  */
-public class TblColRef {
+public class TblColRef implements Serializable {
 
     private static final String INNER_TABLE_NAME = "_kylin_table";
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 0428058..e7dbaa1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -82,6 +82,11 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
 
     }
 
+    @Override
+    public void close() {
+        
+    }
+
     private void initVariables(Long cuboidId) {
         bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index dd87782..89e50ab 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -17,9 +17,7 @@
 */
 package org.apache.kylin.engine.spark;
 
-import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.hash.Hasher;
@@ -42,7 +40,6 @@ import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -50,15 +47,16 @@ import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.inmemcubing.AbstractInMemCubeBuilder;
 import org.apache.kylin.cube.inmemcubing.InMemCubeBuilder;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.*;
 import org.apache.kylin.cube.util.CubingUtils;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
+import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
+import org.apache.kylin.engine.spark.util.IteratorUtils;
 import org.apache.kylin.job.common.OptionsHelper;
 import org.apache.kylin.metadata.measure.MeasureAggregators;
 import org.apache.kylin.metadata.measure.MeasureCodec;
@@ -66,26 +64,25 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.storage.hbase.steps.HBaseConnection;
 import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
 import org.apache.kylin.storage.hbase.steps.CubeHTableUtil;
+import org.apache.kylin.storage.hbase.steps.HBaseConnection;
+import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFlatMapFunction;
-import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.*;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 import scala.Tuple2;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
+import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.Executors;
@@ -213,6 +210,15 @@ public class SparkCubing extends AbstractSparkApplication {
             zeroValue.put(id, new HyperLogLogPlusCounter(14));
         }
 
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+        CubeDesc cubeDesc = CubeManager.getInstance(kylinConfig).getCube(cubeName).getDescriptor();
+        CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
+        CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
+        final int[] rowKeyColumnIndexes = flatTableDesc.getRowKeyColumnIndexes();
+        final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
+        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+        final List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
+
         final HashMap<Long, HyperLogLogPlusCounter> samplingResult = rowJavaRDD.aggregate(zeroValue,
                 new Function2<HashMap<Long, HyperLogLogPlusCounter>,
                         List<String>,
@@ -220,29 +226,20 @@ public class SparkCubing extends AbstractSparkApplication {
 
                     @Override
                     public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, List<String> v2) throws Exception {
-                        prepare();
-                        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-                        final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-                        final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-                        final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-                        final CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-                        final int nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
                         ByteArray[] row_hashcodes = new ByteArray[nRowKey];
                         for (int i = 0; i < nRowKey; ++i) {
                             row_hashcodes[i] = new ByteArray();
                         }
                         for (int i = 0; i < nRowKey; i++) {
                             Hasher hc = Hashing.murmur3_32().newHasher();
-                            String colValue = v2.get(flatTableDesc.getRowKeyColumnIndexes()[i]);
+                            String colValue = v2.get(rowKeyColumnIndexes[i]);
                             if (colValue != null) {
                                 row_hashcodes[i].set(hc.putString(colValue).hash().asBytes());
                             } else {
                                 row_hashcodes[i].set(hc.putInt(0).hash().asBytes());
                             }
                         }
-                        final CuboidScheduler cuboidScheduler = new CuboidScheduler(cubeDesc);
-                        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-                        final List<Long> allCuboidIds = cuboidScheduler.getAllCuboidIds();
+
                         final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
                         for (Long cuboidId : allCuboidIds) {
                             BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
@@ -260,7 +257,6 @@ public class SparkCubing extends AbstractSparkApplication {
                             allCuboidsBitSet.put(cuboidId, cuboidBitSet);
                         }
 
-                        HashMap<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
                         for (Long cuboidId : allCuboidIds) {
                             Hasher hc = Hashing.murmur3_32().newHasher();
                             HyperLogLogPlusCounter counter = v1.get(cuboidId);
@@ -269,9 +265,8 @@ public class SparkCubing extends AbstractSparkApplication {
                                 hc.putBytes(row_hashcodes[cuboidBitSet[position]].array());
                             }
                             counter.add(hc.hash().asBytes());
-                            result.put(cuboidId, counter);
                         }
-                        return result;
+                        return v1;
                     }
                 },
                 new Function2<HashMap<Long, HyperLogLogPlusCounter>,
@@ -281,29 +276,14 @@ public class SparkCubing extends AbstractSparkApplication {
                     public HashMap<Long, HyperLogLogPlusCounter> call(HashMap<Long, HyperLogLogPlusCounter> v1, HashMap<Long, HyperLogLogPlusCounter> v2) throws Exception {
                         Preconditions.checkArgument(v1.size() == v2.size());
                         Preconditions.checkArgument(v1.size() > 0);
-                        final HashMap<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(v1.size());
-                        for (Map.Entry<Long, HyperLogLogPlusCounter> entry : v1.entrySet()) {
-                            final HyperLogLogPlusCounter counter1 = entry.getValue();
-                            final HyperLogLogPlusCounter counter2 = v2.get(entry.getKey());
-                            if (counter2 != null) {
-                                counter1.merge(counter2);
-                            }
-                            result.put(entry.getKey(), counter1);
-                        }
-                        return result;
-                    }
-
-                    private HashMap<Long, HyperLogLogPlusCounter> copy(HashMap<Long, HyperLogLogPlusCounter> v1, HashMap<Long, HyperLogLogPlusCounter> v2) {
-                        final HashMap<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(v1.size());
                         for (Map.Entry<Long, HyperLogLogPlusCounter> entry : v1.entrySet()) {
                             final HyperLogLogPlusCounter counter1 = entry.getValue();
                             final HyperLogLogPlusCounter counter2 = v2.get(entry.getKey());
                             if (counter2 != null) {
                                 counter1.merge(counter2);
                             }
-                            result.put(entry.getKey(), counter1);
                         }
-                        return result;
+                        return v1;
                     }
 
                 });
@@ -313,142 +293,99 @@ public class SparkCubing extends AbstractSparkApplication {
     /*
     return hfile location
      */
-    private String build(JavaRDD<List<List<String>>> javaRDD, final String cubeName, final String segmentId) throws Exception {
-
-
-        final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
+    private String build(JavaRDD<List<String>> javaRDD, final String cubeName, final String segmentId, final byte[][] splitKeys) throws Exception {
+        CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        CubeDesc cubeDesc = cubeInstance.getDescriptor();
+        CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+        List<TblColRef> baseCuboidColumn = Cuboid.findById(cubeDesc, Cuboid.getBaseCuboidId(cubeDesc)).getColumns();
+        final Map<TblColRef, Integer> columnLengthMap = Maps.newHashMap();
+        for (TblColRef tblColRef : baseCuboidColumn) {
+            columnLengthMap.put(tblColRef, cubeSegment.getColumnLength(tblColRef));
+        }
+        final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+        for (DimensionDesc dim : cubeDesc.getDimensions()) {
+            // dictionary
+            for (TblColRef col : dim.getColumnRefs()) {
+                if (cubeDesc.getRowkey().isUseDictionary(col)) {
+                    Dictionary<?> dict = cubeSegment.getDictionary(col);
+                    if (dict == null) {
+                        System.err.println("Dictionary for " + col + " was not found.");
+                    }
+                    dictionaryMap.put(col, dict);
+                    System.out.println("col:" + col + " dictionary size:" + dict.getSize());
+                }
+            }
+        }
+        final JavaPairRDD<byte[], byte[]> javaPairRDD = javaRDD.glom().mapPartitionsToPair(new PairFlatMapFunction<Iterator<List<List<String>>>, byte[], byte[]>() {
 
             @Override
             public Iterable<Tuple2<byte[], byte[]>> call(Iterator<List<List<String>>> listIterator) throws Exception {
+                long t = System.currentTimeMillis();
                 prepare();
 
                 final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
                 final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-                final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
 
-                final Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
-
-                for (DimensionDesc dim : cubeDesc.getDimensions()) {
-                    // dictionary
-                    for (TblColRef col : dim.getColumnRefs()) {
-                        if (cubeDesc.getRowkey().isUseDictionary(col)) {
-                            Dictionary<?> dict = cubeSegment.getDictionary(col);
-                            if (dict == null) {
-                                System.err.println("Dictionary for " + col + " was not found.");
-                            }
-                            dictionaryMap.put(col, dict);
+                LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue();
+                System.out.println("load properties finished");
+                AbstractInMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
+                inMemCubeBuilder.setReserveMemoryMB(2400);
+                final SparkCuboidWriter sparkCuboidWriter = new BufferedCuboidWriter(new DefaultTupleConverter(cubeDesc, columnLengthMap));
+                final Future<?> future = Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, sparkCuboidWriter));
+                try {
+                    while (listIterator.hasNext()) {
+                        for (List<String> row : listIterator.next()) {
+                            blockingQueue.put(row);
                         }
                     }
+                    blockingQueue.put(Collections.<String>emptyList());
+                    future.get();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
                 }
-                final Iterator<Iterator<Tuple2<byte[], byte[]>>> iterator = Iterators.transform(listIterator, new Function<List<List<String>>, Iterator<Tuple2<byte[], byte[]>>>() {
-                    @Nullable
-                    @Override
-                    public Iterator<Tuple2<byte[], byte[]>> apply(@Nullable List<List<String>> input) {
-                        if (input.isEmpty()) {
-                            return Collections.emptyIterator();
-                        }
-                        LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>();
-                        final List<Tuple2<byte[], byte[]>> result = Lists.newLinkedList();
-                        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(cubeInstance.getDescriptor(), dictionaryMap);
-                        final Future<?> future = Executors.newCachedThreadPool().submit(inMemCubeBuilder.buildAsRunnable(blockingQueue, new ICuboidWriter() {
-
-                            final int measureCount = cubeDesc.getMeasures().size();
-                            int[] measureColumnsIndex = new int[measureCount];
-                            ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-
-                            @Override
-                            public void write(long cuboidId, GTRecord record) throws IOException {
-                                int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
-                                Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
-                                for (TblColRef column : cuboid.getColumns()) {
-                                    bytesLength += cubeSegment.getColumnLength(column);
-                                    final Dictionary<?> dictionary = cubeSegment.getDictionary(column);
-                                }
-
-                                final int dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
-                                for (int i = 0; i < measureCount; i++) {
-                                    measureColumnsIndex[i] = dimensions + i;
-                                }
-
-                                byte[] key = new byte[bytesLength];
-                                System.arraycopy(Bytes.toBytes(cuboidId), 0, key, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
-                                int offSet = RowConstants.ROWKEY_CUBOIDID_LEN;
-                                for (int x = 0; x < dimensions; x++) {
-                                    final ByteArray byteArray = record.get(x);
-                                    System.arraycopy(byteArray.array(), byteArray.offset(), key, offSet, byteArray.length());
-                                    offSet += byteArray.length();
-                                }
-
-
-                                //output measures
-                                valueBuf.clear();
-                                record.exportColumns(measureColumnsIndex, valueBuf);
-
-                                byte[] value = new byte[valueBuf.position()];
-                                System.arraycopy(valueBuf.array(), 0, value, 0, valueBuf.position());
-                                result.add(new Tuple2<byte[], byte[]>(key, value));
-                            }
-
-                            @Override
-                            public void flush() {
-
-                            }
-                        }));
-                        try {
-                            for (List<String> row : input) {
-                                blockingQueue.put(row);
-                            }
-                            blockingQueue.put(Collections.<String>emptyList());
-                            future.get();
-                        } catch (Exception e) {
-                            throw new RuntimeException(e);
-                        }
-                        return result.iterator();
-                    }
-                });
-                return new Iterable<Tuple2<byte[], byte[]>>() {
-                    @Override
-                    public Iterator<Tuple2<byte[], byte[]>> iterator() {
-                        return Iterators.concat(iterator);
-                    }
-                };
+                System.out.println("build partition cost: " + (System.currentTimeMillis() - t) + "ms");
+                return sparkCuboidWriter.getResult();
             }
         });
-        final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
-        final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
+
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         Configuration conf = getConfigurationForHFile(cubeSegment.getStorageLocationIdentifier());
         Path path = new Path(kylinConfig.getHdfsWorkingDirectory(), "hfile_" + UUID.randomUUID().toString());
         Preconditions.checkArgument(!FileSystem.get(conf).exists(path));
         String url = conf.get("fs.defaultFS") + path.toString();
         System.out.println("use " + url + " as hfile");
+        List<MeasureDesc> measuresDescs = Lists.newArrayList();
+        for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
+            for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
+                for (MeasureDesc measure : colDesc.getMeasures()) {
+                    measuresDescs.add(measure);
+                }
+            }
+        }
+        final int measureSize = measuresDescs.size();
+        final String[] dataTypes = new String[measureSize];
+        for (int i = 0; i < dataTypes.length; i++) {
+            dataTypes[i] = measuresDescs.get(i).getFunction().getReturnType();
+        }
+        final MeasureAggregators aggs = new MeasureAggregators(measuresDescs);
+        writeToHFile2(javaPairRDD, dataTypes, measureSize, aggs, splitKeys, conf, url);
+        return url;
+    }
+
+    private void writeToHFile(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
         javaPairRDD.reduceByKey(new Function2<byte[], byte[], byte[]>() {
 
             @Override
             public byte[] call(byte[] v1, byte[] v2) throws Exception {
-                prepare();
-                final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-                final CubeManager cubeManager = CubeManager.getInstance(kylinConfig);
-                final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
-                final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-                List<MeasureDesc> measuresDescs = Lists.newArrayList();
-                for (HBaseColumnFamilyDesc cfDesc : cubeDesc.getHBaseMapping().getColumnFamily()) {
-                    for (HBaseColumnDesc colDesc : cfDesc.getColumns()) {
-                        for (MeasureDesc measure : colDesc.getMeasures()) {
-                            measuresDescs.add(measure);
-                        }
-                    }
-                }
-                MeasureCodec codec = new MeasureCodec(measuresDescs);
-                MeasureAggregators aggs = new MeasureAggregators(measuresDescs);
-                Object[] input = new Object[measuresDescs.size()];
-                Object[] result = new Object[measuresDescs.size()];
-                
+                MeasureCodec codec = new MeasureCodec(dataTypes);
+                Object[] input = new Object[measureSize];
+                Object[] result = new Object[measureSize];
+
                 codec.decode(ByteBuffer.wrap(v1), input);
                 aggs.aggregate(input);
                 codec.decode(ByteBuffer.wrap(v2), input);
                 aggs.aggregate(input);
-                
+
                 aggs.collectStates(result);
                 final ByteBuffer buffer = ByteBuffer.allocate(Math.max(v1.length, v2.length));
                 buffer.clear();
@@ -464,8 +401,75 @@ public class SparkCubing extends AbstractSparkApplication {
                 KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
                 return new Tuple2(key, value);
             }
-        }).saveAsNewAPIHadoopFile(url, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
-        return url;
+        }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
+    }
+
+    private void writeToHFile2(final JavaPairRDD<byte[], byte[]> javaPairRDD, final String[] dataTypes, final int measureSize, final MeasureAggregators aggs, final byte[][] splitKeys, final Configuration conf, final String hFileLocation) {
+        javaPairRDD.repartitionAndSortWithinPartitions(new Partitioner() {
+            @Override
+            public int numPartitions() {
+                return splitKeys.length + 1;
+            }
+
+            @Override
+            public int getPartition(Object key) {
+                Preconditions.checkArgument(key instanceof byte[]);
+                for (int i = 0, n = splitKeys.length; i < n; ++i) {
+                    if (UnsignedBytes.lexicographicalComparator().compare((byte[]) key, splitKeys[i]) < 0) {
+                        return i;
+                    }
+                }
+                return splitKeys.length;
+            }
+        }, UnsignedBytes.lexicographicalComparator()).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, Tuple2<byte[], byte[]>>() {
+            @Override
+            public Iterable<Tuple2<byte[], byte[]>> call(final Iterator<Tuple2<byte[], byte[]>> tuple2Iterator) throws Exception {
+                return new Iterable<Tuple2<byte[], byte[]>>() {
+                    final ByteBuffer buffer = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+                    final MeasureCodec codec = new MeasureCodec(dataTypes);
+                    final Object[] input = new Object[measureSize];
+                    final Object[] result = new Object[measureSize];
+
+                    @Override
+                    public Iterator<Tuple2<byte[], byte[]>> iterator() {
+                        return IteratorUtils.merge(tuple2Iterator, UnsignedBytes.lexicographicalComparator(), new Function<Iterable<byte[]>, byte[]>() {
+                            @Override
+                            public byte[] call(Iterable<byte[]> v1) throws Exception {
+                                final LinkedList<byte[]> list = Lists.newLinkedList(v1);
+                                if (list.size() == 1) {
+                                    return list.get(0);
+                                }
+                                aggs.reset();
+                                for (byte[] v : list) {
+                                    try {
+                                        codec.decode(ByteBuffer.wrap(v), input);
+                                    } catch (BufferUnderflowException e) {
+                                        e.printStackTrace();
+                                        System.out.println("buffer under flow v.length:" + v.length);
+                                        System.out.println("value:" + Arrays.toString(v));
+                                        throw e;
+                                    }
+                                    aggs.aggregate(input);
+                                }
+                                aggs.collectStates(result);
+                                buffer.clear();
+                                codec.encode(result, buffer);
+                                byte[] bytes = new byte[buffer.position()];
+                                System.arraycopy(buffer.array(), buffer.arrayOffset(), bytes, 0, buffer.position());
+                                return bytes;
+                            }
+                        });
+                    }
+                };
+            }
+        }, true).mapToPair(new PairFunction<Tuple2<byte[], byte[]>, ImmutableBytesWritable, KeyValue>() {
+            @Override
+            public Tuple2<ImmutableBytesWritable, KeyValue> call(Tuple2<byte[], byte[]> tuple2) throws Exception {
+                ImmutableBytesWritable key = new ImmutableBytesWritable(tuple2._1());
+                KeyValue value = new KeyValue(tuple2._1(), "F1".getBytes(), "M".getBytes(), tuple2._2());
+                return new Tuple2(key, value);
+            }
+        }).saveAsNewAPIHadoopFile(hFileLocation, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat.class, conf);
     }
 
     private static void prepare() throws Exception {
@@ -475,8 +479,8 @@ public class SparkCubing extends AbstractSparkApplication {
         System.setProperty(KylinConfig.KYLIN_CONF, confPath);
         ClassUtil.addClasspath(confPath);
     }
-    
-    private void createHTable(String cubeName, String segmentId, Map<Long, HyperLogLogPlusCounter> samplingResult) throws Exception {
+
+    private byte[][] createHTable(String cubeName, String segmentId, Map<Long, HyperLogLogPlusCounter> samplingResult) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -485,18 +489,19 @@ public class SparkCubing extends AbstractSparkApplication {
         final byte[][] splitKeys = CreateHTableJob.getSplitsFromCuboidStatistics(cubeSizeMap, kylinConfig, cubeSegment);
         CubeHTableUtil.createHTable(cubeDesc, cubeSegment.getStorageLocationIdentifier(), splitKeys);
         System.out.println(cubeSegment.getStorageLocationIdentifier() + " table created");
+        return splitKeys;
     }
-    
+
     private Configuration getConfigurationForHFile(String hTableName) throws IOException {
         final Configuration conf = HBaseConnection.newHBaseConfiguration(KylinConfig.getInstanceFromEnv().getStorageUrl());
         Job job = Job.getInstance(conf);
         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
         job.setMapOutputValueClass(KeyValue.class);
         HTable table = new HTable(conf, hTableName);
-        HFileOutputFormat.configureIncrementalLoad (job, table);
+        HFileOutputFormat.configureIncrementalLoad(job, table);
         return conf;
     }
-    
+
     private void bulkLoadHFile(String cubeName, String segmentId, String hfileLocation) throws Exception {
         final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
@@ -521,7 +526,7 @@ public class SparkCubing extends AbstractSparkApplication {
 
         int ret = ToolRunner.run(new LoadIncrementalHFiles(hbaseConf), newArgs);
         System.out.println("incremental load result:" + ret);
-        
+
         cubeSegment.setStatus(SegmentStatusEnum.READY);
         try {
             CubeUpdate cubeBuilder = new CubeUpdate(cubeInstance);
@@ -538,6 +543,9 @@ public class SparkCubing extends AbstractSparkApplication {
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         final String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         SparkConf conf = new SparkConf().setAppName("Simple Application");
+        conf.set("spark.executor.memory", "6g");
+        conf.set("spark.storage.memoryFraction", "0.3");
+
         JavaSparkContext sc = new JavaSparkContext(conf);
         HiveContext sqlContext = new HiveContext(sc.sc());
         final DataFrame intermediateTable = sqlContext.sql("select * from " + hiveTable);
@@ -568,9 +576,9 @@ public class SparkCubing extends AbstractSparkApplication {
             }
         });
         final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(rowJavaRDD, cubeName);
-        createHTable(cubeName, segmentId, samplingResult);
-        
-        final String hfile = build(rowJavaRDD.glom(), cubeName, segmentId);
+        final byte[][] splitKeys = createHTable(cubeName, segmentId, samplingResult);
+
+        final String hfile = build(rowJavaRDD, cubeName, segmentId, splitKeys);
         bulkLoadHFile(cubeName, segmentId, hfile);
     }
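
The execute() changes above tie the pieces together: the HTable is pre-split using the sampling statistics, and the same split keys are passed into build() so the RDD can be range-partitioned to match the regions before HFiles are written, which is the "minimize shuffle between partitions" part of this commit. A minimal sketch of how split keys can back a Spark Partitioner (illustrative only, not code from this commit; the class and variable names are made up):

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.Partitioner;

    public class RegionPartitioner extends Partitioner {
        private final byte[][] splitKeys; // region boundaries, sorted ascending

        public RegionPartitioner(byte[][] splitKeys) {
            this.splitKeys = splitKeys;
        }

        @Override
        public int numPartitions() {
            return splitKeys.length + 1; // n split keys delimit n + 1 regions
        }

        @Override
        public int getPartition(Object key) {
            byte[] rowKey = (byte[]) key;
            int i = 0;
            // everything below splitKeys[0] lands in partition 0, and so on
            while (i < splitKeys.length && Bytes.compareTo(rowKey, splitKeys[i]) >= 0) {
                i++;
            }
            return i;
        }
    }

With rows range-partitioned this way, each partition holds exactly one region's rows; after a sort within each partition, HFileOutputFormat receives keys in the order it expects and no further shuffle is needed.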
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java
new file mode 100644
index 0000000..f741ed6
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCuboidWriter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark;
+
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import scala.Tuple2;
+
+/**
+ */
+public interface SparkCuboidWriter extends ICuboidWriter {
+    
+    Iterable<Tuple2<byte[], byte[]>> getResult();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java
new file mode 100644
index 0000000..6656e9b
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriter.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark.cube;
+
+import org.apache.kylin.engine.spark.SparkCuboidWriter;
+import org.apache.kylin.gridtable.GTRecord;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ */
+public class BufferedCuboidWriter implements SparkCuboidWriter {
+
+    private final LinkedBlockingQueue<Tuple2<byte[], byte[]>> blockingQueue;
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    private final TupleConverter tupleConverter;
+
+    public BufferedCuboidWriter(TupleConverter tupleConverter) {
+        this.blockingQueue = new LinkedBlockingQueue<>(10000);
+        this.tupleConverter = tupleConverter;
+    }
+
+    volatile Throwable error;
+
+    class RunnableWrapper implements Runnable {
+
+        private final Runnable runnable;
+
+        RunnableWrapper(Runnable runnable) {
+            this.runnable = runnable;
+        }
+
+        @Override
+        public void run() {
+            try {
+                runnable.run();
+            } catch (Throwable e) {
+                e.printStackTrace();
+                error = e;
+            }
+        }
+    }
+
+    private void checkError() {
+        if (error != null) {
+            if (error instanceof RuntimeException) {
+                throw (RuntimeException) error;
+            } else {
+                throw new RuntimeException(error);
+            }
+        }
+    }
+
+    @Override
+    public void write(final long cuboidId, final GTRecord record) throws IOException {
+        checkError();
+        executorService.submit(new RunnableWrapper(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    blockingQueue.put(tupleConverter.convert(cuboidId, record));
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }));
+    }
+
+    @Override
+    public void flush() {
+        checkError();
+    }
+
+    @Override
+    public void close() {
+        checkError();
+        executorService.submit(new RunnableWrapper(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    blockingQueue.put(new Tuple2<>(new byte[0], new byte[0]));
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }));
+        executorService.shutdown();
+    }
+
+    @Override
+    public Iterable<Tuple2<byte[], byte[]>> getResult() {
+        return new Iterable<Tuple2<byte[], byte[]>>() {
+            @Override
+            public Iterator<Tuple2<byte[], byte[]>> iterator() {
+                return new Iterator<Tuple2<byte[], byte[]>>() {
+                    Tuple2<byte[], byte[]> current = null;
+                    @Override
+                    public boolean hasNext() {
+                        if (current == null) {
+                            try {
+                                current = blockingQueue.take();
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException(e);
+                            }
+                        }
+                        return current._1().length > 0 && current._2().length > 0;
+                    }
+
+                    @Override
+                    public Tuple2<byte[], byte[]> next() {
+                        if (hasNext()) {
+                            Tuple2<byte[], byte[]> result = current;
+                            current = null;
+                            return result;
+                        } else {
+                            throw new NoSuchElementException();
+                        }
+                    }
+
+                    @Override
+                    public void remove() {
+                        throw new UnsupportedOperationException();
+                    }
+                };
+            }
+        };
+    }
+}
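
BufferedCuboidWriter decouples the in-memory cube builder (the producer) from the Spark task draining the records (the consumer): write() pushes converted pairs into a bounded queue, and close() enqueues an empty (byte[0], byte[0]) pair as a poison pill that ends the iteration in getResult(). A minimal usage sketch (illustrative; converter, builder and input stand for caller-supplied objects, and the build call is assumed to invoke writer.write(...) and finally writer.close()):

    final BufferedCuboidWriter writer = new BufferedCuboidWriter(converter);
    Executors.newSingleThreadExecutor().submit(new Runnable() {
        @Override
        public void run() {
            try {
                builder.build(input, writer); // assumed to close the writer when done
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    });
    // meanwhile, the calling thread streams the pairs out; iteration stops
    // once the poison pill enqueued by close() is taken from the queue
    for (Tuple2<byte[], byte[]> pair : writer.getResult()) {
        // hand each (rowkey, encoded measures) pair back to Spark
    }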

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
new file mode 100644
index 0000000..c687b78
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/DefaultTupleConverter.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark.cube;
+
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.metadata.model.TblColRef;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Map;
+
+/**
+ */
+public final class DefaultTupleConverter implements TupleConverter {
+
+    private static final ThreadLocal<ByteBuffer> valueBuf = new ThreadLocal<>();
+    private static final ThreadLocal<int[]> measureColumnsIndex = new ThreadLocal<>();
+    private final CubeDesc cubeDesc;
+    private final int measureCount;
+    private final Map<TblColRef, Integer> columnLengthMap;
+
+    public DefaultTupleConverter(CubeDesc cubeDesc, Map<TblColRef, Integer> columnLengthMap) {
+        this.cubeDesc = cubeDesc;
+        this.measureCount = cubeDesc.getMeasures().size();
+        this.columnLengthMap = columnLengthMap;
+    }
+
+    private ByteBuffer getValueBuf() {
+        if (valueBuf.get() == null) {
+            valueBuf.set(ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE));
+        }
+        return valueBuf.get();
+    }
+
+    private int[] getMeasureColumnsIndex() {
+        if (measureColumnsIndex.get() == null) {
+            measureColumnsIndex.set(new int[measureCount]);
+        }
+        return measureColumnsIndex.get();
+    }
+    
+    @Override
+    public final Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
+        int bytesLength = RowConstants.ROWKEY_CUBOIDID_LEN;
+        Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
+        for (TblColRef column : cuboid.getColumns()) {
+            bytesLength += columnLengthMap.get(column);
+        }
+
+        final int dimensions = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+        int[] measureColumnsIndex = getMeasureColumnsIndex();
+        for (int i = 0; i < measureCount; i++) {
+            measureColumnsIndex[i] = dimensions + i;
+        }
+
+        byte[] key = new byte[bytesLength];
+        System.arraycopy(Bytes.toBytes(cuboidId), 0, key, 0, RowConstants.ROWKEY_CUBOIDID_LEN);
+        int offset = RowConstants.ROWKEY_CUBOIDID_LEN;
+        for (int x = 0; x < dimensions; x++) {
+            final ByteArray byteArray = record.get(x);
+            System.arraycopy(byteArray.array(), byteArray.offset(), key, offset, byteArray.length());
+            offset += byteArray.length();
+        }
+
+        ByteBuffer valueBuf = getValueBuf();
+        valueBuf.clear();
+        record.exportColumns(measureColumnsIndex, valueBuf);
+
+        byte[] value = new byte[valueBuf.position()];
+        System.arraycopy(valueBuf.array(), 0, value, 0, valueBuf.position());
+        return new Tuple2<>(key, value);
+    }
+}
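
The converted key is the 8-byte cuboid id (RowConstants.ROWKEY_CUBOIDID_LEN) followed by the fixed-length encoded dimension values in cuboid column order; the value is the measure columns exported through a reusable thread-local buffer. The dimension count falls out of the cuboid id itself, since each set bit marks a participating column. A quick illustration (not from this commit):

    long cuboidId = 0b10110L; // bits 1, 2 and 4 set: three dimensions participate
    int viaBitSet = BitSet.valueOf(new long[] { cuboidId }).cardinality(); // 3, as computed in convert()
    int viaBitCount = Long.bitCount(cuboidId); // 3, an equivalent allocation-free alternative
    System.out.println(viaBitSet == viaBitCount); // prints true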

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java
new file mode 100644
index 0000000..eaf09c6
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/ListBackedCuboidWriter.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark.cube;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.engine.spark.SparkCuboidWriter;
+import org.apache.kylin.gridtable.GTRecord;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ */
+public class ListBackedCuboidWriter implements SparkCuboidWriter {
+    
+    private final ArrayList<Tuple2<byte[], byte[]>> result;
+    private final TupleConverter tupleConverter;
+
+    public ListBackedCuboidWriter(TupleConverter tupleConverter) {
+        this.result = Lists.newArrayList();
+        this.tupleConverter = tupleConverter;
+    }
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+        result.add(tupleConverter.convert(cuboidId, record));
+    }
+
+    @Override
+    public void flush() {
+
+    }
+
+    @Override
+    public void close() {
+
+    }
+    
+    @Override
+    public Iterable<Tuple2<byte[], byte[]>> getResult() {
+        return result;
+    }
+}
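
ListBackedCuboidWriter is the simple alternative: it accumulates every converted pair in an in-memory list, so one partition's entire output must fit on the heap. That suits tests and small cuboids, while BufferedCuboidWriter above streams with bounded memory. A minimal usage sketch (illustrative; converter, cuboidId and record are caller-supplied):

    ListBackedCuboidWriter writer = new ListBackedCuboidWriter(converter);
    writer.write(cuboidId, record); // appends converter.convert(cuboidId, record)
    writer.close(); // no-op; nothing to flush or release
    for (Tuple2<byte[], byte[]> pair : writer.getResult()) {
        // pairs come back in insertion order
    }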

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java
new file mode 100644
index 0000000..09b161c
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/cube/TupleConverter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark.cube;
+
+import org.apache.kylin.gridtable.GTRecord;
+import scala.Tuple2;
+
+/**
+ */
+public interface TupleConverter {
+    
+    Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
new file mode 100644
index 0000000..a6a5f4f
--- /dev/null
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/IteratorUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark.util;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.Function;
+import scala.Tuple2;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.NoSuchElementException;
+
+/**
+ */
+public class IteratorUtils {
+
+    public static <K, V> Iterator<Tuple2<K, V>> merge(final Iterator<Tuple2<K, V>> input, final Comparator<K> comparator, final Function<Iterable<V>, V> converter) {
+        return new Iterator<Tuple2<K, V>>() {
+
+            Tuple2<K, V> current = input.hasNext() ? input.next() : null;
+            @Override
+            public boolean hasNext() {
+                return current != null;
+            }
+
+            @Override
+            public Tuple2<K, V> next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                final LinkedList<V> values = Lists.newLinkedList();
+                K currentKey = current._1();
+                values.add(current._2());
+                while (input.hasNext()) {
+                    Tuple2<K, V> next = input.next();
+                    if (comparator.compare(currentKey, next._1()) == 0) {
+                        values.add(next._2());
+                    } else {
+                        current = next;
+                        try {
+                            return new Tuple2<>(currentKey, converter.call(values));
+                        } catch (Exception e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                }
+                if (!input.hasNext()) {
+                    current = null;
+                }
+                try {
+                    return new Tuple2<>(currentKey, converter.call(values));
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+
+            @Override
+            public void remove() {
+                throw new UnsupportedOperationException();
+            }
+        };
+    }
+}
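
merge() collapses runs of consecutive equal keys into one tuple, applying the converter to the collected values. It is a streaming combine over an iterator that is assumed to be sorted (or at least grouped) by key; non-adjacent duplicates would yield separate output tuples. A small illustration (not from this commit):

    List<Tuple2<String, Integer>> sorted = Lists.newArrayList(
            new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 5));
    Iterator<Tuple2<String, Integer>> merged = IteratorUtils.merge(
            sorted.iterator(),
            new Comparator<String>() {
                @Override
                public int compare(String o1, String o2) {
                    return o1.compareTo(o2);
                }
            },
            new Function<Iterable<Integer>, Integer>() {
                @Override
                public Integer call(Iterable<Integer> values) {
                    int sum = 0;
                    for (int v : values) {
                        sum += v;
                    }
                    return sum;
                }
            });
    // merged yields ("a", 3) then ("b", 5)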

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
new file mode 100644
index 0000000..89b7599
--- /dev/null
+++ b/engine-spark/src/test/java/org/apache/kylin/engine/spark/cube/BufferedCuboidWriterTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark.cube;
+
+import org.apache.kylin.gridtable.GTRecord;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ */
+public class BufferedCuboidWriterTest {
+    
+    @Test
+    public void test() throws ExecutionException, InterruptedException {
+        final BufferedCuboidWriter bufferedCuboidWriter = new BufferedCuboidWriter(new TupleConverter() {
+            @Override
+            public Tuple2<byte[], byte[]> convert(long cuboidId, GTRecord record) {
+                return new Tuple2<>(Long.valueOf(cuboidId).toString().getBytes(), Long.valueOf(cuboidId).toString().getBytes());
+            }
+        });
+        final int testCount = 10000000;
+        final Future<?> future = Executors.newCachedThreadPool().submit(new Runnable() {
+            @Override
+            public void run() {
+                int i = 0;
+                
+                while (i++ < testCount) {
+                    try {
+                        bufferedCuboidWriter.write(i, null);
+                    } catch (IOException e) {
+                        e.printStackTrace();
+                        throw new RuntimeException(e);
+                    }
+                }
+                bufferedCuboidWriter.close();
+            }
+        });
+        long actualCount = 0;
+        for (Tuple2<byte[], byte[]> tuple2 : bufferedCuboidWriter.getResult()) {
+            assertEquals(++actualCount, Long.parseLong(new String(tuple2._1())));
+        }
+        future.get();
+        assertEquals(testCount, actualCount);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/IteratorUtilsTest.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/IteratorUtilsTest.java b/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/IteratorUtilsTest.java
new file mode 100644
index 0000000..7ecdabf
--- /dev/null
+++ b/engine-spark/src/test/java/org/apache/kylin/engine/spark/util/IteratorUtilsTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.engine.spark.util;
+
+import com.google.common.collect.Lists;
+import org.apache.spark.api.java.function.Function;
+import org.junit.Test;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ */
+public class IteratorUtilsTest {
+
+    private static ArrayList<Tuple2<Integer, Integer>> getResult(List<Tuple2<Integer, Integer>> list) {
+
+        return Lists.newArrayList(IteratorUtils.merge(list.iterator(), new Comparator<Integer>() {
+            @Override
+            public int compare(Integer o1, Integer o2) {
+                return o1 - o2;
+            }
+        }, new Function<Iterable<Integer>, Integer>() {
+            @Override
+            public Integer call(Iterable<Integer> v1) throws Exception {
+                int sum = 0;
+                for (Integer integer : v1) {
+                    sum += integer;
+                }
+                return sum;
+            }
+        }));
+    }
+    
+    @Test
+    public void test() {
+        List<Tuple2<Integer, Integer>> list = Lists.newArrayList();
+        ArrayList<Tuple2<Integer, Integer>> result = null;
+
+        result = getResult(list);
+        assertEquals(0, result.size());
+        System.out.println(result);
+
+        list.clear();
+        list.add(new Tuple2<>(0, 1));
+        list.add(new Tuple2<>(0, 2));
+        list.add(new Tuple2<>(1, 2));
+        list.add(new Tuple2<>(1, 3));
+        list.add(new Tuple2<>(2, 3));
+        list.add(new Tuple2<>(2, 3));
+        list.add(new Tuple2<>(3, 0));
+        result = getResult(list);
+        assertEquals(4, result.size());
+        assertEquals(new Tuple2<>(0, 3), result.get(0));
+        assertEquals(new Tuple2<>(1, 5), result.get(1));
+        assertEquals(new Tuple2<>(2, 6), result.get(2));
+        assertEquals(new Tuple2<>(3, 0), result.get(3));
+        System.out.println(result);
+
+        list.clear();
+        list.add(new Tuple2<>(0, 1));
+        result = getResult(list);
+        assertEquals(1, result.size());
+        System.out.println(result);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/df03c53f/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index 1271070..e698b27 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -140,4 +140,9 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
         }
     }
 
+    @Override
+    public void close() {
+        
+    }
+
 }

