kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [15/25] kylin git commit: KYLIN-1387 Streaming cubing doesn't generate cuboids files on HDFS, cause cube merge failure
Date Thu, 03 Mar 2016 03:31:02 GMT
KYLIN-1387 Streaming cubing doesn't generate cuboids files on HDFS, cause cube merge failure


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fd77e1a7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fd77e1a7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fd77e1a7

Branch: refs/heads/helix-rebase
Commit: fd77e1a772f31c3b2e5c9521b79a2f5130ac5a3a
Parents: 12c3cc6
Author: shaofengshi <shaofengshi@apache.org>
Authored: Tue Feb 2 17:34:46 2016 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Wed Mar 2 17:27:46 2016 +0800

----------------------------------------------------------------------
 .../cube/inmemcubing/CompoundCuboidWriter.java  | 57 ++++++++++++++
 .../kylin/cube/inmemcubing/ICuboidWriter.java   |  4 +-
 .../kylin/job/constant/ExecutableConstants.java |  1 +
 .../kylin/engine/mr/steps/KVGTRecordWriter.java | 81 ++++++++++++++++++++
 .../mr/steps/MapContextGTRecordWriter.java      | 69 ++---------------
 .../streaming/cube/StreamingCubeBuilder.java    | 12 ++-
 .../storage/hbase/steps/HBaseCuboidWriter.java  | 24 +++---
 .../hbase/steps/HBaseMROutput2Transition.java   |  2 +-
 .../kylin/storage/hbase/steps/HBaseMRSteps.java |  2 +-
 .../hbase/steps/HBaseStreamingOutput.java       |  8 +-
 .../hbase/steps/SequenceFileCuboidWriter.java   | 75 ++++++++++++++++++
 11 files changed, 254 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
