kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [09/10] incubator-kylin git commit: add stream builder
Date Fri, 27 Feb 2015 06:28:41 GMT
add stream builder


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/a5365764
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/a5365764
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/a5365764

Branch: refs/heads/streaming
Commit: a5365764c1fc53d6c82c33d2d622d5f142d4aeca
Parents: 4d63e46
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Tue Feb 17 13:59:40 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Tue Feb 17 13:59:40 2015 +0800

----------------------------------------------------------------------
 .../kylin/streaming/kafka/StreamBuilder.java    | 94 ++++++++++++++++++++
 1 file changed, 94 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a5365764/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
new file mode 100644
index 0000000..145b5c8
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/kafka/StreamBuilder.java
@@ -0,0 +1,94 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming.kafka;
+
+import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Created by qianzhou on 2/17/15.
+ */
+public abstract class StreamBuilder implements Runnable {
+
+    private List<BlockingQueue<Stream>> streamQueues;
+    private static final Logger logger = LoggerFactory.getLogger(StreamBuilder.class);
+    private final int batchBuildCount;
+
+    public StreamBuilder(List<BlockingQueue<Stream>> streamQueues, int batchBuildCount)
{
+        this.streamQueues = streamQueues;
+        this.batchBuildCount = batchBuildCount;
+    }
+
+
+    private int getEarliestStreamIndex(Stream[] streamHead) {
+        long ts = Long.MAX_VALUE;
+        int idx = 0;
+        for (int i = 0; i < streamHead.length; i++) {
+            if (streamHead[i].getTimestamp() < ts) {
+                ts = streamHead[i].getTimestamp();
+                idx = i;
+            }
+        }
+        return idx;
+    }
+
+    protected abstract void build(List<Stream> streamsToBuild);
+
+    @Override
+    public void run() {
+        try {
+            Stream[] streamHead = new Stream[streamQueues.size()];
+            for (int i = 0; i < streamQueues.size(); i++) {
+                streamHead[i] = streamQueues.get(i).take();
+            }
+            List<Stream> streamToBuild = Lists.newArrayListWithCapacity(batchBuildCount);
+            while (true) {
+                if (streamToBuild.size() >= batchBuildCount) {
+                    build(streamToBuild);
+                    streamToBuild.clear();
+                }
+                int idx = getEarliestStreamIndex(streamHead);
+                streamToBuild.add(streamHead[idx]);
+                streamHead[idx] = streamQueues.get(idx).take();
+            }
+        } catch (InterruptedException e) {
+            logger.error("", e);
+        }
+    }
+}


Mime
View raw message