kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [04/50] incubator-kylin git commit: refactor
Date Sat, 28 Mar 2015 00:04:41 GMT
refactor


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/56d57a2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/56d57a2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/56d57a2d

Branch: refs/heads/streaming-localdict
Commit: 56d57a2d5940a19c575125a9aff073235355b4c3
Parents: 9dd1512
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Thu Mar 26 16:21:41 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Thu Mar 26 16:21:41 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/streaming/JsonStreamParser.java     | 11 +++++++++--
 .../java/org/apache/kylin/streaming/KafkaConfig.java     |  3 +++
 .../java/org/apache/kylin/streaming/StreamParser.java    |  3 +--
 .../org/apache/kylin/streaming/StringStreamParser.java   |  2 +-
 .../apache/kylin/streaming/cube/CubeStreamBuilder.java   |  2 +-
 .../streaming/invertedindex/PrintOutStreamBuilder.java   |  6 +++---
 6 files changed, 18 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/56d57a2d/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
index cb43dc6..5c8b49d 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/JsonStreamParser.java
@@ -35,6 +35,7 @@
 package org.apache.kylin.streaming;
 
 import com.google.common.collect.Lists;
+import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -42,6 +43,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Created by qianzhou on 3/25/15.
@@ -55,11 +57,16 @@ public final class JsonStreamParser implements StreamParser {
     private JsonStreamParser(){}
 
     @Override
-    public List<String> parse(Stream stream, Collection<TblColRef> allColumns) {
+    public List<String> parse(Stream stream, List<TblColRef> allColumns) {
         final JsonObject root = jsonParser.parse(new String(stream.getRawData())).getAsJsonObject();
         ArrayList<String> result = Lists.newArrayList();
+
         for (TblColRef column : allColumns) {
-            result.add(root.get(column.getName()).getAsString());
+            for (Map.Entry<String, JsonElement> entry : root.entrySet()) {
+                if (entry.getKey().equalsIgnoreCase(column.getName())) {
+                    result.add(entry.getValue().getAsString());
+                }
+            }
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/56d57a2d/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
index ee5a96a..b22c7e0 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/KafkaConfig.java
@@ -76,6 +76,9 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("bufferSize")
     private int bufferSize;
 
+    @JsonProperty("iiDesc")
+    private String iiDesc;
+
     private int partitionId;
 
     public int getTimeout() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/56d57a2d/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
index 0c59151..9b41c95 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamParser.java
@@ -36,7 +36,6 @@ package org.apache.kylin.streaming;
 
 import org.apache.kylin.metadata.model.TblColRef;
 
-import java.util.Collection;
 import java.util.List;
 
 /**
@@ -44,5 +43,5 @@ import java.util.List;
  */
 public interface StreamParser {
 
-    List<String> parse(Stream stream, Collection<TblColRef> allColumns);
+    List<String> parse(Stream stream, List<TblColRef> allColumns);
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/56d57a2d/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java b/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
index 7611869..3c62a3a 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StringStreamParser.java
@@ -49,7 +49,7 @@ public final class StringStreamParser implements StreamParser {
 
     private StringStreamParser(){}
     @Override
-    public List<String> parse(Stream stream, Collection<TblColRef> allColumns) {
+    public List<String> parse(Stream stream, List<TblColRef> allColumns) {
         return Lists.newArrayList(new String(stream.getRawData()).split(","));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/56d57a2d/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java b/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
index 9429033..912c3cd 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
@@ -410,7 +410,7 @@ public class CubeStreamBuilder extends StreamBuilder {
     }
 
     private List<String> parseStream(Stream stream, CubeDesc desc) {
-        return getStreamParser().parse(stream, desc.listAllColumns());
+        return getStreamParser().parse(stream, Lists.newArrayList(desc.listAllColumns()));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/56d57a2d/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
index 43aa0a5..e83bdc5 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/invertedindex/PrintOutStreamBuilder.java
@@ -49,9 +49,9 @@ import java.util.concurrent.BlockingQueue;
  */
 public class PrintOutStreamBuilder extends StreamBuilder {
 
-    private final Collection<TblColRef> allColumns;
+    private final List<TblColRef> allColumns;
 
-    public PrintOutStreamBuilder(BlockingQueue<Stream> streamQueue, int sliceSize, Collection<TblColRef> allColumns) {
+    public PrintOutStreamBuilder(BlockingQueue<Stream> streamQueue, int sliceSize, List<TblColRef> allColumns) {
         super(streamQueue, sliceSize);
         setStreamParser(JsonStreamParser.instance);
         this.allColumns = allColumns;
@@ -61,7 +61,7 @@ public class PrintOutStreamBuilder extends StreamBuilder {
     protected void build(List<Stream> streamsToBuild) throws Exception {
         for (Stream stream : streamsToBuild) {
             final List<String> row = getStreamParser().parse(stream, allColumns);
-            System.out.println(StringUtils.join(row, ","));
+            System.out.println("offset:" + stream.getOffset() + " " + StringUtils.join(row, ","));
         }
     }
 }


Mime
View raw message