kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shaofeng...@apache.org
Subject [7/9] kylin git commit: KYLIN-2072 Cleanup old streaming code
Date Sun, 09 Oct 2016 14:17:39 GMT
KYLIN-2072 Cleanup old streaming code

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2200b595
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2200b595
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2200b595

Branch: refs/heads/KYLIN-2072
Commit: 2200b595af05901b04ef8a8b55564e7accedb044
Parents: 71f3735
Author: shaofengshi <shaofengshi@apache.org>
Authored: Sun Oct 9 13:10:50 2016 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Sun Oct 9 22:17:10 2016 +0800

----------------------------------------------------------------------
 assembly/pom.xml                                |   4 -
 .../kylin/job/streaming/KafkaDataLoader.java    |  79 ----
 build/bin/cleanup_streaming_files.sh            |  42 --
 build/bin/kylin.sh                              |  61 ---
 build/bin/streaming_build.sh                    |  33 --
 build/bin/streaming_check.sh                    |  29 --
 build/bin/streaming_fillgap.sh                  |  26 --
 build/bin/streaming_rolllog.sh                  |  29 --
 .../metadata/streaming/StreamingConfig.java     |  85 ++++
 .../metadata/streaming/StreamingManager.java    | 248 ++++++++++++
 .../.settings/org.eclipse.core.resources.prefs  |   6 -
 .../.settings/org.eclipse.jdt.core.prefs        | 386 -------------------
 .../.settings/org.eclipse.jdt.ui.prefs          |   7 -
 engine-streaming/pom.xml                        | 121 ------
 .../kylin/engine/streaming/BootstrapConfig.java |  71 ----
 .../kylin/engine/streaming/IStreamingInput.java |  30 --
 .../engine/streaming/IStreamingOutput.java      |  34 --
 .../streaming/OneOffStreamingBuilder.java       |  71 ----
 .../engine/streaming/StreamingBatchBuilder.java |  43 ---
 .../kylin/engine/streaming/StreamingConfig.java |  85 ----
 .../engine/streaming/StreamingManager.java      | 248 ------------
 .../kylin/engine/streaming/cli/MonitorCLI.java  |  88 -----
 .../engine/streaming/cli/StreamingCLI.java      | 114 ------
 .../streaming/cube/StreamingCubeBuilder.java    | 168 --------
 .../diagnose/StreamingLogAnalyzer.java          |  96 -----
 .../streaming/monitor/StreamingMonitor.java     | 172 ---------
 .../engine/streaming/util/StreamingUtils.java   |  51 ---
 .../kylin/provision/BuildCubeWithStream.java    |   4 +-
 pom.xml                                         |   6 -
 .../rest/controller/StreamingController.java    |   2 +-
 .../kylin/rest/controller/TableController.java  |   2 +-
 .../apache/kylin/rest/service/BasicService.java |   2 +-
 .../kylin/rest/service/StreamingService.java    |   2 +-
 source-kafka/pom.xml                            |   6 -
 .../kafka/ByteBufferBackedInputStream.java      |  52 ---
 .../apache/kylin/source/kafka/KafkaSource.java  |   2 +-
 .../kylin/source/kafka/KafkaStreamingInput.java | 227 -----------
 .../source/kafka/TimedJsonStreamParser.java     |   1 +
 .../kafka/diagnose/KafkaInputAnalyzer.java      | 312 ---------------
 .../source/kafka/diagnose/KafkaVerify.java      | 101 -----
 .../source/kafka/diagnose/TimeHistogram.java    |  85 ----
 .../kafka/util/ByteBufferBackedInputStream.java |  52 +++
 .../kylin/source/kafka/util/KafkaRequester.java | 191 ---------
 .../kylin/source/kafka/util/KafkaUtils.java     | 173 ---------
 .../test/java/TimedJsonStreamParserTest.java    |   4 +-
 storage-hbase/pom.xml                           |   4 -
 .../hbase/steps/HBaseStreamingOutput.java       |  98 -----
 .../apache/kylin/tool/CubeMetaExtractor.java    |   4 +-
 48 files changed, 397 insertions(+), 3360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 0c80afc..e6f83a8 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -47,10 +47,6 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-engine-mr</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-engine-streaming</artifactId>
-        </dependency>
 
         <!-- Env & Test -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
