kylin-commits mailing list archives

From mahong...@apache.org
Subject [1/7] incubator-kylin git commit: KYLIN-1011 clean code and restore KafkaInputAnalyzer
Date Sun, 25 Oct 2015 13:12:14 GMT
Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging e8ecdc60e -> 7e66013ea


KYLIN-1011 clean code and restore KafkaInputAnalyzer


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7e66013e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7e66013e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7e66013e

Branch: refs/heads/2.x-staging
Commit: 7e66013eabfaa52ea65e7c2b1bc12cd343fec371
Parents: fabdd5c
Author: honma <honma@ebay.com>
Authored: Fri Oct 23 18:17:48 2015 +0800
Committer: honma <honma@ebay.com>
Committed: Sun Oct 25 21:15:21 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/util/AbstractApplication.java  |  47 +++
 .../apache/kylin/common/util/OptionsHelper.java |  79 +++++
 .../org/apache/kylin/common/util/BasicTest.java |  27 +-
 .../apache/kylin/job/common/OptionsHelper.java  |  79 -----
 .../kylin/job/engine/JobEngineConfig.java       |   2 +-
 .../engine/mr/common/AbstractHadoopJob.java     |   2 +-
 .../engine/spark/AbstractSparkApplication.java  |  48 ---
 .../kylin/engine/spark/SparkCountDemo.java      |   5 +-
 .../apache/kylin/engine/spark/SparkCubing.java  |   5 +-
 .../apache/kylin/engine/spark/SparkEntry.java   |   9 +-
 .../kylin/engine/spark/SparkHelloWorld.java     |   5 +-
 .../kylin/engine/spark/SparkHiveDemo.java       |   5 +-
 .../kylin/engine/streaming/BootstrapConfig.java |  10 -
 .../kylin/engine/streaming/StreamingConfig.java |  11 -
 .../engine/streaming/StreamingManager.java      |  36 +--
 .../engine/streaming/cli/StreamingCLI.java      |   3 -
 .../diagnose/StreamingLogAnalyzer.java          |  96 ++++++
 .../engine/streaming/util/StreamingUtils.java   |   1 +
 .../kylin/source/kafka/KafkaStreamingInput.java |  35 +--
 .../kylin/source/kafka/StreamingParser.java     |  42 ++-
 .../source/kafka/StringStreamingParser.java     |  18 +-
 .../source/kafka/TimedJsonStreamParser.java     |   2 +-
 .../source/kafka/config/KafkaClusterConfig.java |   4 -
 .../kylin/source/kafka/config/KafkaConfig.java  |  12 +-
 .../kafka/diagnose/KafkaInputAnalyzer.java      | 307 +++++++++++++++++++
 .../source/kafka/diagnose/KafkaVerify.java      | 101 ++++++
 .../source/kafka/diagnose/TimeHistogram.java    |  85 +++++
 .../kylin/source/kafka/util/KafkaVerify.java    | 101 ------
 .../storage/hbase/cube/v1/CubeStorageQuery.java |   1 +
 29 files changed, 819 insertions(+), 359 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/core-common/src/main/java/org/apache/kylin/common/util/AbstractApplication.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/AbstractApplication.java b/core-common/src/main/java/org/apache/kylin/common/util/AbstractApplication.java
new file mode 100644
index 0000000..cd357eb
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/AbstractApplication.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.common.util;
+
+import java.io.Serializable;
+
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ */
+public abstract class AbstractApplication implements Serializable {
+
+    protected abstract Options getOptions();
+
+    protected abstract void execute(OptionsHelper optionsHelper) throws Exception;
+
+    public final void execute(String[] args) {
+        OptionsHelper optionsHelper = new OptionsHelper();
+        System.out.println("Abstract Application args:" + StringUtils.join(args, " "));
+        try {
+            optionsHelper.parseOptions(getOptions(), args);
+            execute(optionsHelper);
+        } catch (ParseException e) {
+            optionsHelper.printUsage(this.getClass().getName(), getOptions());
+            throw new RuntimeException("error parsing args", e);
+        } catch (Exception e) {
+            throw new RuntimeException("error execute " + this.getClass().getName(), e);
+        }
+    }
+}
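
(For illustration only, not part of this commit: a minimal sketch of a command-line tool extending the new AbstractApplication; the class and option names here are hypothetical.)

    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.OptionBuilder;
    import org.apache.commons.cli.Options;
    import org.apache.kylin.common.util.AbstractApplication;
    import org.apache.kylin.common.util.OptionsHelper;

    public class HelloApplication extends AbstractApplication {

        // hypothetical required option, built the same way the Spark jobs below build theirs
        @SuppressWarnings("static-access")
        private static final Option OPTION_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Name to greet").create("name");

        private final Options options;

        public HelloApplication() {
            options = new Options();
            options.addOption(OPTION_NAME);
        }

        @Override
        protected Options getOptions() {
            return options;
        }

        @Override
        protected void execute(OptionsHelper optionsHelper) throws Exception {
            System.out.println("Hello, " + optionsHelper.getOptionValue(OPTION_NAME));
        }

        public static void main(String[] args) {
            // the final execute(String[]) parses args, delegates to execute(OptionsHelper),
            // and prints usage before rethrowing if parsing fails
            new HelloApplication().execute(args);
        }
    }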

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/core-common/src/main/java/org/apache/kylin/common/util/OptionsHelper.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/OptionsHelper.java b/core-common/src/main/java/org/apache/kylin/common/util/OptionsHelper.java
new file mode 100644
index 0000000..2e20ef6
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/OptionsHelper.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.File;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+/**
+ */
+public class OptionsHelper {
+    private CommandLine commandLine;
+
+    public void parseOptions(Options options, String[] args) throws ParseException {
+        CommandLineParser parser = new GnuParser();
+        commandLine = parser.parse(options, args);
+    }
+
+    public Option[] getOptions() {
+        return commandLine.getOptions();
+    }
+
+    public String getOptionsAsString() {
+        StringBuilder buf = new StringBuilder();
+        for (Option option : commandLine.getOptions()) {
+            buf.append(" ");
+            buf.append(option.getOpt());
+            if (option.hasArg()) {
+                buf.append("=");
+                buf.append(option.getValue());
+            }
+        }
+        return buf.toString();
+    }
+
+    public String getOptionValue(Option option) {
+        return commandLine.getOptionValue(option.getOpt());
+    }
+
+    public boolean hasOption(Option option) {
+        return commandLine.hasOption(option.getOpt());
+    }
+
+    public void printUsage(String programName, Options options) {
+        HelpFormatter formatter = new HelpFormatter();
+        formatter.printHelp(programName, options);
+    }
+
+    public static String convertToFileURL(String path) {
+        if (File.separatorChar != '/') {
+            path = path.replace(File.separatorChar, '/');
+        }
+
+        return path;
+    }
+
+}
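
(Likewise not part of this commit: a short usage sketch of OptionsHelper on its own, assuming a single optional "input" flag.)

    Options options = new Options();
    options.addOption(OptionBuilder.withArgName("path").hasArg().isRequired(false).withDescription("Input path").create("input"));

    OptionsHelper helper = new OptionsHelper();
    try {
        helper.parseOptions(options, new String[] { "-input", "/tmp/data" });
        System.out.println(helper.getOptionsAsString()); // prints " input=/tmp/data"
    } catch (ParseException e) {
        helper.printUsage("demo", options);
    }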

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index c3a9761..c60f007 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -45,6 +45,15 @@ import com.google.common.collect.TreeMultiset;
 public class BasicTest {
     protected static final org.slf4j.Logger logger = LoggerFactory.getLogger(BasicTest.class);
 
+    class A {
+        public void foo() {
+            System.out.println(this.getClass().getName());
+        }
+    }
+
+    class B extends A {
+    }
+
     private void log(ByteBuffer a) {
         Integer x = 4;
         foo(x);
@@ -64,21 +73,9 @@ public class BasicTest {
 
     @Test
     public void testxx() {
-        byte[] temp = new byte[] { 1, 2, 3 };
-        byte[] temp2 = new byte[] { 1, 2, 3 };
-
-        System.out.println(temp.hashCode());
-        System.out.println(temp2.hashCode());
-
-        ByteBuffer buffer = ByteBuffer.allocateDirect(3);
-        buffer.put((byte) 1);
-        buffer.put((byte) 1);
-        buffer.put((byte) 1);
-        buffer.put((byte) 1);
-        System.out.println(buffer.position());
-        System.out.println(buffer.limit());
-        System.out.println(buffer.capacity());
-
+        B b = new B();
+        b.foo();
+
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java b/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
deleted file mode 100644
index 1cda348..0000000
--- a/core-job/src/main/java/org/apache/kylin/job/common/OptionsHelper.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.File;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-/**
- */
-public class OptionsHelper {
-    private CommandLine commandLine;
-
-    public void parseOptions(Options options, String[] args) throws ParseException {
-        CommandLineParser parser = new GnuParser();
-        commandLine = parser.parse(options, args);
-    }
-
-    public Option[] getOptions() {
-        return commandLine.getOptions();
-    }
-
-    public String getOptionsAsString() {
-        StringBuilder buf = new StringBuilder();
-        for (Option option : commandLine.getOptions()) {
-            buf.append(" ");
-            buf.append(option.getOpt());
-            if (option.hasArg()) {
-                buf.append("=");
-                buf.append(option.getValue());
-            }
-        }
-        return buf.toString();
-    }
-
-    public String getOptionValue(Option option) {
-        return commandLine.getOptionValue(option.getOpt());
-    }
-
-    public boolean hasOption(Option option) {
-        return commandLine.hasOption(option.getOpt());
-    }
-
-    public void printUsage(String programName, Options options) {
-        HelpFormatter formatter = new HelpFormatter();
-        formatter.printHelp(programName, options);
-    }
-
-    public static String convertToFileURL(String path) {
-        if (File.separatorChar != '/') {
-            path = path.replace(File.separatorChar, '/');
-        }
-
-        return path;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
index a76091a..b305749 100644
--- a/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ b/core-job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.common.OptionsHelper;
+import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index d617c4e..1455664 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -59,7 +59,7 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.job.JobInstance;
-import org.apache.kylin.job.common.OptionsHelper;
+import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.job.exception.JobException;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-spark/src/main/java/org/apache/kylin/engine/spark/AbstractSparkApplication.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/AbstractSparkApplication.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/AbstractSparkApplication.java
deleted file mode 100644
index 5c8de1c..0000000
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/AbstractSparkApplication.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-package org.apache.kylin.engine.spark;
-
-import java.io.Serializable;
-
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.job.common.OptionsHelper;
-
-/**
- */
-public abstract class AbstractSparkApplication implements Serializable {
-
-    protected abstract Options getOptions();
-
-    protected abstract void execute(OptionsHelper optionsHelper) throws Exception;
-
-    public final void execute(String[] args) {
-        OptionsHelper optionsHelper = new OptionsHelper();
-        System.out.println("Spark Application args:" + StringUtils.join(args, " "));
-        try {
-            optionsHelper.parseOptions(getOptions(), args);
-            execute(optionsHelper);
-        } catch (ParseException e) {
-            optionsHelper.printUsage("SparkExecutor", getOptions());
-            throw new RuntimeException("error parsing args", e);
-        } catch (Exception e) {
-            throw new RuntimeException("error execute Spark Application", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
index 2f2497c..6478c10 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCountDemo.java
@@ -23,7 +23,8 @@ import org.apache.commons.cli.Options;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
-import org.apache.kylin.job.common.OptionsHelper;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -34,7 +35,7 @@ import scala.Tuple2;
 
 /**
  */
-public class SparkCountDemo extends AbstractSparkApplication {
+public class SparkCountDemo extends AbstractApplication {
 
     private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Input path").create("input");
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 70b62c0..0e5081e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -60,7 +60,8 @@ import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
 import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
 import org.apache.kylin.engine.spark.util.IteratorUtils;
-import org.apache.kylin.job.common.OptionsHelper;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.metadata.measure.MeasureAggregators;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -93,7 +94,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  */
-public class SparkCubing extends AbstractSparkApplication {
+public class SparkCubing extends AbstractApplication {
 
     private static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("Hive Intermediate Table").create("hiveTable");
     private static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("cube").hasArg().isRequired(true).withDescription("Cube Name").create("cubeName");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkEntry.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkEntry.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkEntry.java
index 28ec0ef..35d223a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkEntry.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkEntry.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark;
 import org.apache.commons.lang3.StringUtils;
 
 import com.google.common.base.Preconditions;
+import org.apache.kylin.common.util.AbstractApplication;
 
 /**
  */
@@ -30,13 +31,13 @@ public final class SparkEntry {
         Preconditions.checkArgument(args.length >= 2, "-className is required");
         Preconditions.checkArgument(args[0].equals("-className"), "-className is required");
         final String className = args[1];
-        final Object o = Class.<AbstractSparkApplication> forName(className).newInstance();
-        Preconditions.checkArgument(o instanceof AbstractSparkApplication, className + " is not a subClass of AbstractSparkApplication");
+        final Object o = Class.<AbstractApplication> forName(className).newInstance();
+        Preconditions.checkArgument(o instanceof AbstractApplication, className + " is not a subClass of AbstractApplication");
         String[] appArgs = new String[args.length - 2];
         for (int i = 2; i < args.length; i++) {
             appArgs[i - 2] = args[i];
         }
-        AbstractSparkApplication abstractSparkApplication = (AbstractSparkApplication) o;
-        abstractSparkApplication.execute(appArgs);
+        AbstractApplication abstractApplication = (AbstractApplication) o;
+        abstractApplication.execute(appArgs);
     }
 }
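
(Usage note, inferred from the argument checks above rather than stated anywhere in this commit: SparkEntry expects "-className <app class>" first, and hands any remaining arguments to the application.)

    // illustrative only; SparkHelloWorld takes no options of its own
    SparkEntry.main(new String[] { "-className", "org.apache.kylin.engine.spark.SparkHelloWorld" });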

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java
index e632db7..4eda50e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHelloWorld.java
@@ -18,11 +18,12 @@
 package org.apache.kylin.engine.spark;
 
 import org.apache.commons.cli.Options;
-import org.apache.kylin.job.common.OptionsHelper;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
 
 /**
  */
-public class SparkHelloWorld extends AbstractSparkApplication {
+public class SparkHelloWorld extends AbstractApplication {
 
     @Override
     protected Options getOptions() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
index 44f62ce..e1ba470 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkHiveDemo.java
@@ -18,7 +18,8 @@
 package org.apache.kylin.engine.spark;
 
 import org.apache.commons.cli.Options;
-import org.apache.kylin.job.common.OptionsHelper;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.DataFrame;
@@ -26,7 +27,7 @@ import org.apache.spark.sql.hive.HiveContext;
 
 /**
  */
-public class SparkHiveDemo extends AbstractSparkApplication {
+public class SparkHiveDemo extends AbstractApplication {
 
     private final Options options;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
index 3cad67f..a3e2db5 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/BootstrapConfig.java
@@ -7,21 +7,11 @@ public class BootstrapConfig {
     private String streaming;
     private int partitionId = -1;
 
-    //one off default value set to true
-    private boolean oneOff = true;
     private long start = 0L;
     private long end = 0L;
 
     private boolean fillGap;
 
-    public boolean isOneOff() {
-        return oneOff;
-    }
-
-    public void setOneOff(boolean oneOff) {
-        this.oneOff = oneOff;
-    }
-
     public long getStart() {
         return start;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
index a7c6da0..f0a7ab1 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingConfig.java
@@ -64,9 +64,6 @@ public class StreamingConfig extends RootPersistentEntity {
     @JsonProperty("cubeName")
     private String cubeName;
 
-    @JsonProperty("margin")
-    private long margin;
-
     public String getCubeName() {
         return cubeName;
     }
@@ -91,14 +88,6 @@ public class StreamingConfig extends RootPersistentEntity {
         this.name = name;
     }
 
-    public long getMargin() {
-        return margin;
-    }
-
-    public void setMargin(long margin) {
-        this.margin = margin;
-    }
-
     public String getResourcePath() {
         return concatResourcePath(name);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
index 112379e..fa7d0f8 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingManager.java
@@ -40,7 +40,11 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang3.StringUtils;
@@ -72,7 +76,6 @@ public class StreamingManager {
 
     public static final Serializer<StreamingConfig> STREAMING_SERIALIZER = new JsonSerializer<StreamingConfig>(StreamingConfig.class);
 
-
     private KylinConfig config;
 
     // name ==> StreamingConfig
@@ -102,16 +105,16 @@ public class StreamingManager {
             if (r != null) {
                 return r;
             }
-            try{
-            r = new StreamingManager(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one streamingManager singleton exist");
+            try {
+                r = new StreamingManager(config);
+                CACHE.put(config, r);
+                if (CACHE.size() > 1) {
+                    logger.warn("More than one streamingManager singleton exist");
+                }
+                return r;
+            } catch (IOException e) {
+                throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
             }
-            return r;
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to init StreamingManager from " + config, e);
-        }
         }
     }
 
@@ -131,7 +134,6 @@ public class StreamingManager {
         return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
     }
 
-
     public StreamingConfig getStreamingConfig(String name) {
         return streamingMap.get(name);
     }
@@ -140,7 +142,6 @@ public class StreamingManager {
         return new ArrayList<>(streamingMap.values());
     }
 
-
     /**
      * Reload StreamingConfig from resource store. It will be triggered by a desc
      * update event.
@@ -168,7 +169,6 @@ public class StreamingManager {
         streamingMap.remove(streamingConfig.getName());
     }
 
-
     public StreamingConfig getConfig(String name) {
         name = name.toUpperCase();
         return streamingMap.get(name);
@@ -195,7 +195,6 @@ public class StreamingManager {
             throw new IllegalArgumentException("StreamingConfig '" + name + "' does not exist.");
         }
 
-
         // Save Source
         String path = desc.getResourcePath();
         getStore().putResource(path, desc, STREAMING_SERIALIZER);
@@ -208,7 +207,6 @@ public class StreamingManager {
         return ndesc;
     }
 
-
     public StreamingConfig saveStreamingConfig(StreamingConfig streamingConfig) throws IOException {
         if (streamingConfig == null || StringUtils.isEmpty(streamingConfig.getName())) {
             throw new IllegalArgumentException();
@@ -219,7 +217,7 @@ public class StreamingManager {
 
         String path = formatStreamingConfigPath(streamingConfig.getName());
         getStore().putResource(path, streamingConfig, StreamingConfig.SERIALIZER);
-        streamingMap.put(streamingConfig.getName(),streamingConfig);
+        streamingMap.put(streamingConfig.getName(), streamingConfig);
         return streamingConfig;
     }
 
@@ -318,8 +316,8 @@ public class StreamingManager {
         }
 
         logger.debug("Loaded " + streamingMap.size() + " StreamingConfig(s)");
-    }    
-    
+    }
+
     private final ObjectMapper mapper = new ObjectMapper();
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
index a4ccabc..3b1693a 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -66,9 +66,6 @@ public class StreamingCLI {
             while (i < args.length) {
                 String argName = args[i];
                 switch (argName) {
-                case "-oneoff":
-                    Boolean.parseBoolean(args[++i]);
-                    break;
                 case "-start":
                     bootstrapConfig.setStart(Long.parseLong(args[++i]));
                     break;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
new file mode 100644
index 0000000..fba664d
--- /dev/null
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/diagnose/StreamingLogAnalyzer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.streaming.diagnose;
+
+import java.io.File;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.FileUtils;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+public class StreamingLogAnalyzer {
+    public static void main(String[] args) {
+        int errorFileCount = 0;
+        List<Long> ellapsedTimes = Lists.newArrayList();
+
+        String patternStr = "(\\d{2}/\\d{2}/\\d{2} \\d{2}:\\d{2}:\\d{2})";
+        Pattern pattern = Pattern.compile(patternStr);
+
+        SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
+        format.setTimeZone(TimeZone.getTimeZone("GMT")); // NOTE: this must be GMT to calculate epoch date correctly
+
+        Preconditions.checkArgument(args.length == 1, "Usage: StreamingLogAnalyzer streaming_logs_folder");
+        for (File file : FileUtils.listFiles(new File(args[0]), new String[] { "log" }, false)) {
+            System.out.println("Processing file " + file.toString());
+
+            long startTime = 0;
+            long endTime = 0;
+            try {
+                List<String> contents = Files.readAllLines(file.toPath(), Charset.defaultCharset());
+                for (int i = 0; i < contents.size(); ++i) {
+                    Matcher m = pattern.matcher(contents.get(i));
+                    if (m.find()) {
+                        startTime = format.parse("20" + m.group(1)).getTime();
+                        break;
+                    }
+                }
+
+                for (int i = contents.size() - 1; i >= 0; --i) {
+                    Matcher m = pattern.matcher(contents.get(i));
+                    if (m.find()) {
+                        endTime = format.parse("20" + m.group(1)).getTime();
+                        break;
+                    }
+                }
+
+                if (startTime == 0 || endTime == 0) {
+                    throw new RuntimeException("start time or end time is not found");
+                }
+
+                if (endTime - startTime < 60000) {
+                    System.out.println("Warning: this job took less than one minute!!!! " + file.toString());
+                }
+
+                ellapsedTimes.add(endTime - startTime);
+
+            } catch (Exception e) {
+                System.out.println("Exception when processing log file " + file.toString());
+                System.out.println(e);
+                errorFileCount++;
+            }
+        }
+
+        System.out.println("Totally error files count " + errorFileCount);
+        System.out.println("Totally normal files processed " + ellapsedTimes.size());
+
+        long sum = 0;
+        for (Long x : ellapsedTimes) {
+            sum += x;
+        }
+        System.out.println("Avg build time " + (sum / ellapsedTimes.size()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
index 47db924..0ae7143 100644
--- a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
@@ -45,6 +45,7 @@ import org.apache.kylin.engine.streaming.cube.StreamingCubeBuilder;
 import com.google.common.base.Preconditions;
 
 /**
+ * TODO: like MRUtil, use Factory pattern to allow config
  */
 public class StreamingUtils {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 9951f86..a1ab712 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -33,7 +33,6 @@
  */
 package org.apache.kylin.source.kafka;
 
-import java.lang.reflect.Constructor;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -41,25 +40,16 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import javax.annotation.Nullable;
-
 import kafka.cluster.Broker;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.PartitionMetadata;
 import kafka.message.MessageAndOffset;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.streaming.IStreamingInput;
 import org.apache.kylin.engine.streaming.StreamingBatch;
-import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.engine.streaming.StreamingMessage;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
 import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.apache.kylin.source.kafka.util.KafkaRequester;
@@ -67,11 +57,9 @@ import org.apache.kylin.source.kafka.util.KafkaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 
-/**
- */
+@SuppressWarnings("unused")
 public class KafkaStreamingInput implements IStreamingInput {
 
     private static final Logger logger = LoggerFactory.getLogger(KafkaStreamingInput.class);
@@ -83,7 +71,7 @@ public class KafkaStreamingInput implements IStreamingInput {
             final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
             final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
             final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
-            final StreamingParser streamingParser = getStreamingParser(kafkaConfig);
+            final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig);
             final ExecutorService executorService = Executors.newCachedThreadPool();
             final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();
             for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
@@ -210,23 +198,4 @@ public class KafkaStreamingInput implements IStreamingInput {
         }
     }
 
-    private StreamingParser getStreamingParser(KafkaConfig kafkaConfig) throws ReflectiveOperationException {
-        final String cubeName = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(kafkaConfig.getName()).getCubeName();
-        final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-        List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
-            @Nullable
-            @Override
-            public TblColRef apply(IntermediateColumnDesc input) {
-                return input.getColRef();
-            }
-        });
-        if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
-            Class clazz = Class.forName(kafkaConfig.getParserName());
-            Constructor constructor = clazz.getConstructor(List.class, String.class);
-            return (StreamingParser) constructor.newInstance(columns, kafkaConfig.getParserProperties());
-        } else {
-            throw new IllegalStateException("invalid StreamingConfig:" + kafkaConfig.getName() + " missing property StreamingParser");
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 5c83463..aace8bc 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -34,20 +34,56 @@
 
 package org.apache.kylin.source.kafka;
 
+import java.lang.reflect.Constructor;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
 import kafka.message.MessageAndOffset;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.engine.streaming.StreamingManager;
 import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.metadata.model.IntermediateColumnDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Lists;
 
 /**
+ * By convention, stream parsers should have a constructor with (List<TblColRef> allColumns, String propertiesStr) as params
  */
-public interface StreamingParser {
+public abstract class StreamingParser {
 
     /**
      * @param kafkaMessage
      * @return StreamingMessage must not be NULL
      */
-    StreamingMessage parse(MessageAndOffset kafkaMessage);
+    abstract public StreamingMessage parse(MessageAndOffset kafkaMessage);
 
-    boolean filter(StreamingMessage streamingMessage);
+    abstract public boolean filter(StreamingMessage streamingMessage);
 
+    public static StreamingParser getStreamingParser(KafkaConfig kafkaConfig) throws ReflectiveOperationException {
+        final String cubeName = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(kafkaConfig.getName()).getCubeName();
+        final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+        List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() {
+            @Nullable
+            @Override
+            public TblColRef apply(IntermediateColumnDesc input) {
+                return input.getColRef();
+            }
+        });
+        if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
+            Class clazz = Class.forName(kafkaConfig.getParserName());
+            Constructor constructor = clazz.getConstructor(List.class, String.class);
+            return (StreamingParser) constructor.newInstance(columns, kafkaConfig.getParserProperties());
+        } else {
+            throw new IllegalStateException("invalid StreamingConfig:" + kafkaConfig.getName() + " missing property StreamingParser");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index 307f73a..c0e506f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -34,20 +34,24 @@
 
 package org.apache.kylin.source.kafka;
 
-import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
 import kafka.message.MessageAndOffset;
+
 import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.metadata.model.TblColRef;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
+import com.google.common.collect.Lists;
 
 /**
  */
-public final class StringStreamingParser implements StreamingParser {
+public final class StringStreamingParser extends StreamingParser {
 
-    public static final StringStreamingParser instance = new StringStreamingParser();
+    public static final StringStreamingParser instance = new StringStreamingParser(null, null);
 
-    private StringStreamingParser() {
+    private StringStreamingParser(List<TblColRef> allColumns, String propertiesStr) {
     }
 
     @Override
@@ -55,7 +59,7 @@ public final class StringStreamingParser implements StreamingParser {
         final ByteBuffer payload = kafkaMessage.message().payload();
         byte[] bytes = new byte[payload.limit()];
         payload.get(bytes);
-        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object>emptyMap());
+        return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object> emptyMap());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 9b5071b..4fae228 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -54,7 +54,7 @@ import java.util.*;
 /**
  * each json message with a "timestamp" field
  */
-public final class TimedJsonStreamParser implements StreamingParser {
+public final class TimedJsonStreamParser extends StreamingParser {
 
     private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
index b17dbfd..eb1150e 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaClusterConfig.java
@@ -40,10 +40,6 @@ public class KafkaClusterConfig extends RootPersistentEntity {
         return kafkaConfig.getTimeout();
     }
 
-    public int getMaxReadCount() {
-        return kafkaConfig.getMaxReadCount();
-    }
-
     public List<BrokerConfig> getBrokerConfigs() {
         return brokerConfigs;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index 90df9f5..2d8951f 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -71,9 +71,6 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("timeout")
     private int timeout;
 
-    @JsonProperty("maxReadCount")
-    private int maxReadCount;
-
     @JsonProperty("bufferSize")
     private int bufferSize;
 
@@ -86,6 +83,7 @@ public class KafkaConfig extends RootPersistentEntity {
     //"configA=1;configB=2"
     @JsonProperty("parserProperties")
     private String parserProperties;
+    
     public String getResourcePath() {
         return getKafkaResourcePath(name);
     }
@@ -114,14 +112,6 @@ public class KafkaConfig extends RootPersistentEntity {
         this.timeout = timeout;
     }
 
-    public int getMaxReadCount() {
-        return maxReadCount;
-    }
-
-    public void setMaxReadCount(int maxReadCount) {
-        this.maxReadCount = maxReadCount;
-    }
-
     public int getBufferSize() {
         return bufferSize;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
new file mode 100644
index 0000000..de5e58e
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.diagnose;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import kafka.api.OffsetRequest;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.message.MessageAndOffset;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.DaemonThreadFactory;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.kafka.KafkaConfigManager;
+import org.apache.kylin.source.kafka.StreamingParser;
+import org.apache.kylin.source.kafka.TimedJsonStreamParser;
+import org.apache.kylin.source.kafka.config.KafkaClusterConfig;
+import org.apache.kylin.source.kafka.config.KafkaConfig;
+import org.apache.kylin.source.kafka.util.KafkaRequester;
+import org.apache.kylin.source.kafka.util.KafkaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Continuously run this as a daemon to discover how "disordered" the Kafka queue is.
+ * This daemon only stores a digest, so it should not be space-consuming.
+ */
+public class KafkaInputAnalyzer extends AbstractApplication {
+
+    public class KafkaMessagePuller implements Runnable {
+
+        private final String topic;
+        private final int partitionId;
+        private final KafkaClusterConfig streamingConfig;
+        private final LinkedBlockingQueue<StreamingMessage> streamQueue;
+        private final StreamingParser streamingParser;
+        private final Broker leadBroker;
+        private long offset;
+
+        protected final Logger logger;
+
+        public KafkaMessagePuller(int clusterID, String topic, int partitionId, long startOffset, Broker leadBroker, KafkaClusterConfig kafkaClusterConfig, StreamingParser streamingParser) {
+            this.topic = topic;
+            this.partitionId = partitionId;
+            this.streamingConfig = kafkaClusterConfig;
+            this.offset = startOffset;
+            this.logger = LoggerFactory.getLogger(topic + "_cluster_" + clusterID + "_" + partitionId);
+            this.streamQueue = new LinkedBlockingQueue<StreamingMessage>(10000);
+            this.streamingParser = streamingParser;
+            this.leadBroker = leadBroker;
+        }
+
+        public BlockingQueue<StreamingMessage> getStreamQueue() {
+            return streamQueue;
+        }
+
+        @Override
+        public void run() {
+            try {
+                int consumeMsgCount = 0;
+                int fetchRound = 0;
+                while (true) {
+                    int consumeMsgCountAtBeginning = consumeMsgCount;
+                    fetchRound++;
+
+                    logger.info("fetching topic {} partition id {} offset {} leader {}", new String[] { topic, String.valueOf(partitionId), String.valueOf(offset), leadBroker.toString() });
+
+                    final FetchResponse fetchResponse = KafkaRequester.fetchResponse(topic, partitionId, offset, leadBroker, streamingConfig);
+                    if (fetchResponse.errorCode(topic, partitionId) != 0) {
+                        logger.warn("fetch response offset:" + offset + " errorCode:" + fetchResponse.errorCode(topic, partitionId));
+                        Thread.sleep(30000);
+                        continue;
+                    }
+
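+                    // replay each message in the fetched batch through the parser; only
+                    // messages passing the filter are queued for analysis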
+                    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
+                        offset++;
+                        consumeMsgCount++;
+
+                        final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset);
+                        if (streamingParser.filter(streamingMessage)) {
+                            streamQueue.add(streamingMessage);
+                        }
+
+                    }
+                    logger.info("Number of messages consumed: " + consumeMsgCount + " offset is: " + offset + " total fetch round: " + fetchRound);
+
+                    if (consumeMsgCount == consumeMsgCountAtBeginning) { // nothing consumed this round; back off before retrying
+                        Thread.sleep(30000);
+                    }
+                }
+            } catch (Exception e) {
+                logger.error("consumer has encountered an error", e);
+            }
+        }
+
+    }
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_STREAMING = OptionBuilder.withArgName("streaming").hasArg().isRequired(true).withDescription("Name of the streaming config").create("streaming");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_TASK = OptionBuilder.withArgName("task").hasArg().isRequired(true).withDescription("Task to run: 'delay' to measure latency or 'disorder' to measure disorder degree").create("task");
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_TSCOLNAME = OptionBuilder.withArgName("tsColName").hasArg().isRequired(true).withDescription("Field name of the timestamp column").create("tsColName");
+
+    private static final Logger logger = LoggerFactory.getLogger(KafkaInputAnalyzer.class);
+
+    private StreamingParser parser;
+    private KafkaConfig kafkaConfig;
+
+    private Options options;
+
+    public KafkaInputAnalyzer() {
+        options = new Options();
+        options.addOption(OPTION_STREAMING);
+        options.addOption(OPTION_TASK);
+        options.addOption(OPTION_TSCOLNAME);
+    }
+
+    private List<BlockingQueue<StreamingMessage>> consume(final int clusterID, final KafkaClusterConfig kafkaClusterConfig, final int partitionCount, long whichtime) {
+        List<BlockingQueue<StreamingMessage>> result = Lists.newArrayList();
+        for (int partitionId = 0; partitionId < partitionCount; ++partitionId) {
+            final kafka.cluster.Broker leadBroker = KafkaUtils.getLeadBroker(kafkaClusterConfig, partitionId);
+            long streamingOffset = KafkaRequester.getLastOffset(kafkaClusterConfig.getTopic(), partitionId, whichtime, leadBroker, kafkaClusterConfig);
+            logger.info("starting offset:" + streamingOffset + " cluster id:" + clusterID + " partitionId:" + partitionId);
+            KafkaMessagePuller consumer = new KafkaMessagePuller(clusterID, kafkaClusterConfig.getTopic(), partitionId, streamingOffset, leadBroker, kafkaClusterConfig, parser);
+            Executors.newSingleThreadExecutor(new DaemonThreadFactory()).submit(consumer);
+            result.add(consumer.getStreamQueue());
+        }
+        return result;
+    }
+
+    private List<BlockingQueue<StreamingMessage>> consumeAll(long whichtime) {
+        int clusterId = 0;
+        final List<BlockingQueue<StreamingMessage>> queues = Lists.newLinkedList();
+
+        for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) {
+            final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size();
+            final List<BlockingQueue<StreamingMessage>> oneClusterQueue = consume(clusterId, kafkaClusterConfig, partitionCount, whichtime);
+            queues.addAll(oneClusterQueue);
+            logger.info("Cluster {} with {} partitions", clusterId, oneClusterQueue.size());
+            clusterId++;
+        }
+        return queues;
+    }
+
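+    // Consumes every partition from the latest offset and histograms the lag between
+    // wall-clock time and each message's embedded timestamp, printing per-partition and
+    // overall distributions every five minutes.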
+    private void analyzeLatency() throws InterruptedException {
+        long[] intervals = new long[] { 1, 5, 60, 300, 1800 };
+        final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.LatestTime());
+        final List<TimeHistogram> allHistograms = Lists.newArrayList();
+        final TimeHistogram overallHistogram = new TimeHistogram(intervals, "overall");
+
+        ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
+        for (int i = 0; i < allPartitionData.size(); ++i) {
+            final int index = i;
+            allHistograms.add(new TimeHistogram(intervals, "" + i));
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    while (true) {
+                        try {
+                            StreamingMessage message = allPartitionData.get(index).take();
+                            long t = message.getTimestamp();
+                            allHistograms.get(index).processMillis(System.currentTimeMillis() - t);
+                            overallHistogram.processMillis(System.currentTimeMillis() - t);
+                        } catch (InterruptedException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                }
+            });
+        }
+
+        while (true) {
+            System.out.println("Printing status at : " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(Calendar.getInstance().getTime()));
+
+            for (TimeHistogram histogram : allHistograms) {
+                histogram.printStatus();
+            }
+            overallHistogram.printStatus();
+            Thread.sleep(300000);
+        }
+    }
+
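+    // Consumes every partition from the earliest offset and tracks, per partition, the
+    // largest backward jump in message timestamp (max disorder time) and the offset
+    // distance it spanned (max disorder offset); each worker exits after its partition
+    // has been idle for 60 seconds.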
+    private void analyzeDisorder() throws InterruptedException {
+        final List<BlockingQueue<StreamingMessage>> allPartitionData = consumeAll(OffsetRequest.EarliestTime());
+
+        final List<Long> wallClocks = Lists.newArrayList();
+        final List<Long> wallOffset = Lists.newArrayList();
+        final List<Long> maxDisorderTime = Lists.newArrayList();
+        final List<Long> maxDisorderOffset = Lists.newArrayList();
+        final List<Long> processedMessages = Lists.newArrayList();
+
+        for (int i = 0; i < allPartitionData.size(); i++) {
+            wallClocks.add(0L);
+            wallOffset.add(0L);
+            maxDisorderTime.add(0L);
+            maxDisorderOffset.add(0L);
+            processedMessages.add(0L);
+        }
+
+        ExecutorService executorService = Executors.newFixedThreadPool(allPartitionData.size(), new DaemonThreadFactory());
+        final CountDownLatch countDownLatch = new CountDownLatch(allPartitionData.size());
+        for (int i = 0; i < allPartitionData.size(); ++i) {
+            final int index = i;
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        while (true) {
+                            StreamingMessage message = allPartitionData.get(index).poll(60, TimeUnit.SECONDS);
+                            if (message == null) {
+                                System.out.println(String.format("Thread %d is exiting", index));
+                                break;
+                            }
+                            long t = message.getTimestamp();
+                            long offset = message.getOffset();
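+                            // a timestamp below the running maximum means this message is out of
+                            // order; record the worst time gap and offset gap seen so far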
+                            if (t < wallClocks.get(index)) {
+                                maxDisorderTime.set(index, Math.max(wallClocks.get(index) - t, maxDisorderTime.get(index)));
+                                maxDisorderOffset.set(index, Math.max(offset - wallOffset.get(index), maxDisorderOffset.get(index)));
+                            } else {
+                                wallClocks.set(index, t);
+                                wallOffset.set(index, offset);
+                            }
+                            processedMessages.set(index, processedMessages.get(index) + 1);
+
+                            if (processedMessages.get(index) % 10000 == 1) {
+                                System.out.println(String.format("Thread %d processed %d messages. Max disorder time is %d , max disorder offset is %d",//
+                                        index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
+                            }
+                        }
+
+                        System.out.println(String.format("Thread %d finishes after %d messages. Max disorder time is %d , max disorder offset is %d",//
+                                index, processedMessages.get(index), maxDisorderTime.get(index), maxDisorderOffset.get(index)));
+                        countDownLatch.countDown();
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+
+        countDownLatch.await();
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+
+        String streaming = optionsHelper.getOptionValue(OPTION_STREAMING);
+        String task = optionsHelper.getOptionValue(OPTION_TASK);
+        String tsColName = optionsHelper.getOptionValue(OPTION_TSCOLNAME);
+
+        kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(streaming);
+        parser = new TimedJsonStreamParser(Lists.<TblColRef> newArrayList(), "formatTs=true;tsColName=" + tsColName);
+
+        if ("disorder".equalsIgnoreCase(task)) {
+            analyzeDisorder();
+        } else if ("delay".equalsIgnoreCase(task)) {
+            analyzeLatency();
+        } else {
+            optionsHelper.printUsage(this.getClass().getName(), options);
+        }
+    }
+
+    public static void main(String[] args) {
+        KafkaInputAnalyzer analyzer = new KafkaInputAnalyzer();
+        analyzer.execute(args);
+    }
+}
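
For reference, a hypothetical invocation of this analyzer (the classpath and streaming name below are placeholders; -streaming, -task and -tsColName are the required options defined above):

    java -cp <kylin-classpath> org.apache.kylin.source.kafka.diagnose.KafkaInputAnalyzer -streaming my_streaming -task delay -tsColName ts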

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
new file mode 100644
index 0000000..9b6cb4d
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaVerify.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.diagnose;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+
+/**
+ * Only for verifying Kylin streaming's correctness by comparing against the data in the original Kafka topic.
+ */
+public class KafkaVerify {
+
+    public static void main(String[] args) throws IOException {
+
+        System.out.println("start");
+        
+        ObjectMapper mapper = new ObjectMapper();
+        JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
+
+        long start = Long.valueOf(args[0]);
+        long end = Long.valueOf(args[1]);
+        long interval = Long.valueOf(args[2]);
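+        // number of interval-sized buckets covering [start, end), rounded up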
+        int bucket = (int) ((end - start + interval - 1) / interval);
+        
+        long[] qtySum = new long[bucket];
+        long qtyTotal = 0;
+        long[] counts = new long[bucket];
+        long countTotal = 0;
+        long processed = 0;
+        long minOffset = -1;
+        long maxOffset = -1;
+
+        try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) {
+            String s;
+            while ((s = br.readLine()) != null) {
+                // one JSON message per line; report progress every 10000 lines
+                if (++processed % 10000 == 1) {
+                    System.out.println("processing " + processed);
+                }
+
+                Map<String, String> root = mapper.readValue(s, mapType);
+                String tsStr = root.get("sys_ts");
+
+                if (StringUtils.isEmpty(tsStr)) {
+                    continue;
+                }
+                long ts = Long.valueOf(tsStr);
+                if (ts < start || ts >= end) {
+                    continue;
+                }
+
+                if (minOffset == -1) {
+                    minOffset = processed - 1;
+                }
+                maxOffset = processed - 1;
+
+                long qty = Long.valueOf(root.get("qty"));
+                int index = (int) ((ts - start) / interval);
+                qtySum[index] += qty;
+                qtyTotal += qty;
+                counts[index]++;
+                countTotal++;
+            }
+        }
+
+        System.out.println("qty sum is " + Arrays.toString(qtySum));
+        System.out.println("qty total is " + qtyTotal);
+        System.out.println("count is " + Arrays.toString(counts));
+        System.out.println("count total is " + countTotal);
+        System.out.println("first processed is " + minOffset);
+        System.out.println("last processed is " + maxOffset);
+    }
+}
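
A hypothetical invocation, assuming sys_ts holds epoch milliseconds and /tmp/topic_dump.json contains one JSON message per line (all argument values are illustrative):

    java -cp <kylin-classpath> org.apache.kylin.source.kafka.diagnose.KafkaVerify 1445731200000 1445817600000 3600000 /tmp/topic_dump.json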

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
new file mode 100644
index 0000000..1c579c6
--- /dev/null
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/TimeHistogram.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.kafka.diagnose;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TimeHistogram {
+    private long[] bucketsBoundary;
+    private AtomicLong[] counters;
+    private String id;
+
+    private static final Object printLock = new Object();
+
+    /**
+     * Example: [10, 20] will generate three buckets: (-∞, 10), [10, 20), [20, +∞).
+     * Unit: seconds.
+     */
+    public TimeHistogram(long[] bucketsBoundary, String id) {
+        this.bucketsBoundary = bucketsBoundary;
+        this.counters = new AtomicLong[this.bucketsBoundary.length + 1];
+        for (int i = 0; i < counters.length; i++) {
+            this.counters[i] = new AtomicLong();
+        }
+        this.id = id;
+    }
+
+    /**
+     * @param second the value to record, in seconds
+     */
+    public void process(long second) {
+        for (int i = 0; i < bucketsBoundary.length; ++i) {
+            if (second < bucketsBoundary[i]) {
+                counters[i].incrementAndGet();
+                return;
+            }
+        }
+
+        counters[bucketsBoundary.length].incrementAndGet();
+    }
+
+    /**
+     * @param millis the value to record, in milliseconds
+     */
+    public void processMillis(long millis) {
+        process(millis / 1000);
+    }
+
+    public void printStatus() {
+        long[] countersSnapshot = new long[counters.length];
+        for (int i = 0; i < countersSnapshot.length; i++) {
+            countersSnapshot[i] = counters[i].get();
+        }
+
+        long sum = 0;
+        for (long counter : countersSnapshot) {
+            sum += counter;
+        }
+
+        synchronized (printLock) {
+            System.out.println("============== status of TimeHistogram " + id + " =================");
+
+            for (int i = 0; i < countersSnapshot.length; ++i) {
+                System.out.println(String.format("bucket: %d , count: %d ,percentage: %.4f", i, countersSnapshot[i], 1.0 * countersSnapshot[i] / (sum == 0 ? 1 : sum)));
+            }
+
+        }
+    }
+
+}
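
A minimal usage sketch of the histogram (bucket boundaries and id are illustrative, not from the commit):

    TimeHistogram histogram = new TimeHistogram(new long[] { 1, 5, 60 }, "demo");
    histogram.processMillis(2500);  // 2.5 seconds falls into the [1, 5) bucket
    histogram.processMillis(90000); // 90 seconds falls into the [60, +∞) bucket
    histogram.printStatus();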

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaVerify.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaVerify.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaVerify.java
deleted file mode 100644
index 0ace6e1..0000000
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaVerify.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.source.kafka.util;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-
-/**
- * only for verify kylin streaming's correctness by comparing to data in original kafka topic
- */
-public class KafkaVerify {
-
-    public static void main(String[] args) throws IOException {
-
-        System.out.println("start");
-        
-        ObjectMapper mapper = new ObjectMapper();
-        JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class));
-
-        long start = Long.valueOf(args[0]);
-        long end = Long.valueOf(args[1]);
-        long interval = Long.valueOf(args[2]);
-        int bucket = (int) ((end - start + interval - 1) / interval);
-        
-        long qtySum[] = new long[bucket];
-        long qtyTotal = 0;
-        long counts[] = new long[bucket];
-        long countTotal = 0;
-        long processed = 0;
-        long minOffset = -1;
-        long maxOffset = -1;
-
-        try (BufferedReader br = new BufferedReader(new FileReader(new File(args[3])))) {
-            String s;
-            while ((s = br.readLine()) != null) {
-                // process the line.
-                if (++processed % 10000 == 1) {
-                    System.out.println("processing " + processed);
-                }
-
-                Map<String, String> root = mapper.readValue(s, mapType);
-                String tsStr = root.get("sys_ts");
-
-                if (StringUtils.isEmpty(tsStr)) {
-                    continue;
-                }
-                long ts = Long.valueOf(tsStr);
-                if (ts < start || ts >= end) {
-                    continue;
-                }
-
-                if (minOffset == -1) {
-                    minOffset = processed - 1;
-                }
-                maxOffset = processed - 1;
-
-                long qty = Long.valueOf(root.get("qty"));
-                int index = (int) ((ts - start) / interval);
-                qtySum[index] += qty;
-                qtyTotal += qty;
-                counts[index]++;
-                countTotal++;
-            }
-        }
-
-        System.out.println("qty sum is " + Arrays.toString(qtySum));
-        System.out.println("qty total is " + qtyTotal);
-        System.out.println("count is " + Arrays.toString(counts));
-        System.out.println("count total is " + countTotal);
-        System.out.println("first processed is " + minOffset);
-        System.out.println("last processed is " + maxOffset);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e66013e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 5c2117d..c62308e 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 import org.apache.hadoop.hbase.client.HConnection;