new file mode 100644
index 0000000..46eef50
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/CompoundCuboidWriter.java
@@ -0,0 +1,57 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements. See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License. You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.kylin.cube.inmemcubing;
+
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+
+/**
+ */
+public class CompoundCuboidWriter implements ICuboidWriter {
+
+    private Iterable<ICuboidWriter> cuboidWriters;
+
+    public CompoundCuboidWriter(Iterable<ICuboidWriter> cuboidWriters) {
+        this.cuboidWriters = cuboidWriters;
+
+    }
+
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.write(cuboidId, record);
+        }
+    }
+
+    @Override
+    public void flush() throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.flush();
+        }
+
+    }
+
+    @Override
+    public void close() throws IOException {
+        for (ICuboidWriter writer : cuboidWriters) {
+            writer.close();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
index 9e26e5e..e6cfa02 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/ICuboidWriter.java
@@ -27,7 +27,7 @@ public interface ICuboidWriter {
 
     void write(long cuboidId, GTRecord record) throws IOException;
 
-    void flush();
+    void flush() throws IOException;
     
-    void close();
+    void close() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index ba50880..d370b0d 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -56,6 +56,7 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
     public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
     public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
+    public static final String STEP_NAME_GARBAGE_COLLECTION_HDFS = "Garbage Collection on
HDFS";
 
     public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
     public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data
to HFile";

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
new file mode 100644
index 0000000..e201705
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/KVGTRecordWriter.java
@@ -0,0 +1,81 @@
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
+import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ */
+public abstract class KVGTRecordWriter implements ICuboidWriter {
+
+    private static final Log logger = LogFactory.getLog(KVGTRecordWriter.class);
+    private Long lastCuboidId;
+    protected CubeSegment cubeSegment;
+    protected CubeDesc cubeDesc;
+
+    private AbstractRowKeyEncoder rowKeyEncoder;
+    private int dimensions;
+    private int measureCount;
+    private byte[] keyBuf;
+    private int[] measureColumnsIndex;
+    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+    private ByteArrayWritable outputKey = new ByteArrayWritable();
+    private ByteArrayWritable outputValue = new ByteArrayWritable();
+    private long cuboidRowCount = 0;
+
+    //for shard
+
+    public KVGTRecordWriter(CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        this.cubeDesc = cubeDesc;
+        this.cubeSegment = cubeSegment;
+        this.measureCount = cubeDesc.getMeasures().size();
+    }
+
+    @Override
+    public void write(long cuboidId, GTRecord record) throws IOException {
+
+        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
+            if (lastCuboidId != null) {
+                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
+                cuboidRowCount = 0;
+            }
+            // output another cuboid
+            initVariables(cuboidId);
+            lastCuboidId = cuboidId;
+        }
+
+        cuboidRowCount++;
+        rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
+
+        //output measures
+        valueBuf.clear();
+        record.exportColumns(measureColumnsIndex, valueBuf);
+
+        outputKey.set(keyBuf, 0, keyBuf.length);
+        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+        writeAsKeyValue(outputKey, outputValue);
+    }
+
+    protected abstract void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value)
throws IOException;
+
+    private void initVariables(Long cuboidId) {
+        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc,
cuboidId));
+        keyBuf = rowKeyEncoder.createBuf();
+
+        dimensions = Long.bitCount(cuboidId);
+        measureColumnsIndex = new int[measureCount];
+        for (int i = 0; i < measureCount; i++) {
+            measureColumnsIndex[i] = dimensions + i;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
index 8416d95..bee152b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java
@@ -1,76 +1,32 @@
 package org.apache.kylin.engine.mr.steps;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.MapContext;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.gridtable.GTRecord;
+
+import java.io.IOException;
 
 /**
  */
-public class MapContextGTRecordWriter implements ICuboidWriter {
+public class MapContextGTRecordWriter extends KVGTRecordWriter {
 
     private static final Log logger = LogFactory.getLog(MapContextGTRecordWriter.class);
     protected MapContext<?, ?, ByteArrayWritable, ByteArrayWritable> mapContext;
-    private Long lastCuboidId;
-    protected CubeSegment cubeSegment;
-    protected CubeDesc cubeDesc;
-
-    private AbstractRowKeyEncoder rowKeyEncoder;
-    private int dimensions;
-    private int measureCount;
-    private byte[] keyBuf;
-    private int[] measureColumnsIndex;
-    private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private ByteArrayWritable outputKey = new ByteArrayWritable();
-    private ByteArrayWritable outputValue = new ByteArrayWritable();
-    private long cuboidRowCount = 0;
-
-    //for shard
 
     public MapContextGTRecordWriter(MapContext<?, ?, ByteArrayWritable, ByteArrayWritable>
mapContext, CubeDesc cubeDesc, CubeSegment cubeSegment) {
+        super(cubeDesc, cubeSegment);
         this.mapContext = mapContext;
-        this.cubeDesc = cubeDesc;
-        this.cubeSegment = cubeSegment;
-        this.measureCount = cubeDesc.getMeasures().size();
     }
 
     @Override
-    public void write(long cuboidId, GTRecord record) throws IOException {
-
-        if (lastCuboidId == null || !lastCuboidId.equals(cuboidId)) {
-            if (lastCuboidId != null) {
-                logger.info("Cuboid " + lastCuboidId + " has " + cuboidRowCount + " rows");
-                cuboidRowCount = 0;
-            }
-            // output another cuboid
-            initVariables(cuboidId);
-            lastCuboidId = cuboidId;
-        }
-
-        cuboidRowCount++;
-        rowKeyEncoder.encode(record, record.getInfo().getPrimaryKey(), keyBuf);
-
-        //output measures
-        valueBuf.clear();
-        record.exportColumns(measureColumnsIndex, valueBuf);
-
-        outputKey.set(keyBuf, 0, keyBuf.length);
-        outputValue.set(valueBuf.array(), 0, valueBuf.position());
+    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws
IOException {
         try {
-            mapContext.write(outputKey, outputValue);
+            mapContext.write(key, value);
         } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            throw new IOException(e);
         }
     }
 
@@ -83,15 +39,4 @@ public class MapContextGTRecordWriter implements ICuboidWriter {
     public void close() {
 
     }
-
-    private void initVariables(Long cuboidId) {
-        rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, Cuboid.findById(cubeDesc,
cuboidId));
-        keyBuf = rowKeyEncoder.createBuf();
-
-        dimensions = Long.bitCount(cuboidId);
-        measureColumnsIndex = new int[measureCount];
-        for (int i = 0; i < measureCount; i++) {
-            measureColumnsIndex[i] = dimensions + i;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index d7056cf..20ff01d 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -98,6 +98,14 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
             throw new RuntimeException(e);
         } catch (ExecutionException e) {
             throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+        } catch (IOException e) {
+            throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+        } finally {
+            try {
+                cuboidWriter.close();
+            } catch (IOException e) {
+                throw new RuntimeException("error build cube from StreamingBatch", e.getCause());
+            }
         }
     }
 
@@ -106,7 +114,9 @@ public class StreamingCubeBuilder implements StreamingBatchBuilder {
         CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         try {
-            return cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(),
streamingBatch.getTimeRange().getSecond(), false, false);
+            CubeSegment segment = cubeManager.appendSegments(cubeInstance, streamingBatch.getTimeRange().getFirst(),
streamingBatch.getTimeRange().getSecond(), false, false);
+            segment.setLastBuildJobID(segment.getUuid()); // give a fake job id
+            return segment;
         } catch (IOException e) {
             throw new RuntimeException("failed to create IBuildable", e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
index c4dc0b5..ddc868d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java
@@ -33,9 +33,8 @@
  */
 package org.apache.kylin.storage.hbase.steps;
 
-import java.io.IOException;
-import java.util.List;
-
+import com.google.common.collect.Lists;
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Put;
@@ -51,13 +50,14 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
 
 /**
  */
-public final class HBaseCuboidWriter implements ICuboidWriter {
+public class HBaseCuboidWriter implements ICuboidWriter {
 
-    private static final Logger logger = LoggerFactory.getLogger(HBaseStreamingOutput.class);
+    private static final Logger logger = LoggerFactory.getLogger(HBaseCuboidWriter.class);
 
     private static final int BATCH_PUT_THRESHOLD = 10000;
 
@@ -125,8 +125,8 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
         }
     }
 
-    public final void flush() {
-        try {
+    @Override
+    public final void flush() throws IOException {
             if (!puts.isEmpty()) {
                 long t = System.currentTimeMillis();
                 if (hTable != null) {
@@ -136,14 +136,12 @@ public final class HBaseCuboidWriter implements ICuboidWriter {
                 logger.info("commit total " + puts.size() + " puts, totally cost:" + (System.currentTimeMillis()
- t) + "ms");
                 puts.clear();
             }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
     }
 
     @Override
-    public void close() {
-
+    public void close() throws IOException {
+        flush();
+        IOUtils.closeQuietly(hTable);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 4c2737d..7bb3647 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -80,7 +80,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
 
             @Override
             public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow) {
-                jobFlow.addTask(steps.createMergeGCStep());
+                steps.addMergingGarbageCollectionSteps(jobFlow);
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index c3bd7b5..535f877 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@ -161,7 +161,7 @@ public class HBaseMRSteps extends JobBuilderSupport {
         toDeletePaths.addAll(getMergingHDFSPaths());
 
         HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
-        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
+        step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HDFS);
         step.setDeletePaths(toDeletePaths);
         step.setJobId(jobId);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index 770be3c..4cc4794 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -18,9 +18,11 @@
 package org.apache.kylin.storage.hbase.steps;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -31,6 +33,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.inmemcubing.CompoundCuboidWriter;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.BatchConstants;
@@ -54,7 +57,10 @@ public class HBaseStreamingOutput implements IStreamingOutput {
 
             final HTableInterface hTable;
             hTable = createHTable(cubeSegment);
-            return new HBaseCuboidWriter(cubeSegment, hTable);
+            List<ICuboidWriter> cuboidWriters = Lists.newArrayList();
+            cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable));
+            cuboidWriters.add(new SequenceFileCuboidWriter(cubeSegment.getCubeDesc(), cubeSegment));
+            return new CompoundCuboidWriter(cuboidWriters);
         } catch (IOException e) {
             throw new RuntimeException("failed to get ICuboidWriter", e);
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fd77e1a7/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
new file mode 100644
index 0000000..4d76522
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SequenceFileCuboidWriter.java
@@ -0,0 +1,75 @@
+package org.apache.kylin.storage.hbase.steps;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.ByteArrayWritable;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.steps.KVGTRecordWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ */
+public class SequenceFileCuboidWriter extends KVGTRecordWriter {
+
+    private static final Logger logger = LoggerFactory.getLogger(SequenceFileCuboidWriter.class);
+    private SequenceFile.Writer writer = null;
+
+    public SequenceFileCuboidWriter(CubeDesc cubeDesc, CubeSegment segment) {
+        super(cubeDesc, segment);
+    }
+
+
+    @Override
+    protected void writeAsKeyValue(ByteArrayWritable key, ByteArrayWritable value) throws
IOException {
+        if (writer == null) {
+            synchronized (SequenceFileCuboidWriter.class) {
+                if (writer == null) {
+                    JobBuilderSupport jobBuilderSupport = new JobBuilderSupport(cubeSegment,
"SYSTEM");
+                    String cuboidRoot = jobBuilderSupport.getCuboidRootPath(cubeSegment);
+                    Path cuboidPath = new Path(cuboidRoot);
+                    FileSystem fs = HadoopUtil.getFileSystem(cuboidRoot);
+                    try {
+                        if (fs.exists(cuboidPath)) {
+                            fs.delete(cuboidPath, true);
+                        }
+
+                        fs.mkdirs(cuboidPath);
+                    } finally {
+                        IOUtils.closeQuietly(fs);
+                    }
+
+                    Path cuboidFile = new Path(cuboidPath, "data.seq");
+                    logger.debug("Cuboid is written to " + cuboidFile);
+                    writer = SequenceFile.createWriter(HadoopUtil.getCurrentConfiguration(),
SequenceFile.Writer.file(cuboidFile), SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(Text.class));
+                }
+            }
+        }
+
+        Text outputValue = new Text();
+        Text outputKey = new Text();
+        outputKey.set(key.array(), key.offset(), key.length());
+        outputValue.set(value.array(), value.offset(), value.length());
+        writer.append(outputKey, outputValue);
+    }
+
+    @Override
+    public void flush() throws IOException {
+        if (writer != null) {
+            writer.hflush();
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        IOUtils.closeQuietly(writer);
+    }
+}


Mime
View raw message