kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qhz...@apache.org
Subject incubator-kylin git commit: KYLIN-808
Date Thu, 04 Jun 2015 02:41:12 GMT
Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8.0 925bc8f1f -> da381a4a8


KYLIN-808


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/da381a4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/da381a4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/da381a4a

Branch: refs/heads/0.8.0
Commit: da381a4a8fa45c3e87c398b0c5c1f863d66ecf57
Parents: 925bc8f
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Thu Jun 4 10:38:17 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Thu Jun 4 10:40:07 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/CubeStreamConsumer.java |  2 +-
 .../kylin/job/streaming/StreamingBootstrap.java |  5 ++--
 .../apache/kylin/job/BuildIIWithStreamTest.java |  5 +++-
 .../job/streaming/CubeStreamConsumerTest.java   |  2 +-
 .../apache/kylin/streaming/StreamBuilder.java   | 27 ++++++++++++++------
 5 files changed, 28 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/da381a4a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
index d7d5461..18373a9 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/CubeStreamConsumer.java
@@ -101,7 +101,7 @@ public class CubeStreamConsumer implements MicroStreamBatchConsumer {
 
         final CubeInstance cubeInstance = cubeManager.reloadCubeLocal(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
-        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), System.currentTimeMillis(), false, false);
+        final CubeSegment cubeSegment = cubeManager.appendSegments(cubeManager.getCube(cubeName), microStreamBatch.getTimestamp().getSecond(), false, false);
         final Map<Long, HyperLogLogPlusCounter> samplingResult = sampling(cubeInstance.getDescriptor(), parsedStreamMessages);
 
         final Configuration conf = HadoopUtil.getCurrentConfiguration();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/da381a4a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index cdade80..b7ed60c 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -159,7 +159,7 @@ public class StreamingBootstrap {
         final String cubeName = streamingConfig.getCubeName();
         final CubeInstance cubeInstance = CubeManager.getInstance(kylinConfig).getCube(cubeName);
 
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, new MicroBatchCondition(Integer.MAX_VALUE, 5 * 60 * 1000), new CubeStreamConsumer(cubeName));
+        StreamBuilder cubeStreamBuilder = new StreamBuilder(allClustersData, new MicroBatchCondition(Integer.MAX_VALUE, 5 * 60 * 1000), new CubeStreamConsumer(cubeName), cubeInstance.getDateRangeEnd());
         cubeStreamBuilder.setStreamParser(getStreamParser(streamingConfig, cubeInstance.getAllColumns()));
         cubeStreamBuilder.setStreamFilter(getStreamFilter(streamingConfig));
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
@@ -232,7 +232,8 @@ public class StreamingBootstrap {
         for (int i = startShard; i < endShard; ++i) {
             final StreamBuilder task = new StreamBuilder(consumer.getStreamQueue(i % parallelism),
                     new MicroBatchCondition(iiDesc.getSliceSize(), Integer.MAX_VALUE),
-                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i));
+                    new IIStreamConsumer(streamingConfig.getName(), iiSegment.getStorageLocationIdentifier(), iiDesc, i),
+                    0L);
             task.setStreamParser(getStreamParser(streamingConfig, ii.getDescriptor().listAllColumns()));
             if (i == endShard - 1) {
                 streamingBuilderPool.submit(task).get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/da381a4a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
index 43dc769..e84b176 100644
--- a/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
+++ b/job/src/test/java/org/apache/kylin/job/BuildIIWithStreamTest.java
@@ -214,7 +214,10 @@ public class BuildIIWithStreamTest {
         ToolRunner.run(new IICreateHTableJob(), args);
 
         ExecutorService executorService = Executors.newSingleThreadExecutor();
-        final StreamBuilder streamBuilder = new StreamBuilder(queue, new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE), new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0));
+        final StreamBuilder streamBuilder = new StreamBuilder(queue,
+                new MicroBatchCondition(segment.getIIDesc().getSliceSize(), Integer.MAX_VALUE),
+                new IIStreamConsumer(iiName, segment.getStorageLocationIdentifier(), segment.getIIDesc(), 0),
+                0);
 
         List<String[]> sorted = getSortedRows(reader, desc.getTimestampColumn());
         int count = sorted.size();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/da381a4a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
