kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [1/2] kylin git commit: KYLIN-1919 performance enhancement
Date Sat, 22 Oct 2016 04:42:37 GMT
Repository: kylin
Updated Branches:
  refs/heads/KYLIN-1971 5156ccd47 -> b1a965097


KYLIN-1919 performance enhancement


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3142c74c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3142c74c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3142c74c

Branch: refs/heads/KYLIN-1971
Commit: 3142c74cb22ea2b80ba36de462da69197020e1b3
Parents: 5156ccd
Author: shaofengshi <shaofengshi@apache.org>
Authored: Fri Oct 21 18:30:44 2016 +0800
Committer: shaofengshi <shaofengshi@apache.org>
Committed: Fri Oct 21 18:32:52 2016 +0800

----------------------------------------------------------------------
 .../kylin/source/kafka/StreamingParser.java     |  49 +++--
 .../source/kafka/TimedJsonStreamParser.java     |  31 ++--
 .../test/java/TimedJsonStreamParserTest.java    | 178 -------------------
 .../source/kafka/TimedJsonStreamParserTest.java | 166 +++++++++++++++++
 source-kafka/src/test/resources/message.json    |  10 +-
 5 files changed, 217 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/3142c74c/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 43b2ac5..75f9c4b 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -21,7 +21,6 @@ package org.apache.kylin.source.kafka;
 import java.lang.reflect.Constructor;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
