kylin-commits mailing list archives

From: mahong...@apache.org
Subject: incubator-kylin git commit: KYLIN-808 fix bug and add more logs
Date: Thu, 04 Jun 2015 02:37:22 GMT
Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 44f6db6d1 -> 925bc8f1f


KYLIN-808 fix bug and add more logs


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/925bc8f1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/925bc8f1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/925bc8f1

Branch: refs/heads/0.8.0
Commit: 925bc8f1fe963a4fa58afbb20707a457c56bddce
Parents: 44f6db6
Author: honma <honma@ebay.com>
Authored: Thu Jun 4 10:37:02 2015 +0800
Committer: honma <honma@ebay.com>
Committed: Thu Jun 4 10:37:15 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/CubeStreamConsumer.java | 35 ++++++++++----------
 .../kylin/streaming/MicroStreamBatch.java       | 12 +++++++
 .../apache/kylin/streaming/StreamBuilder.java   |  3 +-
 3 files changed, 31 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
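
In short: the bug was in StreamBuilder's fetcher loop, which unioned the micro batches from all partitions but then handed only batches.get(0) to the consumer; the fix passes the unioned batch instead. The "more logs" half threads a new rawMessageCount through MicroStreamBatch so that CubeStreamConsumer can report how many parsed messages it has consumed out of all raw messages pulled from the stream. Short sketches after each file's diff below illustrate the two pieces.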


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/925bc8f1/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index a229c65..d7d5461 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -74,7 +74,8 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
     private final ExecutorService executorService = Executors.newSingleThreadExecutor();
 
     private static final int BATCH_PUT_THRESHOLD = 10000;
-
+    private int totalConsumedMessageCount = 0;
+    private int totalRawMessageCount = 0;
 
     public CubeStreamConsumer(String cubeName) {
         this.kylinConfig = KylinConfig.getInstanceFromEnv();
@@ -88,11 +89,15 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
             logger.info("nothing to build, skip to next iteration");
             return;
         }
+
+        totalConsumedMessageCount += microStreamBatch.size();
+        totalRawMessageCount += microStreamBatch.getRawMessageCount();
+
         final List<List<String>> parsedStreamMessages = microStreamBatch.getStreams();
         long startOffset = microStreamBatch.getOffset().getFirst();
         long endOffset = microStreamBatch.getOffset().getSecond();
         LinkedBlockingQueue<List<String>> blockingQueue = new LinkedBlockingQueue<List<String>>(parsedStreamMessages);
-        blockingQueue.put(Collections.<String>emptyList());
+        blockingQueue.put(Collections.<String> emptyList());
 
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
@@ -110,12 +115,13 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
         final HTableInterface hTable = createHTable(cubeSegment);
 
         final CubeStreamRecordWriter gtRecordWriter = new CubeStreamRecordWriter(cubeDesc, hTable);
-        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance.getDescriptor(),
-                dictionaryMap, gtRecordWriter);
+        InMemCubeBuilder inMemCubeBuilder = new InMemCubeBuilder(blockingQueue, cubeInstance.getDescriptor(), dictionaryMap, gtRecordWriter);
 
         executorService.submit(inMemCubeBuilder).get();
         gtRecordWriter.flush();
         commitSegment(cubeSegment);
+
+        logger.info("Consumed {} messages out of {} raw messages", totalConsumedMessageCount, totalRawMessageCount);
     }
 
     private void writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
@@ -126,12 +132,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
             signature.setLastModifiedTime(System.currentTimeMillis());
             signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
             signature.setSize(endOffset - startOffset);
-            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(),
-                    tblColRef.getName(),
-                    tblColRef.getColumnDesc().getZeroBasedIndex(),
-                    tblColRef.getDatatype(),
-                    signature,
-                    ReadableTable.DELIM_AUTO);
+            DictionaryInfo dictInfo = new DictionaryInfo(tblColRef.getTable(), tblColRef.getName(), tblColRef.getColumnDesc().getZeroBasedIndex(), tblColRef.getDatatype(), signature, ReadableTable.DELIM_AUTO);
             logger.info("writing dictionary for TblColRef:" + tblColRef.toString());
             DictionaryManager dictionaryManager = DictionaryManager.getInstance(kylinConfig);
             try {
@@ -163,7 +164,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
             }
             this.nColumns = keyValueCreators.size();
             this.hTable = hTable;
-            this.byteBuffer = ByteBuffer.allocate(1<<20);
+            this.byteBuffer = ByteBuffer.allocate(1 << 20);
         }
 
         private byte[] copy(byte[] array, int offset, int length) {
@@ -175,7 +176,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
         private ByteBuffer createKey(Long cuboidId, GTRecord record) {
             byteBuffer.clear();
             byteBuffer.put(Bytes.toBytes(cuboidId));
-            final int cardinality = BitSet.valueOf(new long[]{cuboidId}).cardinality();
+            final int cardinality = BitSet.valueOf(new long[] { cuboidId }).cardinality();
             for (int i = 0; i < cardinality; i++) {
                 final ByteArray byteArray = record.get(i);
                 byteBuffer.put(byteArray.array(), byteArray.offset(), byteArray.length());
@@ -223,7 +224,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
         final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor().listDimensionColumnsExcludingDerived();
         final List<TblColRef> allDimensions = cubeInstance.getAllDimensions();
         final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
-        for (TblColRef column: columnsNeedToBuildDictionary) {
+        for (TblColRef column : columnsNeedToBuildDictionary) {
             final int index = allDimensions.indexOf(column);
             Preconditions.checkArgument(index >= 0);
             tblColRefMap.put(index, column);
@@ -261,12 +262,11 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
         final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         final Map<Long, Integer[]> allCuboidsBitSet = Maps.newHashMap();
 
-
         Lists.transform(allCuboidIds, new Function<Long, Integer[]>() {
             @Nullable
             @Override
             public Integer[] apply(@Nullable Long cuboidId) {
-                BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+                BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
                 Integer[] result = new Integer[bitSet.cardinality()];
 
                 long mask = Long.highestOneBit(baseCuboidId);
@@ -284,7 +284,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
         final Map<Long, HyperLogLogPlusCounter> result = Maps.newHashMapWithExpectedSize(allCuboidIds.size());
         for (Long cuboidId : allCuboidIds) {
             result.put(cuboidId, new HyperLogLogPlusCounter(14));
-            BitSet bitSet = BitSet.valueOf(new long[]{cuboidId});
+            BitSet bitSet = BitSet.valueOf(new long[] { cuboidId });
             Integer[] cuboidBitSet = new Integer[bitSet.cardinality()];
 
             long mask = Long.highestOneBit(baseCuboidId);
@@ -348,12 +348,11 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
 
     private void getSubCuboidIds(CuboidScheduler cuboidScheduler, long parentCuboidId, List<Long> result) {
         result.add(parentCuboidId);
-        for (Long cuboidId: cuboidScheduler.getSpanningCuboid(parentCuboidId)) {
+        for (Long cuboidId : cuboidScheduler.getSpanningCuboid(parentCuboidId)) {
             getSubCuboidIds(cuboidScheduler, cuboidId, result);
         }
     }
 
-
     private HTableInterface createHTable(final CubeSegment cubeSegment) throws Exception {
         final String hTableName = cubeSegment.getStorageLocationIdentifier();
         CubeHTableUtil.createHTable(cubeSegment.getCubeDesc(), hTableName, null);
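
The consumer side of the logging change reduces to two running totals plus one log line after each segment commit. Below is a minimal, self-contained sketch of that pattern using slf4j, as the diff does; ConsumeStats and onBatch are hypothetical names for illustration, not Kylin API:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class ConsumeStats {
        private static final Logger logger = LoggerFactory.getLogger(ConsumeStats.class);
        private int totalConsumedMessageCount = 0;
        private int totalRawMessageCount = 0;

        // Called once per micro batch: parsedCount corresponds to batch.size(),
        // rawCount to batch.getRawMessageCount().
        void onBatch(int parsedCount, int rawCount) {
            totalConsumedMessageCount += parsedCount;
            totalRawMessageCount += rawCount;
            logger.info("Consumed {} messages out of {} raw messages",
                    totalConsumedMessageCount, totalRawMessageCount);
        }
    }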

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/925bc8f1/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
index 268c98c..e1ff60c 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/MicroStreamBatch.java
@@ -15,6 +15,8 @@ public final class MicroStreamBatch {
 
     private final Pair<Long, Long> offset;
 
+    private int rawMessageCount;
+
     public MicroStreamBatch() {
         this.streams = Lists.newLinkedList();
         this.timestamp = Pair.newPair(Long.MAX_VALUE, Long.MIN_VALUE);
@@ -43,6 +45,15 @@ public final class MicroStreamBatch {
         return streams.size();
     }
 
+    public final void incRawMessageCount() {
+        this.rawMessageCount++;
+    }
+
+    public final int getRawMessageCount()
+    {
+        return this.rawMessageCount;
+    }
+
     public final void add(ParsedStreamMessage parsedStreamMessage) {
         if (offset.getFirst() > parsedStreamMessage.getOffset()) {
             offset.setFirst(parsedStreamMessage.getOffset());
@@ -66,6 +77,7 @@ public final class MicroStreamBatch {
         result.offset.setSecond(Math.min(result.offset.getSecond(), another.offset.getSecond()));
         result.timestamp.setFirst(Math.min(result.timestamp.getFirst(), another.timestamp.getFirst()));
         result.timestamp.setSecond(Math.min(result.timestamp.getSecond(), another.timestamp.getSecond()));
+        result.rawMessageCount = one.rawMessageCount + another.rawMessageCount;
         return result;
     }
 
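The intent behind the new counter: every message taken off the queue bumps rawMessageCount, while add() is reached only for messages that pass the filter, so size() can never exceed getRawMessageCount(), and union() sums the counts across partitions. A stripped-down sketch of that invariant follows; Batch is a hypothetical stand-in for MicroStreamBatch, not the real class:

    import java.util.ArrayList;
    import java.util.List;

    class Batch {
        final List<List<String>> streams = new ArrayList<>();
        int rawMessageCount;

        void add(List<String> parsed) { streams.add(parsed); }  // filtered messages only
        int size() { return streams.size(); }                   // parsed count <= raw count

        static Batch union(Batch one, Batch another) {
            Batch result = new Batch();
            result.streams.addAll(one.streams);
            result.streams.addAll(another.streams);
            // Mirrors the added line above: raw counts are additive across partitions.
            result.rawMessageCount = one.rawMessageCount + another.rawMessageCount;
            return result;
        }
    }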

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/925bc8f1/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index c9d2795..50308f4 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -110,7 +110,7 @@ public class StreamBuilder implements Runnable {
                         batch = MicroStreamBatch.union(batch, batches.get(i));
                     }
                 }
-                consumer.consume(batches.get(0));
+                consumer.consume(batch);
             }
         } catch (InterruptedException e) {
             throw new RuntimeException("stream fetcher thread should not be interrupted", e);
@@ -187,6 +187,7 @@ public class StreamBuilder implements Runnable {
                     final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
                     if (parsedStreamMessage.getTimestamp() - microStreamBatch.getTimestamp().getFirst() > condition.getBatchInterval()) {
                         streamMessageQueue.take();
+                        microStreamBatch.incRawMessageCount();
                         if (getStreamFilter().filter(parsedStreamMessage)) {
                             if (microStreamBatch.size() >= condition.getBatchSize()) {
                                 return microStreamBatch;
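
The actual bug fix is the one-liner in run() above: after folding every per-partition batch into batch, the old code still consumed batches.get(0), silently dropping the data from all other partitions. A runnable toy version of the corrected flow, with plain strings standing in for MicroStreamBatch:

    import java.util.Arrays;
    import java.util.List;

    class UnionThenConsume {
        // Stand-ins for MicroStreamBatch.union(...) and consumer.consume(...).
        static String union(String a, String b) { return a + "+" + b; }
        static void consume(String batch) { System.out.println("consuming " + batch); }

        public static void main(String[] args) {
            List<String> batches = Arrays.asList("p0", "p1", "p2");  // one micro batch per partition
            String batch = batches.get(0);
            for (int i = 1; i < batches.size(); i++) {
                batch = union(batch, batches.get(i));  // fold all partitions into one batch
            }
            consume(batch);  // the fix: pass the union, not batches.get(0)
        }
    }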

