kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lid...@apache.org
Subject [2/3] kylin git commit: KYLIN-1267 Improve serialization of AggregationCache
Date Fri, 08 Jan 2016 03:42:40 GMT
KYLIN-1267 Improve serialization of AggregationCache


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aa9a8298
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aa9a8298
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aa9a8298

Branch: refs/heads/2.x-staging
Commit: aa9a8298703fca570a15c981e397859faff76df0
Parents: 4b54ca8
Author: lidongsjtu <dongli@ebay.com>
Authored: Fri Jan 8 11:40:51 2016 +0800
Committer: lidongsjtu <dongli@ebay.com>
Committed: Fri Jan 8 11:40:51 2016 +0800

----------------------------------------------------------------------
 .../kylin/gridtable/GTAggregateScanner.java     | 112 +++++++++++--------
 .../kylin/measure/MeasureAggregators.java       |   5 +
 .../org/apache/kylin/measure/MeasureCodec.java  |  13 +++
 3 files changed, 84 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/aa9a8298/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index f3afaba..f8fe661 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -5,6 +5,8 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -19,14 +21,13 @@ import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.util.KryoUtils;
 import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -134,8 +135,7 @@ public class GTAggregateScanner implements IGTScanner {
         final List<Dump> dumps;
         final int keyLength;
         final boolean[] compareMask;
-
-        final Kryo kryo = KryoUtils.getKryo();
+        final MeasureCodec measureCodec;
 
         final Comparator<byte[]> bytesComparator = new Comparator<byte[]>() {
             @Override
@@ -166,6 +166,15 @@ public class GTAggregateScanner implements IGTScanner {
             keyLength = compareMask.length;
             dumps = Lists.newArrayList();
             aggBufMap = createBuffMap();
+            measureCodec = createMeasureCodec();
+        }
+
+        private MeasureCodec createMeasureCodec() {
+            DataType[] types = new DataType[metrics.trueBitCount()];
+            for (int i = 0; i < types.length; i++) {
+                types[i] = info.getColumnType(metrics.trueBitAt(i));
+            }
+            return new MeasureCodec(types);
         }
 
         private boolean[] createCompareMask() {
@@ -351,9 +360,9 @@ public class GTAggregateScanner implements IGTScanner {
             }
         }
 
-        class Dump implements Iterable<Pair<byte[], MeasureAggregator[]>> {
+        class Dump implements Iterable<Pair<byte[], byte[]>> {
             File dumpedFile;
-            Input input;
+            ObjectInputStream ois;
             SortedMap<byte[], MeasureAggregator[]> buffMap;
 
             public Dump(SortedMap<byte[], MeasureAggregator[]> buffMap) throws IOException
{
@@ -361,16 +370,15 @@ public class GTAggregateScanner implements IGTScanner {
             }
 
             @Override
-            public Iterator<Pair<byte[], MeasureAggregator[]>> iterator() {
+            public Iterator<Pair<byte[], byte[]>> iterator() {
                 try {
                     if (dumpedFile == null || !dumpedFile.exists()) {
                         throw new RuntimeException("Dumped file cannot be found at: " + (dumpedFile
== null ? "<null>" : dumpedFile.getAbsolutePath()));
                     }
 
-                    input = new Input(new FileInputStream(dumpedFile));
-
-                    final int count = kryo.readObject(input, Integer.class);
-                    return new Iterator<Pair<byte[], MeasureAggregator[]>>()
{
+                    ois = new ObjectInputStream(new FileInputStream(dumpedFile));
+                    final int count = ois.readInt();
+                    return new Iterator<Pair<byte[], byte[]>>() {
                         int cursorIdx = 0;
 
                         @Override
@@ -379,10 +387,12 @@ public class GTAggregateScanner implements IGTScanner {
                         }
 
                         @Override
-                        public Pair<byte[], MeasureAggregator[]> next() {
+                        public Pair<byte[], byte[]> next() {
                             try {
                                 cursorIdx++;
-                                return (Pair<byte[], MeasureAggregator[]>) kryo.readObject(input,
Pair.class);
+                                byte[] key = (byte[]) ois.readObject();
+                                byte[] value = (byte[]) ois.readObject();
+                                return new Pair<>(key, value);
                             } catch (Exception e) {
                                 throw new RuntimeException("Cannot read AggregationCache
from dumped file: " + e.getMessage());
                             }
@@ -400,28 +410,37 @@ public class GTAggregateScanner implements IGTScanner {
 
             public void flush() throws IOException {
                 if (buffMap != null) {
-                    Output output = null;
+                    ObjectOutputStream oos = null;
+                    Object[] aggrResult = null;
+                    final ByteBuffer metricsBuf = ByteBuffer.allocate(info.getMaxColumnLength(metrics));
                     try {
                         dumpedFile = File.createTempFile("KYLIN_AGGR_", ".tmp");
 
                         logger.info("AggregationCache will dump to file: " + dumpedFile.getAbsolutePath());
-                        output = new Output(new FileOutputStream(dumpedFile));
-                        kryo.writeObject(output, buffMap.size());
+                        oos = new ObjectOutputStream(new FileOutputStream(dumpedFile));
+                        oos.writeInt(buffMap.size());
                         for (Entry<byte[], MeasureAggregator[]> entry : buffMap.entrySet())
{
-                            kryo.writeObject(output, new Pair(entry.getKey(), entry.getValue()));
+                            metricsBuf.clear();
+
+                            MeasureAggregators aggs = new MeasureAggregators(entry.getValue());
+                            aggrResult = new Object[metrics.trueBitCount()];
+                            aggs.collectStates(aggrResult);
+                            measureCodec.encode(aggrResult, metricsBuf);
+                            oos.writeObject(entry.getKey());
+                            oos.writeObject(metricsBuf.array());
                         }
                     } finally {
                         buffMap = null;
-                        if (output != null)
-                            output.close();
+                        if (oos != null)
+                            oos.close();
                     }
                 }
             }
 
             public void terminate() throws IOException {
                 buffMap = null;
-                if (input != null)
-                    input.close();
+                if (ois != null)
+                    ois.close();
                 if (dumpedFile != null && dumpedFile.exists())
                     dumpedFile.delete();
             }
@@ -429,8 +448,10 @@ public class GTAggregateScanner implements IGTScanner {
 
         class DumpMerger implements Iterable<Pair<byte[], MeasureAggregator[]>>
{
             final PriorityQueue<Pair<byte[], Integer>> minHeap;
-            final List<Iterator<Pair<byte[], MeasureAggregator[]>>> dumpIterators;
-            final List<MeasureAggregator[]> dumpCurrentValues;
+            final List<Iterator<Pair<byte[], byte[]>>> dumpIterators;
+            final List<Object[]> dumpCurrentValues;
+            final MeasureAggregator[] resultMeasureAggregators = newAggregators();
+            final MeasureAggregators resultAggrs = new MeasureAggregators(resultMeasureAggregators);
 
             public DumpMerger(List<Dump> dumps) {
                 minHeap = new PriorityQueue<>(dumps.size(), new Comparator<Pair<byte[],
Integer>>() {
@@ -442,26 +463,26 @@ public class GTAggregateScanner implements IGTScanner {
                 dumpIterators = Lists.newArrayListWithCapacity(dumps.size());
                 dumpCurrentValues = Lists.newArrayListWithCapacity(dumps.size());
 
-                Iterator<Pair<byte[], MeasureAggregator[]>> it;
+                Iterator<Pair<byte[], byte[]>> it;
                 for (int i = 0; i < dumps.size(); i++) {
                     it = dumps.get(i).iterator();
+                    dumpCurrentValues.add(i, null);
                     if (it.hasNext()) {
                         dumpIterators.add(i, it);
-                        Pair<byte[], MeasureAggregator[]> entry = it.next();
-                        minHeap.offer(new Pair(entry.getKey(), i));
-                        dumpCurrentValues.add(i, entry.getValue());
+                        enqueueFromDump(i);
                     } else {
                         dumpIterators.add(i, null);
-                        dumpCurrentValues.add(i, null);
                     }
                 }
             }
 
             private void enqueueFromDump(int index) {
                 if (dumpIterators.get(index) != null && dumpIterators.get(index).hasNext())
{
-                    Pair<byte[], MeasureAggregator[]> entry = dumpIterators.get(index).next();
-                    minHeap.offer(new Pair(entry.getKey(), index));
-                    dumpCurrentValues.set(index, entry.getValue());
+                    Pair<byte[], byte[]> pair = dumpIterators.get(index).next();
+                    minHeap.offer(new Pair(pair.getKey(), index));
+                    Object[] metricValues = new Object[metrics.trueBitCount()];
+                    measureCodec.decode(ByteBuffer.wrap(pair.getValue()), metricValues);
+                    dumpCurrentValues.set(index, metricValues);
                 }
             }
 
@@ -473,27 +494,26 @@ public class GTAggregateScanner implements IGTScanner {
                         return !minHeap.isEmpty();
                     }
 
+                    private void internalAggregate() {
+                        Pair<byte[], Integer> peekEntry = minHeap.poll();
+                        resultAggrs.aggregate(dumpCurrentValues.get(peekEntry.getValue()));
+                        enqueueFromDump(peekEntry.getValue());
+                    }
+
                     @Override
                     public Pair<byte[], MeasureAggregator[]> next() {
                         // Use minimum heap to merge sort the keys,
                         // also do aggregation for measures with same keys in different dumps
-                        Pair<byte[], Integer> peekEntry = minHeap.poll();
-                        MeasureAggregator[] mergedAggr = dumpCurrentValues.get(peekEntry.getValue());
-                        enqueueFromDump(peekEntry.getValue());
-
-                        while (!minHeap.isEmpty() && bytesComparator.compare(peekEntry.getKey(),
minHeap.peek().getKey()) == 0) {
-                            Pair<byte[], Integer> newPeek = minHeap.poll();
+                        resultAggrs.reset();
 
-                            MeasureAggregator[] newPeekAggr = dumpCurrentValues.get(newPeek.getValue());
-                            for (int i = 0; i < newPeekAggr.length; i++) {
-                                if (aggrMask[i])
-                                    mergedAggr[i].aggregate(newPeekAggr[i].getState());
-                            }
+                        byte[] peekKey = minHeap.peek().getKey();
+                        internalAggregate();
 
-                            enqueueFromDump(newPeek.getValue());
+                        while (!minHeap.isEmpty() && bytesComparator.compare(peekKey,
minHeap.peek().getKey()) == 0) {
+                            internalAggregate();
                         }
 
-                        return new Pair(peekEntry.getKey(), mergedAggr);
+                        return new Pair(peekKey, resultMeasureAggregators);
                     }
 
                     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa9a8298/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
index eb8a20b..ede690b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
@@ -34,6 +34,11 @@ public class MeasureAggregators implements Serializable {
     private final MeasureAggregator[] aggs;
     private final int descLength;
 
+    public MeasureAggregators(MeasureAggregator... aggs) {
+        this.descLength = aggs.length;
+        this.aggs = aggs;
+    }
+
     public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
         this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/aa9a8298/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
index b02addd..2794fa8 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
@@ -21,6 +21,7 @@ package org.apache.kylin.measure;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 import org.apache.kylin.metadata.model.MeasureDesc;
 
@@ -46,11 +47,23 @@ public class MeasureCodec {
         init(dataTypes);
     }
 
+    public MeasureCodec(DataType... dateTypes) {
+        init(dateTypes);
+    }
+
     public MeasureCodec(String... dataTypes) {
         init(dataTypes);
     }
 
     private void init(String[] dataTypes) {
+        DataType[] typeInstances = new DataType[dataTypes.length];
+        for (int i = 0; i < dataTypes.length; i++) {
+            typeInstances[i] = DataType.getType(dataTypes[i]);
+        }
+        init(typeInstances);
+    }
+
+    private void init(DataType[] dataTypes) {
         nMeasures = dataTypes.length;
         serializers = new DataTypeSerializer[nMeasures];
 


Mime
View raw message