@@ -31,7 +30,6 @@ import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,15 +45,15 @@ public abstract class StreamingParser {
     public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator";
 
     public static final Map<String, String> defaultProperties = Maps.newHashMap();
-    public static final Set derivedTimeColumns = Sets.newHashSet();
+    public static final Map<String, Integer> derivedTimeColumns = Maps.newHashMap();
     static {
-        derivedTimeColumns.add("minute_start");
-        derivedTimeColumns.add("hour_start");
-        derivedTimeColumns.add("day_start");
-        derivedTimeColumns.add("week_start");
-        derivedTimeColumns.add("month_start");
-        derivedTimeColumns.add("quarter_start");
-        derivedTimeColumns.add("year_start");
+        derivedTimeColumns.put("minute_start", 1);
+        derivedTimeColumns.put("hour_start", 2);
+        derivedTimeColumns.put("day_start", 3);
+        derivedTimeColumns.put("week_start", 4);
+        derivedTimeColumns.put("month_start", 5);
+        derivedTimeColumns.put("quarter_start", 6);
+        derivedTimeColumns.put("year_start", 7);
         defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp");
         defaultProperties.put(PROPERTY_TS_PARSER, "org.apache.kylin.source.kafka.DefaultTimeParser");
         defaultProperties.put(PROPERTY_TS_PATTERN, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
@@ -108,32 +106,45 @@ public abstract class StreamingParser {
      * @return true if the columnName is a derived time column; otherwise false;
      */
     public static final boolean populateDerivedTimeColumns(String columnName, List<String> result, long t) {
-        if (derivedTimeColumns.contains(columnName) == false)
+
+        Integer derivedTimeColumn = derivedTimeColumns.get(columnName);
+        if (derivedTimeColumn == null) {
             return false;
+        }
 
         long normalized = 0;
-        if (columnName.equals("minute_start")) {
+        switch (derivedTimeColumn) {
+        case 1:
             normalized = TimeUtil.getMinuteStart(t);
             result.add(DateFormat.formatToTimeWithoutMilliStr(normalized));
-        } else if (columnName.equals("hour_start")) {
+            break;
+        case 2:
             normalized = TimeUtil.getHourStart(t);
             result.add(DateFormat.formatToTimeWithoutMilliStr(normalized));
-        } else if (columnName.equals("day_start")) {
-            //from day_start on, formatTs will output date format
+            break;
+        case 3:
             normalized = TimeUtil.getDayStart(t);
             result.add(DateFormat.formatToDateStr(normalized));
-        } else if (columnName.equals("week_start")) {
+            break;
+        case 4:
             normalized = TimeUtil.getWeekStart(t);
             result.add(DateFormat.formatToDateStr(normalized));
-        } else if (columnName.equals("month_start")) {
+            break;
+        case 5:
             normalized = TimeUtil.getMonthStart(t);
             result.add(DateFormat.formatToDateStr(normalized));
-        } else if (columnName.equals("quarter_start")) {
+            break;
+        case 6:
             normalized = TimeUtil.getQuarterStart(t);
             result.add(DateFormat.formatToDateStr(normalized));
-        } else if (columnName.equals("year_start")) {
+            break;
+        case 7:
             normalized = TimeUtil.getYearStart(t);
             result.add(DateFormat.formatToDateStr(normalized));
+            break;
+        default:
+            throw new IllegalStateException();
+
         }
 
         return true;

http://git-wip-us.apache.org/repos/asf/kylin/blob/3142c74c/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 633a30c..e00ce16 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -64,6 +64,9 @@ public final class TimedJsonStreamParser extends StreamingParser {
     private String tsColName = null;
     private String tsParser = null;
     private String separator = null;
+    private final Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+    private final Map<String, Object> tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+    private final Map<String, String[]> nameMap = new HashMap<>();
 
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
 
@@ -100,15 +103,14 @@ public final class TimedJsonStreamParser extends StreamingParser {
     public StreamingMessage parse(ByteBuffer buffer) {
         try {
             Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType);
-            Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+            root.clear();
             root.putAll(message);
             String tsStr = objToString(root.get(tsColName));
             long t = streamTimeParser.parseTime(tsStr);
             ArrayList<String> result = Lists.newArrayList();
 
             for (TblColRef column : allColumns) {
-                String columnName = column.getName();
-                columnName = columnName.toLowerCase();
+                final String columnName = column.getName().toLowerCase();
                 if (populateDerivedTimeColumns(columnName, result, t) == false) {
                     result.add(getValueByKey(columnName, root));
                 }
@@ -126,18 +128,24 @@ public final class TimedJsonStreamParser extends StreamingParser {
         return true;
     }
 
-    protected String getValueByKey(String key, Map<String, Object> root) throws IOException {
-        if (root.containsKey(key)) {
-            return objToString(root.get(key));
+    protected String getValueByKey(String key, Map<String, Object> rootMap) throws IOException {
+        if (rootMap.containsKey(key)) {
+            return objToString(rootMap.get(key));
         }
 
-        if (key.contains(separator)) {
-            String[] names = key.toLowerCase().split(separator);
-            Map<String, Object> tempMap = root;
+        String[] names = nameMap.get(key);
+        if (names == null && key.contains(separator)) {
+            names = key.toLowerCase().split(separator);
+            nameMap.put(key, names);
+        }
+
+        if (names != null && names.length > 0) {
+            tempMap.clear();
+            tempMap.putAll(rootMap);
             for (int i = 0; i < names.length - 1; i++) {
-                Object o = root.get(names[i]);
+                Object o = tempMap.get(names[i]);
                 if (o instanceof Map) {
-                    tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+                    tempMap.clear();
                     tempMap.putAll((Map<String, Object>) o);
                 } else {
                     throw new IOException("Property '" + names[i] + "' is not embedded format");
@@ -145,7 +153,6 @@ public final class TimedJsonStreamParser extends StreamingParser {
             }
             Object finalObject = tempMap.get(names[names.length - 1]);
             return objToString(finalObject);
-
         }
 
         return StringUtils.EMPTY;

http://git-wip-us.apache.org/repos/asf/kylin/blob/3142c74c/source-kafka/src/test/java/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/TimedJsonStreamParserTest.java
deleted file mode 100644
index 5a52b61..0000000
--- a/source-kafka/src/test/java/TimedJsonStreamParserTest.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import com.fasterxml.jackson.databind.JavaType;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.StreamingMessage;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.source.kafka.TimedJsonStreamParser;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.HashMap;
-import java.util.ArrayList;
-
-
-import static org.junit.Assert.assertEquals;
-
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-
-
-public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
-
-    private static String[] userNeedColNames;
-
-    private static final String jsonFilePath = "src/test/resources/message.json";
-
-    private static ObjectMapper mapper;
-
-    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class),
-            SimpleType.construct(Object.class));
-
-
-    @BeforeClass
-    public static void setUp() throws Exception {
-        staticCreateTestMetadata();
-        mapper = new ObjectMapper();
-    }
-
-    @AfterClass
-    public static void after() throws Exception {
-        cleanAfterClass();
-    }
-
-
-    @Test
-    public void testNormalValue() throws Exception {
-        userNeedColNames = new String[]{"createdAt", "id", "isTruncated", "text"};
-        List<TblColRef> allCol = mockupTblColRefList();
-        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
-        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
-        ByteBuffer buffer = getJsonByteBuffer(msg);
-        StreamingMessage sMsg = parser.parse(buffer);
-        List<String> result = sMsg.getData();
-        assertEquals("Jul 20, 2016 9:59:17 AM", result.get(0));
-        assertEquals("755703618762862600", result.get(1));
-        assertEquals("false", result.get(2));
-        assertEquals("dejamos las tapas regionales de este #Miercoles https://t.co/kfe0kT2Fup", result.get(3));
-    }
-
-    @Test
-    public void testEmbeddedValue() throws Exception {
-        userNeedColNames = new String[]{"user_id", "user_description", "user_isProtected"};
-        List<TblColRef> allCol = mockupTblColRefList();
-        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
-        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
-        ByteBuffer buffer = getJsonByteBuffer(msg);
-        StreamingMessage sMsg = parser.parse(buffer);
-        List<String> result = sMsg.getData();
-        assertEquals("4853763947", result.get(0));
-        assertEquals("Noticias, análisis e información para el crecimiento de la región.", result.get(1));
-        assertEquals("false", result.get(2));
-    }
-
-    @Test
-    public void testArrayValue() throws Exception {
-        userNeedColNames = new String[]{"userMentionEntities", "mediaEntities"};
-        List<TblColRef> allCol = mockupTblColRefList();
-        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
-        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
-        HashMap<String, Object> map = (HashMap<String, Object>) msg;
-        Object array = map.get("mediaEntities");
-        ByteBuffer buffer = getJsonByteBuffer(msg);
-        StreamingMessage sMsg = parser.parse(buffer);
-        List<String> result = sMsg.getData();
-        System.out.println(result);
-
-    }
-
-    @Test
-    public void testMapValue() throws Exception {
-        userNeedColNames = new String[]{"user"};
-        List<TblColRef> allCol = mockupTblColRefList();
-        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
-        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
-        ByteBuffer buffer = getJsonByteBuffer(msg);
-        StreamingMessage sMsg = parser.parse(buffer);
-        List<String> result = sMsg.getData();
-        System.out.println("result:" + result);
-
-    }
-
-    @Test
-    public void testNullKey() throws Exception {
-        userNeedColNames = new String[]{"null", ""};
-        List<TblColRef> allCol = mockupTblColRefList();
-        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
-        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
-        ByteBuffer buffer = getJsonByteBuffer(msg);
-        StreamingMessage sMsg = parser.parse(buffer);
-        List<String> result = sMsg.getData();
-        assertEquals(StringUtils.EMPTY, result.get(0));
-        assertEquals(StringUtils.EMPTY, result.get(1));
-    }
-
-
-    private static ByteBuffer getJsonByteBuffer(Object obj) throws IOException {
-        byte[] bytes = mapper.writeValueAsBytes(obj);
-        ByteBuffer buff = ByteBuffer.wrap(bytes);
-        buff.position(0);
-        return buff;
-    }
-
-
-    private static List<TblColRef> mockupTblColRefList() {
-        TableDesc t = mockupTableDesc("table_a");
-        List<TblColRef> list = new ArrayList<>();
-        for (int i = 0; i < userNeedColNames.length; i++) {
-            ColumnDesc c = mockupColumnDesc(t, i, userNeedColNames[i], "string");
-            list.add(c.getRef());
-        }
-        return list;
-    }
-
-    private static TableDesc mockupTableDesc(String tableName) {
-        TableDesc mockup = new TableDesc();
-        mockup.setName(tableName);
-        return mockup;
-    }
-
-    private static ColumnDesc mockupColumnDesc(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
-        ColumnDesc desc = new ColumnDesc();
-        String id = "" + oneBasedColumnIndex;
-        desc.setId(id);
-        desc.setName(name);
-        desc.setDatatype(datatype);
-        desc.init(table);
-        return desc;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/3142c74c/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
new file mode 100644
index 0000000..f92a24e
--- /dev/null
+++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.kafka;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.HashMap;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+
+public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
+
+    private static String[] userNeedColNames;
+    private static final String jsonFilePath = "src/test/resources/message.json";
+    private static ObjectMapper mapper;
+    private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class));
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        staticCreateTestMetadata();
+        mapper = new ObjectMapper();
+    }
+
+    @AfterClass
+    public static void after() throws Exception {
+        cleanAfterClass();
+    }
+
+    @Test
+    public void testNormalValue() throws Exception {
+        userNeedColNames = new String[] { "createdAt", "id", "isTruncated", "text" };
+        List<TblColRef> allCol = mockupTblColRefList();
+        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+        ByteBuffer buffer = getJsonByteBuffer(msg);
+        StreamingMessage sMsg = parser.parse(buffer);
+        List<String> result = sMsg.getData();
+        assertEquals("Jul 20, 2016 9:59:17 AM", result.get(0));
+        assertEquals("755703618762862600", result.get(1));
+        assertEquals("false", result.get(2));
+        assertEquals("dejamos", result.get(3));
+    }
+
+    @Test
+    public void testEmbeddedValue() throws Exception {
+        userNeedColNames = new String[] { "user_id", "user_description", "user_isProtected" };
+        List<TblColRef> allCol = mockupTblColRefList();
+        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+        ByteBuffer buffer = getJsonByteBuffer(msg);
+        StreamingMessage sMsg = parser.parse(buffer);
+        List<String> result = sMsg.getData();
+        assertEquals("4853763947", result.get(0));
+        assertEquals("Noticias", result.get(1));
+        assertEquals("false", result.get(2));
+    }
+
+    @Test
+    public void testArrayValue() throws Exception {
+        userNeedColNames = new String[] { "userMentionEntities", "mediaEntities" };
+        List<TblColRef> allCol = mockupTblColRefList();
+        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+        HashMap<String, Object> map = (HashMap<String, Object>) msg;
+        Object array = map.get("mediaEntities");
+        ByteBuffer buffer = getJsonByteBuffer(msg);
+        StreamingMessage sMsg = parser.parse(buffer);
+        List<String> result = sMsg.getData();
+        System.out.println(result);
+
+    }
+
+    @Test
+    public void testMapValue() throws Exception {
+        userNeedColNames = new String[] { "user" };
+        List<TblColRef> allCol = mockupTblColRefList();
+        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+        ByteBuffer buffer = getJsonByteBuffer(msg);
+        StreamingMessage sMsg = parser.parse(buffer);
+        List<String> result = sMsg.getData();
+
+    }
+
+    @Test
+    public void testNullKey() throws Exception {
+        userNeedColNames = new String[] { "null", "" };
+        List<TblColRef> allCol = mockupTblColRefList();
+        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+        ByteBuffer buffer = getJsonByteBuffer(msg);
+        StreamingMessage sMsg = parser.parse(buffer);
+        List<String> result = sMsg.getData();
+        assertEquals(StringUtils.EMPTY, result.get(0));
+        assertEquals(StringUtils.EMPTY, result.get(1));
+    }
+
+    private static ByteBuffer getJsonByteBuffer(Object obj) throws IOException {
+        byte[] bytes = mapper.writeValueAsBytes(obj);
+        ByteBuffer buff = ByteBuffer.wrap(bytes);
+        buff.position(0);
+        return buff;
+    }
+
+    private static List<TblColRef> mockupTblColRefList() {
+        TableDesc t = mockupTableDesc("table_a");
+        List<TblColRef> list = new ArrayList<>();
+        for (int i = 0; i < userNeedColNames.length; i++) {
+            ColumnDesc c = mockupColumnDesc(t, i, userNeedColNames[i], "string");
+            list.add(c.getRef());
+        }
+        return list;
+    }
+
+    private static TableDesc mockupTableDesc(String tableName) {
+        TableDesc mockup = new TableDesc();
+        mockup.setName(tableName);
+        return mockup;
+    }
+
+    private static ColumnDesc mockupColumnDesc(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
+        ColumnDesc desc = new ColumnDesc();
+        String id = "" + oneBasedColumnIndex;
+        desc.setId(id);
+        desc.setName(name);
+        desc.setDatatype(datatype);
+        desc.init(table);
+        return desc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/3142c74c/source-kafka/src/test/resources/message.json
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/resources/message.json b/source-kafka/src/test/resources/message.json
index dfafd45..55f35d3 100644
--- a/source-kafka/src/test/resources/message.json
+++ b/source-kafka/src/test/resources/message.json
@@ -1,8 +1,7 @@
 {
   "createdAt": "Jul 20, 2016 9:59:17 AM",
   "id": 755703618762862600,
-  "text": "dejamos las tapas regionales de este #Miercoles https://t.co/kfe0kT2Fup",
-  "source": "<a href=\"http://twitter.com\" rel=\"nofollow\">Twitter Web Client</a>",
+  "text": "dejamos",
   "isTruncated": false,
   "inReplyToStatusId": -1,
   "inReplyToUserId": -1,
@@ -15,8 +14,6 @@
   "mediaEntities": [
     {
       "id": 755703584084328400,
-      "url": "https://t.co/kfe0kT2Fup",
-      "displayURL": "pic.twitter.com/kfe0kT2Fup",
       "sizes": {
         "0": {
           "width": 150,
@@ -38,11 +35,8 @@
   "currentUserRetweetId": -1,
   "user": {
     "id": 4853763947,
-    "name": "El Metropolitano",
-    "screenName": "ElTWdelMetro",
-    "description": "Noticias, análisis e información para el crecimiento de la región.",
+    "description": "Noticias",
     "isDefaultProfileImage": false,
-    "url": "http://elmetropolitano.com.ar/",
     "isProtected": false
   }
 }
\ No newline at end of file


Mime
View raw message