index 8377851..328ec72 100644
--- a/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/streaming/CubeStreamConsumerTest.java
@@ -64,7 +64,7 @@ public class CubeStreamConsumerTest {
     @Test
     public void test() throws Exception {
         LinkedBlockingDeque<StreamMessage> queue = new LinkedBlockingDeque<>();
-        StreamBuilder cubeStreamBuilder = new StreamBuilder(queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME));
+        StreamBuilder cubeStreamBuilder = new StreamBuilder(queue, new MicroBatchCondition(Integer.MAX_VALUE, 30 * 1000), new CubeStreamConsumer(CUBE_NAME), System.currentTimeMillis());
         final Future<?> future = Executors.newSingleThreadExecutor().submit(cubeStreamBuilder);
         loadDataFromLocalFile(queue, 100000);
         future.get();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/da381a4a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
index 50308f4..cda6209 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamBuilder.java
@@ -48,6 +48,7 @@ import java.util.concurrent.*;
 public class StreamBuilder implements Runnable {
 
     private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
+    private final long startTimestamp;
 
     private StreamParser streamParser = StringStreamParser.instance;
 
@@ -59,18 +60,20 @@ public class StreamBuilder implements Runnable {
 
     private final MicroBatchCondition condition;
 
-    public StreamBuilder(List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer) {
+    public StreamBuilder(List<BlockingQueue<StreamMessage>> inputs, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
         Preconditions.checkArgument(inputs.size() > 0);
         this.streamMessageQueues = Lists.newArrayList();
         this.consumer = Preconditions.checkNotNull(consumer);
         this.condition = condition;
+        this.startTimestamp = startTimestamp;
         init(inputs);
     }
 
-    public StreamBuilder(BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer) {
+    public StreamBuilder(BlockingQueue<StreamMessage> input, MicroBatchCondition condition, MicroStreamBatchConsumer consumer, long startTimestamp) {
         this.streamMessageQueues = Lists.newArrayList();
         this.consumer = Preconditions.checkNotNull(consumer);
         this.condition = condition;
+        this.startTimestamp = startTimestamp;
         init(Preconditions.checkNotNull(input));
     }
 
@@ -87,11 +90,12 @@ public class StreamBuilder implements Runnable {
         try {
             final int inputCount = streamMessageQueues.size();
             final ExecutorService executorService = Executors.newFixedThreadPool(inputCount);
+            long start = startTimestamp;
             while (true) {
                 CountDownLatch countDownLatch = new CountDownLatch(inputCount);
                 ArrayList<Future<MicroStreamBatch>> futures = Lists.newArrayListWithExpectedSize(inputCount);
                 for (BlockingQueue<StreamMessage> streamMessageQueue : streamMessageQueues) {
-                    futures.add(executorService.submit(new StreamFetcher(streamMessageQueue, countDownLatch)));
+                    futures.add(executorService.submit(new StreamFetcher(streamMessageQueue, countDownLatch, start, start + condition.getBatchInterval())));
                 }
                 countDownLatch.await();
                 ArrayList<MicroStreamBatch> batches = Lists.newArrayListWithExpectedSize(inputCount);
@@ -110,6 +114,9 @@ public class StreamBuilder implements Runnable {
                         batch = MicroStreamBatch.union(batch, batches.get(i));
                     }
                 }
+                batch.getTimestamp().setFirst(start);
+                batch.getTimestamp().setSecond(start + condition.getBatchInterval());
+                start += condition.getBatchInterval();
                 consumer.consume(batch);
             }
         } catch (InterruptedException e) {
@@ -127,16 +134,17 @@ public class StreamBuilder implements Runnable {
 
         private final BlockingQueue<StreamMessage> streamMessageQueue;
         private final CountDownLatch countDownLatch;
-        private long lastBuildTime = System.currentTimeMillis();
-        private long lastBatchTimestamp = -1;
+        private long startTimestamp;
+        private long endTimestamp;
 
-        public StreamFetcher(BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch) {
+        public StreamFetcher(BlockingQueue<StreamMessage> streamMessageQueue, CountDownLatch countDownLatch, long startTimestamp, long endTimestamp) {
             this.streamMessageQueue = streamMessageQueue;
             this.countDownLatch = countDownLatch;
+            this.startTimestamp = startTimestamp;
+            this.endTimestamp = endTimestamp;
         }
 
         private void clearCounter() {
-            lastBuildTime = System.currentTimeMillis();
         }
 
         private StreamMessage peek(BlockingQueue<StreamMessage> queue, long timeout) {
@@ -185,7 +193,10 @@ public class StreamBuilder implements Runnable {
                     }
 
                     final ParsedStreamMessage parsedStreamMessage = getStreamParser().parse(streamMessage);
-                    if (parsedStreamMessage.getTimestamp() - microStreamBatch.getTimestamp().getFirst() > condition.getBatchInterval()) {
+                    final long timestamp = parsedStreamMessage.getTimestamp();
+                    if (timestamp < startTimestamp) {
+                        streamMessageQueue.take();
+                    } else if (timestamp < endTimestamp) {
                         streamMessageQueue.take();
                         microStreamBatch.incRawMessageCount();
                         if (getStreamFilter().filter(parsedStreamMessage)) {


Mime
View raw message