deleted file mode 100644
index 454f6cf..0000000
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.streaming;
-
-import java.util.List;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.source.kafka.config.BrokerConfig;
-import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-import org.apache.kylin.source.kafka.config.KafkaConfig;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-/**
- * Load prepared data into kafka(for test use)
- */
-public class KafkaDataLoader extends StreamDataLoader {
-    List<KafkaClusterConfig> kafkaClusterConfigs;
-
-    public KafkaDataLoader(KafkaConfig kafkaConfig) {
-        super(kafkaConfig);
-        this.kafkaClusterConfigs = kafkaConfig.getKafkaClusterConfigs();
-    }
-
-    public void loadIntoKafka(List<String> messages) {
-
-        KafkaClusterConfig clusterConfig = kafkaClusterConfigs.get(0);
-        String brokerList = StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new Function<BrokerConfig, String>() {
-            @Nullable
-            @Override
-            public String apply(BrokerConfig brokerConfig) {
-                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
-            }
-        }), ",");
-        Properties props = new Properties();
-        props.put("metadata.broker.list", brokerList);
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("request.required.acks", "1");
-        props.put("retry.backoff.ms", "1000");
-
-        ProducerConfig config = new ProducerConfig(props);
-
-        Producer<String, String> producer = new Producer<String, String>(config);
-
-        List<KeyedMessage<String, String>> keyedMessages = Lists.newArrayList();
-        for (int i = 0; i < messages.size(); ++i) {
-            KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(clusterConfig.getTopic(), String.valueOf(i), messages.get(i));
-            keyedMessages.add(keyedMessage);
-        }
-        producer.send(keyedMessages);
-        producer.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/build/bin/cleanup_streaming_files.sh
----------------------------------------------------------------------
diff --git a/build/bin/cleanup_streaming_files.sh b/build/bin/cleanup_streaming_files.sh
deleted file mode 100644
index 9b31a4f..0000000
--- a/build/bin/cleanup_streaming_files.sh
+++ /dev/null
@@ -1,42 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-if [ $# != 1 ]
-then
-    echo 'invalid input'
-    exit -1
-fi
-
-cd $KYLIN_HOME/logs
-
-for pidfile in `find -L . -name "$1_1*"`
-do
-    pidfile=`echo "$pidfile" | cut -c 3-`
-    echo "pidfile:$pidfile"
-    pid=`cat $pidfile`
-    if [ `ps -ef | awk '{print $2}' | grep -w $pid | wc -l` = 1 ]
-    then
-        echo "pid:$pid still running"
-    else
-        echo "pid:$pid not running, try to delete files"
-        echo $pidfile | xargs rm
-        echo "streaming_$pidfile.log" | xargs rm
-    fi
-done
-

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index e767492..039be9f 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -139,67 +139,6 @@ then
         exit 1    
     fi
 
-# streaming command
-elif [ "$1" == "streaming" ]
-then
-    if [ $# -lt 4 ]
-    then
-        echo "invalid input args $@"
-        exit -1
-    fi
-    if [ "$2" == "start" ]
-    then
-        retrieveDependency
-        source ${dir}/find-kafka-dependency.sh
-        
-        # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
-        hbase ${KYLIN_EXTRA_START_OPTS} \
-        -Dlog4j.configuration=kylin-log4j.properties\
-        -Dkylin.hive.dependency=${hive_dependency} \
-        -Dkylin.kafka.dependency=${kafka_dependency} \
-        -Dkylin.hbase.dependency=${hbase_dependency} \
-        org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > ${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > ${KYLIN_HOME}/logs/$3_$4 &
-        echo "streaming started name: $3 id: $4"
-        exit 0
-    elif [ "$2" == "stop" ]
-    then
-        if [ ! -f "${KYLIN_HOME}/$3_$4" ]
-            then
-                echo "streaming is not running, please check"
-                exit 1
-            fi
-            pid=`cat ${KYLIN_HOME}/$3_$4`
-            if [ "$pid" = "" ]
-            then
-                echo "streaming is not running, please check"
-                exit 1
-            else
-                echo "stopping streaming:$pid"
-                kill $pid
-            fi
-            rm ${KYLIN_HOME}/$3_$4
-            exit 0
-        else
-            echo
-        fi
-
-# monitor command
-elif [ "$1" == "monitor" ]
-then
-    echo "monitor job"
-
-    retrieveDependency
-    source ${dir}/find-kafka-dependency.sh
-    
-    # KYLIN_EXTRA_START_OPTS is for customized settings, checkout bin/setenv.sh
-    hbase ${KYLIN_EXTRA_START_OPTS} \
-    -Dlog4j.configuration=kylin-log4j.properties\
-    -Dkylin.hive.dependency=${hive_dependency} \
-    -Dkylin.kafka.dependency=${kafka_dependency} \
-    -Dkylin.hbase.dependency=${hbase_dependency} \
-    org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > ${KYLIN_HOME}/logs/monitor.log 2>&1
-    exit 0
-
 elif [ "$1" = "version" ]
 then
     exec hbase -Dlog4j.configuration=kylin-log4j.properties org.apache.kylin.common.KylinVersion

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/build/bin/streaming_build.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_build.sh b/build/bin/streaming_build.sh
deleted file mode 100644
index ed19036..0000000
--- a/build/bin/streaming_build.sh
+++ /dev/null
@@ -1,33 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-CUBE=$1
-INTERVAL=$2
-DELAY=$3
-CURRENT_TIME_IN_SECOND=`date +%s`
-CURRENT_TIME=$((CURRENT_TIME_IN_SECOND * 1000))
-START=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY))
-END=$(($CURRENT_TIME - CURRENT_TIME%INTERVAL - DELAY + INTERVAL))
-
-ID="$START"_"$END"
-echo "building for ${ID}" >> ${KYLIN_HOME}/logs/build_trace.log
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${CUBE} ${ID} -start ${START} -end ${END} -cube ${CUBE}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/build/bin/streaming_check.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_check.sh b/build/bin/streaming_check.sh
deleted file mode 100644
index fef0139..0000000
--- a/build/bin/streaming_check.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-receivers=$1
-host=$2
-tablename=$3
-authorization=$4
-projectname=$5
-cubename=$6
-sh ${KYLIN_HOME}/bin/kylin.sh monitor -receivers ${receivers} -host ${host} -tableName ${tablename} -authorization ${authorization} -cubeName ${cubename} -projectName ${projectname}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/build/bin/streaming_fillgap.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_fillgap.sh b/build/bin/streaming_fillgap.sh
deleted file mode 100644
index c67809a..0000000
--- a/build/bin/streaming_fillgap.sh
+++ /dev/null
@@ -1,26 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-cube=$1
-
-cd ${KYLIN_HOME}
-sh ${KYLIN_HOME}/bin/kylin.sh streaming start ${cube} fillgap -cube ${cube} -fillGap true
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/build/bin/streaming_rolllog.sh
----------------------------------------------------------------------
diff --git a/build/bin/streaming_rolllog.sh b/build/bin/streaming_rolllog.sh
deleted file mode 100644
index 8018eb8..0000000
--- a/build/bin/streaming_rolllog.sh
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-source /etc/profile
-source ~/.bash_profile
-
-KYLIN_LOG_HOME=${KYLIN_HOME}/logs
-cd ${KYLIN_LOG_HOME}
-timestamp=`date +%Y_%m_%d_%H_%M_%S`
-tarfile=logs_archived_at_${timestamp}.tar
-files=`find -L . ! -name '*.tar' -type f -mtime +1` # keep two days' log
-echo ${files} | xargs tar -cvf ${tarfile}
-echo ${files} | xargs rm
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
new file mode 100644
index 0000000..9fd6ede
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingConfig.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.streaming;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.persistence.Serializer;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ */
+@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
+public class StreamingConfig extends RootPersistentEntity {
+
+    public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+    public static final String STREAMING_TYPE_KAFKA = "kafka";
+
+    @JsonProperty("name")
+    private String name;
+
+    @JsonProperty("type")
+    private String type = STREAMING_TYPE_KAFKA;
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getResourcePath() {
+        return concatResourcePath(name);
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public static String concatResourcePath(String name) {
+        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+    }
+
+    @Override
+    public StreamingConfig clone() {
+        try {
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            SERIALIZER.serialize(this, new DataOutputStream(baos));
+            return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
+        } catch (IOException e) {
+            throw new RuntimeException(e);//in mem, should not happen
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
new file mode 100644
index 0000000..8cfe87d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/streaming/StreamingManager.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.metadata.streaming;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.metadata.MetadataConstants;
+import org.apache.kylin.metadata.cachesync.Broadcaster;
+import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
+import org.apache.kylin.metadata.cachesync.CaseInsensitiveStringCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ */
+public class StreamingManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
+
+    // static cached instances
+    private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new ConcurrentHashMap<KylinConfig, StreamingManager>();
+
+    public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
+
+    private KylinConfig config;
+
+    // name ==> StreamingConfig
+    private CaseInsensitiveStringCache<StreamingConfig> streamingMap;
+
+    public static void clearCache() {
+        CACHE.clear();
+    }
+
+    private StreamingManager(KylinConfig config) throws IOException {
+        this.config = config;
+        this.streamingMap = new CaseInsensitiveStringCache<StreamingConfig>(config, "streaming");
+        
+        // touch lower level metadata before registering my listener
+        reloadAllStreaming();
+        Broadcaster.getInstance(config).registerListener(new StreamingSyncListener(), "streaming");
+    }
+
+    private class StreamingSyncListener extends Broadcaster.Listener {
+        @Override
+        public void onClearAll(Broadcaster broadcaster) throws IOException {
+            clearCache();
+        }
+
+        @Override
+        public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) throws IOException {
+            if (event == Event.DROP)
+                removeStreamingLocal(cacheKey);
+            else
+                reloadStreamingConfigLocal(cacheKey);
+        }
+    }
+
+    private ResourceStore getStore() {
+        return ResourceStore.getStore(this.config);
+    }
+
+    public static StreamingManager getInstance(KylinConfig config) {
+        StreamingManager r = CACHE.get(config);
+        if (r != null) {
+            return r;
+        }
+
+        synchronized (StreamingManager.class) {
+            r = CACHE.get(config);
+            if (r != null) {
+                return r;
+            }
+            try {
+                r = new StreamingManager(config);
+                CACHE.put(config, r);
+                if (CACHE.size() > 1) {
+                    logger.warn("More than one singleton exist");
+                }
+                return r;
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
+            }
+        }
+    }
+
+    private static String formatStreamingConfigPath(String name) {
+        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+    }
+
+    private static String formatStreamingOutputPath(String streaming, int partition) {
+        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
+    }
+
+    private static String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
+        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
+    }
+
+    public StreamingConfig getStreamingConfig(String name) {
+        return streamingMap.get(name);
+    }
+
+    public List<StreamingConfig> listAllStreaming() {
+        return new ArrayList<>(streamingMap.values());
+    }
+
+    /**
+     * Reload StreamingConfig from resource store It will be triggered by an desc
+     * update event.
+     *
+     * @param name
+     * @throws IOException
+     */
+    public StreamingConfig reloadStreamingConfigLocal(String name) throws IOException {
+
+        // Save Source
+        String path = StreamingConfig.concatResourcePath(name);
+
+        // Reload the StreamingConfig
+        StreamingConfig ndesc = loadStreamingConfigAt(path);
+
+        // Here replace the old one
+        streamingMap.putLocal(ndesc.getName(), ndesc);
+        return ndesc;
+    }
+
+    // remove streamingConfig
+    public void removeStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+        String path = streamingConfig.getResourcePath();
+        getStore().deleteResource(path);
+        streamingMap.remove(streamingConfig.getName());
+    }
+
+    public StreamingConfig getConfig(String name) {
+        name = name.toUpperCase();
+        return streamingMap.get(name);
+    }
+
+    public void removeStreamingLocal(String streamingName) {
+        streamingMap.removeLocal(streamingName);
+    }
+
+    /**
+     * Update CubeDesc with the input. Broadcast the event into cluster
+     *
+     * @param desc
+     * @return
+     * @throws IOException
+     */
+    public StreamingConfig updateStreamingConfig(StreamingConfig desc) throws IOException {
+        // Validate CubeDesc
+        if (desc.getUuid() == null || desc.getName() == null) {
+            throw new IllegalArgumentException("SteamingConfig Illegal.");
+        }
+        String name = desc.getName();
+        if (!streamingMap.containsKey(name)) {
+            throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
+        }
+
+        // Save Source
+        String path = desc.getResourcePath();
+        getStore().putResource(path, desc, STREAMING_SERIALIZER);
+
+        // Reload the StreamingConfig
+        StreamingConfig ndesc = loadStreamingConfigAt(path);
+        // Here replace the old one
+        streamingMap.put(ndesc.getName(), desc);
+
+        return ndesc;
+    }
+
+    public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
+        if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
+            throw new IllegalArgumentException();
+        }
+
+        if (streamingMap.containsKey(streamingConfig.getName()))
+            throw new IllegalArgumentException("StreamingConfig '" + streamingConfig.getName() + "' already exists");
+
+        String path = StreamingConfig.concatResourcePath(streamingConfig.getName());
+        getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
+        streamingMap.put(streamingConfig.getName(), streamingConfig);
+        return streamingConfig;
+    }
+
+    private StreamingConfig loadStreamingConfigAt(String path) throws IOException {
+        ResourceStore store = getStore();
+        StreamingConfig streamingDesc = store.getResource(path, StreamingConfig.class, STREAMING_SERIALIZER);
+
+        if (StringUtils.isBlank(streamingDesc.getName())) {
+            throw new IllegalStateException("StreamingConfig name must not be blank");
+        }
+        return streamingDesc;
+    }
+
+    private void reloadAllStreaming() throws IOException {
+        ResourceStore store = getStore();
+        logger.info("Reloading Streaming Metadata from folder " + store.getReadableResourcePath(ResourceStore.STREAMING_RESOURCE_ROOT));
+
+        streamingMap.clear();
+
+        List<String> paths = store.collectResourceRecursively(ResourceStore.STREAMING_RESOURCE_ROOT, MetadataConstants.FILE_SURFIX);
+        for (String path : paths) {
+            StreamingConfig streamingConfig;
+            try {
+                streamingConfig = loadStreamingConfigAt(path);
+            } catch (Exception e) {
+                logger.error("Error loading streaming desc " + path, e);
+                continue;
+            }
+            if (path.equals(streamingConfig.getResourcePath()) == false) {
+                logger.error("Skip suspicious desc at " + path + ", " + streamingConfig + " should be at " + streamingConfig.getResourcePath());
+                continue;
+            }
+            if (streamingMap.containsKey(streamingConfig.getName())) {
+                logger.error("Dup StreamingConfig name '" + streamingConfig.getName() + "' on path " + path);
+                continue;
+            }
+
+            streamingMap.putLocal(streamingConfig.getName(), streamingConfig);
+        }
+
+        logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/.settings/org.eclipse.core.resources.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.core.resources.prefs b/engine-streaming/.settings/org.eclipse.core.resources.prefs
deleted file mode 100644
index 29abf99..0000000
--- a/engine-streaming/.settings/org.eclipse.core.resources.prefs
+++ /dev/null
@@ -1,6 +0,0 @@
-eclipse.preferences.version=1
-encoding//src/main/java=UTF-8
-encoding//src/main/resources=UTF-8
-encoding//src/test/java=UTF-8
-encoding//src/test/resources=UTF-8
-encoding/<project>=UTF-8

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/.settings/org.eclipse.jdt.core.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.jdt.core.prefs b/engine-streaming/.settings/org.eclipse.jdt.core.prefs
deleted file mode 100644
index 5aaaf1e..0000000
--- a/engine-streaming/.settings/org.eclipse.jdt.core.prefs
+++ /dev/null
@@ -1,386 +0,0 @@
-eclipse.preferences.version=1
-org.eclipse.jdt.core.compiler.annotation.inheritNullAnnotations=disabled
-org.eclipse.jdt.core.compiler.annotation.missingNonNullByDefaultAnnotation=ignore
-org.eclipse.jdt.core.compiler.annotation.nonnull=org.eclipse.jdt.annotation.NonNull
-org.eclipse.jdt.core.compiler.annotation.nonnull.secondary=
-org.eclipse.jdt.core.compiler.annotation.nonnullbydefault=org.eclipse.jdt.annotation.NonNullByDefault
-org.eclipse.jdt.core.compiler.annotation.nonnullbydefault.secondary=
-org.eclipse.jdt.core.compiler.annotation.nullable=org.eclipse.jdt.annotation.Nullable
-org.eclipse.jdt.core.compiler.annotation.nullable.secondary=
-org.eclipse.jdt.core.compiler.annotation.nullanalysis=disabled
-org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
-org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
-org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
-org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
-org.eclipse.jdt.core.compiler.compliance=1.7
-org.eclipse.jdt.core.compiler.debug.lineNumber=generate
-org.eclipse.jdt.core.compiler.debug.localVariable=generate
-org.eclipse.jdt.core.compiler.debug.sourceFile=generate
-org.eclipse.jdt.core.compiler.problem.annotationSuperInterface=warning
-org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
-org.eclipse.jdt.core.compiler.problem.autoboxing=ignore
-org.eclipse.jdt.core.compiler.problem.comparingIdentical=warning
-org.eclipse.jdt.core.compiler.problem.deadCode=warning
-org.eclipse.jdt.core.compiler.problem.deprecation=warning
-org.eclipse.jdt.core.compiler.problem.deprecationInDeprecatedCode=disabled
-org.eclipse.jdt.core.compiler.problem.deprecationWhenOverridingDeprecatedMethod=disabled
-org.eclipse.jdt.core.compiler.problem.discouragedReference=ignore
-org.eclipse.jdt.core.compiler.problem.emptyStatement=ignore
-org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
-org.eclipse.jdt.core.compiler.problem.explicitlyClosedAutoCloseable=ignore
-org.eclipse.jdt.core.compiler.problem.fallthroughCase=ignore
-org.eclipse.jdt.core.compiler.problem.fatalOptionalError=disabled
-org.eclipse.jdt.core.compiler.problem.fieldHiding=ignore
-org.eclipse.jdt.core.compiler.problem.finalParameterBound=warning
-org.eclipse.jdt.core.compiler.problem.finallyBlockNotCompletingNormally=warning
-org.eclipse.jdt.core.compiler.problem.forbiddenReference=ignore
-org.eclipse.jdt.core.compiler.problem.hiddenCatchBlock=warning
-org.eclipse.jdt.core.compiler.problem.includeNullInfoFromAsserts=disabled
-org.eclipse.jdt.core.compiler.problem.incompatibleNonInheritedInterfaceMethod=warning
-org.eclipse.jdt.core.compiler.problem.incompleteEnumSwitch=warning
-org.eclipse.jdt.core.compiler.problem.indirectStaticAccess=ignore
-org.eclipse.jdt.core.compiler.problem.localVariableHiding=ignore
-org.eclipse.jdt.core.compiler.problem.methodWithConstructorName=warning
-org.eclipse.jdt.core.compiler.problem.missingDefaultCase=ignore
-org.eclipse.jdt.core.compiler.problem.missingDeprecatedAnnotation=ignore
-org.eclipse.jdt.core.compiler.problem.missingEnumCaseDespiteDefault=disabled
-org.eclipse.jdt.core.compiler.problem.missingHashCodeMethod=ignore
-org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotation=ignore
-org.eclipse.jdt.core.compiler.problem.missingOverrideAnnotationForInterfaceMethodImplementation=enabled
-org.eclipse.jdt.core.compiler.problem.missingSerialVersion=warning
-org.eclipse.jdt.core.compiler.problem.missingSynchronizedOnInheritedMethod=ignore
-org.eclipse.jdt.core.compiler.problem.noEffectAssignment=warning
-org.eclipse.jdt.core.compiler.problem.noImplicitStringConversion=warning
-org.eclipse.jdt.core.compiler.problem.nonExternalizedStringLiteral=ignore
-org.eclipse.jdt.core.compiler.problem.nonnullParameterAnnotationDropped=warning
-org.eclipse.jdt.core.compiler.problem.nonnullTypeVariableFromLegacyInvocation=warning
-org.eclipse.jdt.core.compiler.problem.nullAnnotationInferenceConflict=error
-org.eclipse.jdt.core.compiler.problem.nullReference=warning
-org.eclipse.jdt.core.compiler.problem.nullSpecViolation=error
-org.eclipse.jdt.core.compiler.problem.nullUncheckedConversion=warning
-org.eclipse.jdt.core.compiler.problem.overridingPackageDefaultMethod=warning
-org.eclipse.jdt.core.compiler.problem.parameterAssignment=ignore
-org.eclipse.jdt.core.compiler.problem.pessimisticNullAnalysisForFreeTypeVariables=warning
-org.eclipse.jdt.core.compiler.problem.possibleAccidentalBooleanAssignment=ignore
-org.eclipse.jdt.core.compiler.problem.potentialNullReference=ignore
-org.eclipse.jdt.core.compiler.problem.potentiallyUnclosedCloseable=ignore
-org.eclipse.jdt.core.compiler.problem.rawTypeReference=ignore
-org.eclipse.jdt.core.compiler.problem.redundantNullAnnotation=warning
-org.eclipse.jdt.core.compiler.problem.redundantNullCheck=ignore
-org.eclipse.jdt.core.compiler.problem.redundantSpecificationOfTypeArguments=ignore
-org.eclipse.jdt.core.compiler.problem.redundantSuperinterface=ignore
-org.eclipse.jdt.core.compiler.problem.reportMethodCanBePotentiallyStatic=ignore
-org.eclipse.jdt.core.compiler.problem.reportMethodCanBeStatic=ignore
-org.eclipse.jdt.core.compiler.problem.specialParameterHidingField=disabled
-org.eclipse.jdt.core.compiler.problem.staticAccessReceiver=warning
-org.eclipse.jdt.core.compiler.problem.suppressOptionalErrors=disabled
-org.eclipse.jdt.core.compiler.problem.suppressWarnings=enabled
-org.eclipse.jdt.core.compiler.problem.syntacticNullAnalysisForFields=disabled
-org.eclipse.jdt.core.compiler.problem.syntheticAccessEmulation=ignore
-org.eclipse.jdt.core.compiler.problem.typeParameterHiding=warning
-org.eclipse.jdt.core.compiler.problem.unavoidableGenericTypeProblems=enabled
-org.eclipse.jdt.core.compiler.problem.uncheckedTypeOperation=ignore
-org.eclipse.jdt.core.compiler.problem.unclosedCloseable=warning
-org.eclipse.jdt.core.compiler.problem.undocumentedEmptyBlock=ignore
-org.eclipse.jdt.core.compiler.problem.unhandledWarningToken=warning
-org.eclipse.jdt.core.compiler.problem.unnecessaryElse=ignore
-org.eclipse.jdt.core.compiler.problem.unnecessaryTypeCheck=ignore
-org.eclipse.jdt.core.compiler.problem.unqualifiedFieldAccess=ignore
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownException=ignore
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionExemptExceptionAndThrowable=enabled
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionIncludeDocCommentReference=enabled
-org.eclipse.jdt.core.compiler.problem.unusedDeclaredThrownExceptionWhenOverriding=disabled
-org.eclipse.jdt.core.compiler.problem.unusedExceptionParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedImport=warning
-org.eclipse.jdt.core.compiler.problem.unusedLabel=warning
-org.eclipse.jdt.core.compiler.problem.unusedLocal=warning
-org.eclipse.jdt.core.compiler.problem.unusedObjectAllocation=ignore
-org.eclipse.jdt.core.compiler.problem.unusedParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedParameterIncludeDocCommentReference=enabled
-org.eclipse.jdt.core.compiler.problem.unusedParameterWhenImplementingAbstract=disabled
-org.eclipse.jdt.core.compiler.problem.unusedParameterWhenOverridingConcrete=disabled
-org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=warning
-org.eclipse.jdt.core.compiler.problem.unusedTypeParameter=ignore
-org.eclipse.jdt.core.compiler.problem.unusedWarningToken=warning
-org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=warning
-org.eclipse.jdt.core.compiler.source=1.7
-org.eclipse.jdt.core.formatter.align_type_members_on_columns=false
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_annotation=0
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_enum_constant=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_explicit_constructor_call=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_arguments_in_qualified_allocation_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_assignment=0
-org.eclipse.jdt.core.formatter.alignment_for_binary_expression=16
-org.eclipse.jdt.core.formatter.alignment_for_compact_if=16
-org.eclipse.jdt.core.formatter.alignment_for_conditional_expression=80
-org.eclipse.jdt.core.formatter.alignment_for_enum_constants=0
-org.eclipse.jdt.core.formatter.alignment_for_expressions_in_array_initializer=16
-org.eclipse.jdt.core.formatter.alignment_for_method_declaration=0
-org.eclipse.jdt.core.formatter.alignment_for_multiple_fields=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_parameters_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_resources_in_try=80
-org.eclipse.jdt.core.formatter.alignment_for_selector_in_method_invocation=16
-org.eclipse.jdt.core.formatter.alignment_for_superclass_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_enum_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_superinterfaces_in_type_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_constructor_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_throws_clause_in_method_declaration=16
-org.eclipse.jdt.core.formatter.alignment_for_union_type_in_multicatch=16
-org.eclipse.jdt.core.formatter.blank_lines_after_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_after_package=1
-org.eclipse.jdt.core.formatter.blank_lines_before_field=0
-org.eclipse.jdt.core.formatter.blank_lines_before_first_class_body_declaration=0
-org.eclipse.jdt.core.formatter.blank_lines_before_imports=1
-org.eclipse.jdt.core.formatter.blank_lines_before_member_type=1
-org.eclipse.jdt.core.formatter.blank_lines_before_method=1
-org.eclipse.jdt.core.formatter.blank_lines_before_new_chunk=1
-org.eclipse.jdt.core.formatter.blank_lines_before_package=0
-org.eclipse.jdt.core.formatter.blank_lines_between_import_groups=1
-org.eclipse.jdt.core.formatter.blank_lines_between_type_declarations=1
-org.eclipse.jdt.core.formatter.brace_position_for_annotation_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_anonymous_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_array_initializer=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_block_in_case=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_constructor_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_constant=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_enum_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_method_declaration=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_switch=end_of_line
-org.eclipse.jdt.core.formatter.brace_position_for_type_declaration=end_of_line
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_block_comment=false
-org.eclipse.jdt.core.formatter.comment.clear_blank_lines_in_javadoc_comment=false
-org.eclipse.jdt.core.formatter.comment.format_block_comments=false
-org.eclipse.jdt.core.formatter.comment.format_header=false
-org.eclipse.jdt.core.formatter.comment.format_html=true
-org.eclipse.jdt.core.formatter.comment.format_javadoc_comments=false
-org.eclipse.jdt.core.formatter.comment.format_line_comments=false
-org.eclipse.jdt.core.formatter.comment.format_source_code=true
-org.eclipse.jdt.core.formatter.comment.indent_parameter_description=true
-org.eclipse.jdt.core.formatter.comment.indent_root_tags=true
-org.eclipse.jdt.core.formatter.comment.insert_new_line_before_root_tags=insert
-org.eclipse.jdt.core.formatter.comment.insert_new_line_for_parameter=insert
-org.eclipse.jdt.core.formatter.comment.line_length=80
-org.eclipse.jdt.core.formatter.comment.new_lines_at_block_boundaries=true
-org.eclipse.jdt.core.formatter.comment.new_lines_at_javadoc_boundaries=true
-org.eclipse.jdt.core.formatter.comment.preserve_white_space_between_code_and_line_comments=false
-org.eclipse.jdt.core.formatter.compact_else_if=true
-org.eclipse.jdt.core.formatter.continuation_indentation=2
-org.eclipse.jdt.core.formatter.continuation_indentation_for_array_initializer=2
-org.eclipse.jdt.core.formatter.disabling_tag=@formatter\:off
-org.eclipse.jdt.core.formatter.enabling_tag=@formatter\:on
-org.eclipse.jdt.core.formatter.format_guardian_clause_on_one_line=false
-org.eclipse.jdt.core.formatter.format_line_comment_starting_on_first_column=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_annotation_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_constant_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_enum_declaration_header=true
-org.eclipse.jdt.core.formatter.indent_body_declarations_compare_to_type_header=true
-org.eclipse.jdt.core.formatter.indent_breaks_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_empty_lines=false
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_block=true
-org.eclipse.jdt.core.formatter.indent_statements_compare_to_body=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_cases=true
-org.eclipse.jdt.core.formatter.indent_switchstatements_compare_to_switch=false
-org.eclipse.jdt.core.formatter.indentation.size=4
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_field=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_local_variable=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_method=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_package=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_parameter=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_annotation_on_type=insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_label=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_after_opening_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_at_end_of_file_if_missing=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_catch_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_closing_brace_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_else_in_if_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_finally_in_try_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_before_while_in_do_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_annotation_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_block=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_method_body=insert
-org.eclipse.jdt.core.formatter.insert_new_line_in_empty_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_after_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_after_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_at_in_annotation_type_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_angle_bracket_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_after_closing_paren_in_cast=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_case=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_colon_in_labeled_statement=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_allocation_expression=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_annotation=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_constructor_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_constant_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_enum_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_explicitconstructorcall_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_increments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_for_inits=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_declaration_throws=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_method_invocation_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_field_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_multiple_local_declarations=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_parameterized_type_reference=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_superinterfaces=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_arguments=insert
-org.eclipse.jdt.core.formatter.insert_space_after_comma_in_type_parameters=insert
-org.eclipse.jdt.core.formatter.insert_space_after_ellipsis=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_try=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_opening_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_after_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_after_semicolon_in_try_resources=insert
-org.eclipse.jdt.core.formatter.insert_space_after_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_and_in_type_parameter=insert
-org.eclipse.jdt.core.formatter.insert_space_before_assignment_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_at_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_binary_operator=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_cast=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_catch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_if=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_switch=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_synchronized=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_try=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_closing_paren_in_while=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_assert=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_case=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_default=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_colon_in_labeled_statement=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_constructor_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_constant_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_enum_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_explicitconstructorcall_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_increments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_for_inits=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_declaration_throws=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_method_invocation_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_field_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_multiple_local_declarations=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_superinterfaces=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_comma_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_ellipsis=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_parameterized_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_arguments=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_angle_bracket_in_type_parameters=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_annotation_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_anonymous_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_array_initializer=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_block=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_constructor_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_constant=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_enum_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_method_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_brace_in_type_declaration=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_bracket_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_catch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_for=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_if=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_parenthesized_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_switch=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_synchronized=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_try=insert
-org.eclipse.jdt.core.formatter.insert_space_before_opening_paren_in_while=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_return=insert
-org.eclipse.jdt.core.formatter.insert_space_before_parenthesized_expression_in_throw=insert
-org.eclipse.jdt.core.formatter.insert_space_before_postfix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_prefix_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_conditional=insert
-org.eclipse.jdt.core.formatter.insert_space_before_question_in_wildcard=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_for=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_semicolon_in_try_resources=do not insert
-org.eclipse.jdt.core.formatter.insert_space_before_unary_operator=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_brackets_in_array_type_reference=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_braces_in_array_initializer=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_brackets_in_array_allocation_expression=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_annotation_type_member_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_constructor_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_enum_constant=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_declaration=do not insert
-org.eclipse.jdt.core.formatter.insert_space_between_empty_parens_in_method_invocation=do not insert
-org.eclipse.jdt.core.formatter.join_lines_in_comments=true
-org.eclipse.jdt.core.formatter.join_wrapped_lines=true
-org.eclipse.jdt.core.formatter.keep_else_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.keep_empty_array_initializer_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_imple_if_on_one_line=false
-org.eclipse.jdt.core.formatter.keep_then_statement_on_same_line=false
-org.eclipse.jdt.core.formatter.lineSplit=999
-org.eclipse.jdt.core.formatter.never_indent_block_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.never_indent_line_comments_on_first_column=false
-org.eclipse.jdt.core.formatter.number_of_blank_lines_at_beginning_of_method_body=0
-org.eclipse.jdt.core.formatter.number_of_empty_lines_to_preserve=1
-org.eclipse.jdt.core.formatter.put_empty_statement_on_new_line=true
-org.eclipse.jdt.core.formatter.tabulation.char=space
-org.eclipse.jdt.core.formatter.tabulation.size=4
-org.eclipse.jdt.core.formatter.use_on_off_tags=false
-org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations=false
-org.eclipse.jdt.core.formatter.wrap_before_binary_operator=true
-org.eclipse.jdt.core.formatter.wrap_before_or_operator_multicatch=true
-org.eclipse.jdt.core.formatter.wrap_outer_expressions_when_nested=true

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
----------------------------------------------------------------------
diff --git a/engine-streaming/.settings/org.eclipse.jdt.ui.prefs b/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
deleted file mode 100644
index d521bab..0000000
--- a/engine-streaming/.settings/org.eclipse.jdt.ui.prefs
+++ /dev/null
@@ -1,7 +0,0 @@
-eclipse.preferences.version=1
-formatter_profile=_Space Indent & Long Lines
-formatter_settings_version=12
-org.eclipse.jdt.ui.ignorelowercasenames=true
-org.eclipse.jdt.ui.importorder=java;javax;org;com;
-org.eclipse.jdt.ui.ondemandthreshold=99
-org.eclipse.jdt.ui.staticondemandthreshold=99

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/engine-streaming/pom.xml b/engine-streaming/pom.xml
deleted file mode 100644
index 876279d..0000000
--- a/engine-streaming/pom.xml
+++ /dev/null
@@ -1,121 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements.  See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership.  The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License.  You may obtain a copy of the License at
- 
-     http://www.apache.org/licenses/LICENSE-2.0
- 
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>kylin-engine-streaming</artifactId>
-    <packaging>jar</packaging>
-    <name>Apache Kylin - Streaming Engine</name>
-
-    <parent>
-        <groupId>org.apache.kylin</groupId>
-        <artifactId>kylin</artifactId>
-        <version>1.6.0-SNAPSHOT</version>
-
-    </parent>
-
-    <properties>
-    </properties>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-cube</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-storage</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-job</artifactId>
-        </dependency>
-
-        <!-- Env & Test -->
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-common</artifactId>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-hdfs</artifactId>
-            <scope>provided</scope>
-            <!-- protobuf version conflict with hbase -->
-            <exclusions>
-                <exclusion>
-                    <groupId>com.google.protobuf</groupId>
-                    <artifactId>protobuf-java</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-app</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-api</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-client</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.mrunit</groupId>
-            <artifactId>mrunit</artifactId>
-            <classifier>hadoop2</classifier>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-
-</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
deleted file mode 100644
index 35bdfa8..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.streaming;
-
-/**
- */
-public class BootstrapConfig {
-
-    private String cubeName;
-    private long start = 0L;
-    private long end = 0L;
-
-    private boolean fillGap;
-    private long maxFillGapRange = 4 * 3600 * 1000L;
-
-    public long getStart() {
-        return start;
-    }
-
-    public void setStart(long start) {
-        this.start = start;
-    }
-
-    public long getEnd() {
-        return end;
-    }
-
-    public void setEnd(long end) {
-        this.end = end;
-    }
-
-    public String getCubeName() {
-        return cubeName;
-    }
-
-    public void setCubeName(String cubeName) {
-        this.cubeName = cubeName;
-    }
-
-    public boolean isFillGap() {
-        return fillGap;
-    }
-
-    public void setFillGap(boolean fillGap) {
-        this.fillGap = fillGap;
-    }
-
-    public long getMaxFillGapRange() {
-        return maxFillGapRange;
-    }
-
-    public void setMaxFillGapRange(long maxFillGapRange) {
-        this.maxFillGapRange = maxFillGapRange;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
deleted file mode 100644
index c583283..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingInput.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-/**
- */
-public interface IStreamingInput {
-
-    StreamingBatch getBatchWithTimeWindow(RealizationType realizationType, String realizationName, int id, long startTime, long endTime);
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
deleted file mode 100644
index cb15e2b..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-
-/**
- */
-public interface IStreamingOutput {
-
-    ICuboidWriter getCuboidWriter(IBuildable buildable);
-
-    void output(IBuildable buildable, Map<Long, HyperLogLogPlusCounter> samplingResult);
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
deleted file mode 100644
index c9da46e..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.engine.streaming.util.StreamingUtils;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.RealizationType;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public class OneOffStreamingBuilder {
-
-    private final IStreamingInput streamingInput;
-    private final IStreamingOutput streamingOutput;
-    private final StreamingBatchBuilder streamingBatchBuilder;
-    private final long startTime;
-    private final long endTime;
-    private final RealizationType realizationType;
-    private final String realizationName;
-
-    public OneOffStreamingBuilder(RealizationType realizationType, String realizationName, long startTime, long endTime) {
-        Preconditions.checkArgument(startTime < endTime);
-        this.startTime = startTime;
-        this.endTime = endTime;
-        this.realizationType = Preconditions.checkNotNull(realizationType);
-        this.realizationName = Preconditions.checkNotNull(realizationName);
-        this.streamingInput = Preconditions.checkNotNull(StreamingUtils.getStreamingInput());
-        this.streamingOutput = Preconditions.checkNotNull(StreamingUtils.getStreamingOutput());
-        this.streamingBatchBuilder = Preconditions.checkNotNull(StreamingUtils.getMicroBatchBuilder(realizationType, realizationName));
-    }
-
-    public Runnable build() {
-        return new Runnable() {
-            @Override
-            public void run() {
-                StreamingBatch streamingBatch = streamingInput.getBatchWithTimeWindow(realizationType, realizationName, -1, startTime, endTime);
-                final IBuildable buildable = streamingBatchBuilder.createBuildable(streamingBatch);
-                final Map<Long, HyperLogLogPlusCounter> samplingResult = streamingBatchBuilder.sampling(streamingBatch);
-                final Map<TblColRef, Dictionary<String>> dictionaryMap = streamingBatchBuilder.buildDictionary(streamingBatch, buildable);
-                streamingBatchBuilder.build(streamingBatch, dictionaryMap, streamingOutput.getCuboidWriter(buildable));
-                streamingOutput.output(buildable, samplingResult);
-                streamingBatchBuilder.commit(buildable);
-            }
-        };
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
deleted file mode 100644
index 8b0b8e6..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.util.Map;
-
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.common.util.StreamingBatch;
-import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
-import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.TblColRef;
-
-/**
- */
-public interface StreamingBatchBuilder {
-
-    IBuildable createBuildable(StreamingBatch streamingBatch);
-
-    Map<Long, HyperLogLogPlusCounter> sampling(StreamingBatch streamingBatch);
-
-    Map<TblColRef, Dictionary<String>> buildDictionary(StreamingBatch streamingBatch, IBuildable buildable);
-
-    void build(StreamingBatch streamingBatch, Map<TblColRef, Dictionary<String>> dictionaryMap, ICuboidWriter cuboidWriter);
-
-    void commit(IBuildable buildable);
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/2200b595/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
deleted file mode 100644
index 9d1a0b1..0000000
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.streaming;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.common.persistence.Serializer;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class StreamingConfig extends RootPersistentEntity {
-
-    public static Serializer<StreamingConfig> SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
-
-    public static final String STREAMING_TYPE_KAFKA = "kafka";
-
-    @JsonProperty("name")
-    private String name;
-
-    @JsonProperty("type")
-    private String type = STREAMING_TYPE_KAFKA;
-
-    public String getType() {
-        return type;
-    }
-
-    public void setType(String type) {
-        this.type = type;
-    }
-
-    public String getResourcePath() {
-        return concatResourcePath(name);
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    public static String concatResourcePath(String name) {
-        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
-    @Override
-    public StreamingConfig clone() {
-        try {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            SERIALIZER.serialize(this, new DataOutputStream(baos));
-            return SERIALIZER.deserialize(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));
-        } catch (IOException e) {
-            throw new RuntimeException(e);//in mem, should not happen
-        }
-    }
-
-}


Mime
View raw message