kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qhz...@apache.org
Subject incubator-kylin git commit: fix
Date Wed, 23 Sep 2015 06:05:14 GMT
Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-1011 919cb9951 -> b415bbca0


fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/b415bbca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/b415bbca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/b415bbca

Branch: refs/heads/KYLIN-1011
Commit: b415bbca0058a97a79e4827e8a3e53bbc3d7cf94
Parents: 919cb99
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Wed Sep 23 14:04:50 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Wed Sep 23 14:04:50 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/BuildCubeWithStreamTest.java      |  2 +-
 .../kylin/common/persistence/ResourceStore.java |  1 +
 .../localmeta/kafka/kafka_test.json             | 20 +++++
 .../kafka/test_streaming_table_cube.json        | 22 +++++
 .../kafka/test_streaming_table_ii.json          | 22 +++++
 .../streaming/test_streaming_table_cube.json    | 18 +---
 .../streaming/test_streaming_table_ii.json      | 18 +---
 .../kylin/source/kafka/KafkaConfigManager.java  | 94 ++------------------
 .../kylin/source/kafka/KafkaStreamingInput.java |  2 +-
 9 files changed, 78 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b415bbca/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
index 6bfd560..560035a 100644
--- a/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
+++ b/assembly/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
@@ -88,7 +88,7 @@ public class BuildCubeWithStreamTest {
         final StreamingConfig config = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(streamingName);
 
         //Use a random topic for kafka data stream
-        KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getStreamingConfig(streamingName);
+        KafkaConfig streamingConfig = KafkaConfigManager.getInstance(kylinConfig).getKafkaConfig(streamingName);
         streamingConfig.setTopic(UUID.randomUUID().toString());
         KafkaConfigManager.getInstance(kylinConfig).saveStreamingConfig(streamingConfig);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b415bbca/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 29e2345..89100a2 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -54,6 +54,7 @@ abstract public class ResourceStore {
     public static final String TABLE_RESOURCE_ROOT = "/table";
     public static final String HYBRID_RESOURCE_ROOT = "/hybrid";
     public static final String STREAMING_RESOURCE_ROOT = "/streaming";
+    public static final String KAfKA_RESOURCE_ROOT = "/kafka";
     public static final String STREAMING_OUTPUT_RESOURCE_ROOT = "/streaming_output";
     public static final String CUBE_STATISTICS_ROOT = "/cube_statistics";
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b415bbca/examples/test_case_data/localmeta/kafka/kafka_test.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/kafka_test.json b/examples/test_case_data/localmeta/kafka/kafka_test.json
new file mode 100644
index 0000000..5445417
--- /dev/null
+++ b/examples/test_case_data/localmeta/kafka/kafka_test.json
@@ -0,0 +1,20 @@
+{
+  "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec6c3c0d",
+  "name": "kafka_test",
+  "topic": "kafka_stream_test",
+  "timeout": 60000,
+  "maxReadCount": 1000,
+  "bufferSize": 65536,
+  "last_modified": 0,
+  "clusters": [
+    {
+      "brokers": [
+        {
+          "id": 0,
+          "host": "sandbox.hortonworks.com",
+          "port": 6667
+        }
+      ]
+    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b415bbca/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json b/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
new file mode 100644
index 0000000..5fae898
--- /dev/null
+++ b/examples/test_case_data/localmeta/kafka/test_streaming_table_cube.json
@@ -0,0 +1,22 @@
+{
+  "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
+  "name": "test_streaming_table_cube",
+  "topic": "test_streaming_table_topic_xyz",
+  "timeout": 60000,
+  "maxReadCount": 1000,
+  "bufferSize": 65536,
+  "parserName": "org.apache.kylin.source.kafka.TimedJsonStreamParser",
+  "partition": 1,
+  "last_modified": 0,
+  "clusters": [
+    {
+      "brokers": [
+        {
+          "id": 0,
+          "host": "sandbox",
+          "port": 6667
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b415bbca/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json b/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
new file mode 100644
index 0000000..9e36201
--- /dev/null
+++ b/examples/test_case_data/localmeta/kafka/test_streaming_table_ii.json
@@ -0,0 +1,22 @@
+{
+  "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322",
+  "name": "test_streaming_table_ii",
+  "topic": "test_streaming_table_topic_xyz",
+  "timeout": 60000,
+  "maxReadCount": 1000,
+  "bufferSize": 65536,
+  "parserName": "org.apache.kylin.source.kafka.JsonStreamParser",
+  "partition": 1,
+  "last_modified": 0,
+  "clusters": [
+    {
+      "brokers": [
+        {
+          "id": 0,
+          "host": "sandbox",
+          "port": 6667
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b415bbca/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
index 98b4218..b358183 100644
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
+++ b/examples/test_case_data/localmeta/streaming/test_streaming_table_cube.json
@@ -1,23 +1,7 @@
 {
   "uuid": "8b2b9dfe-777c-4d39-bf89-8472ec909193",
   "name": "test_streaming_table_cube",
-  "topic": "test_streaming_table_topic_xyz",
-  "timeout": 60000,
-  "maxReadCount": 1000,
-  "bufferSize": 65536,
   "cubeName": "test_streaming_table_cube",
-  "parserName": "org.apache.kylin.streaming.TimedJsonStreamParser",
   "partition": 1,
-  "last_modified": 0,
-  "clusters": [
-    {
-      "brokers": [
-        {
-          "id": 0,
-          "host": "sandbox",
-          "port": 6667
-        }
-      ]
-    }
-  ]
+  "last_modified": 0
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b415bbca/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json b/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
index e70153e..da1a765 100644
--- a/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
+++ b/examples/test_case_data/localmeta/streaming/test_streaming_table_ii.json
@@ -1,23 +1,7 @@
 {
   "uuid": "8b2b9dfe-900c-4d39-bf89-8472ec909322",
   "name": "test_streaming_table_ii",
-  "topic": "test_streaming_table_topic_xyz",
-  "timeout": 60000,
-  "maxReadCount": 1000,
-  "bufferSize": 65536,
   "iiName": "test_streaming_table_ii",
-  "parserName": "org.apache.kylin.streaming.JsonStreamParser",
   "partition": 1,
-  "last_modified": 0,
-  "clusters": [
-    {
-      "brokers": [
-        {
-          "id": 0,
-          "host": "sandbox",
-          "port": 6667
-        }
-      ]
-    }
-  ]
+  "last_modified": 0
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b415bbca/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
index c84035f..be07290 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaConfigManager.java
@@ -34,18 +34,10 @@
 
 package org.apache.kylin.source.kafka;
 
-import java.io.BufferedReader;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
@@ -53,12 +45,9 @@ import org.apache.kylin.source.kafka.config.KafkaConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.MapType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  */
@@ -104,15 +93,7 @@ public class KafkaConfigManager {
     }
 
     private String formatStreamingConfigPath(String name) {
-        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
-    private String formatStreamingOutputPath(String streaming, int partition) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition + ".json";
-    }
-
-    private String formatStreamingOutputPath(String streaming, List<Integer> partitions) {
-        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + StringUtils.join(partitions, "_") + ".json";
+        return ResourceStore.KAfKA_RESOURCE_ROOT + "/" + name + ".json";
     }
 
     public boolean createOrUpdateKafkaConfig(String name, KafkaConfig config) {
@@ -125,7 +106,7 @@ public class KafkaConfigManager {
         }
     }
 
-    public KafkaConfig getStreamingConfig(String name) {
+    public KafkaConfig getKafkaConfig(String name) {
         try {
             return getStore().getResource(formatStreamingConfigPath(name), KafkaConfig.class, KafkaConfig.SERIALIZER);
         } catch (IOException e) {
@@ -143,63 +124,6 @@ public class KafkaConfigManager {
         getStore().putResource(path, kafkaConfig, KafkaConfig.SERIALIZER);
     }
 
-    public long getOffset(String streaming, int shard) {
-        final String resPath = formatStreamingOutputPath(streaming, shard);
-        try {
-            final InputStream inputStream = getStore().getResource(resPath);
-            if (inputStream == null) {
-                return 0;
-            } else {
-                final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
-                return Long.parseLong(br.readLine());
-            }
-        } catch (Exception e) {
-            logger.error("error get offset, path:" + resPath, e);
-            throw new RuntimeException("error get offset, path:" + resPath, e);
-        }
-    }
-
-    public void updateOffset(String streaming, int shard, long offset) {
-        Preconditions.checkArgument(offset >= 0, "offset cannot be smaller than 0");
-        final String resPath = formatStreamingOutputPath(streaming, shard);
-        try {
-            getStore().putResource(resPath, new ByteArrayInputStream(Long.valueOf(offset).toString().getBytes()), getStore().getResourceTimestamp(resPath));
-        } catch (IOException e) {
-            logger.error("error update offset, path:" + resPath, e);
-            throw new RuntimeException("error update offset, path:" + resPath, e);
-        }
-    }
-
-    public Map<Integer, Long> getOffset(String streaming, List<Integer> partitions) {
-        Collections.sort(partitions);
-        final String resPath = formatStreamingOutputPath(streaming, partitions);
-        try {
-            final InputStream inputStream = getStore().getResource(resPath);
-            if (inputStream == null) {
-                return Collections.emptyMap();
-            }
-            final HashMap<Integer, Long> result = mapper.readValue(inputStream, mapType);
-            return result;
-        } catch (IOException e) {
-            logger.error("error get offset, path:" + resPath, e);
-            throw new RuntimeException("error get offset, path:" + resPath, e);
-        }
-    }
-
-    public void updateOffset(String streaming, HashMap<Integer, Long> offset) {
-        List<Integer> partitions = Lists.newLinkedList(offset.keySet());
-        Collections.sort(partitions);
-        final String resPath = formatStreamingOutputPath(streaming, partitions);
-        try {
-            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            mapper.writeValue(baos, offset);
-            getStore().putResource(resPath, new ByteArrayInputStream(baos.toByteArray()), getStore().getResourceTimestamp(resPath));
-        } catch (IOException e) {
-            logger.error("error update offset, path:" + resPath, e);
-            throw new RuntimeException("error update offset, path:" + resPath, e);
-        }
-    }
-
     private final ObjectMapper mapper = new ObjectMapper();
     private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(Integer.class), SimpleType.construct(Long.class));
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/b415bbca/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 09dee50..9951f86 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -82,7 +82,7 @@ public class KafkaStreamingInput implements IStreamingInput {
             logger.info(String.format("prepare to get streaming batch, name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
             final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
             final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig);
-            final KafkaConfig kafkaConfig = kafkaConfigManager.getStreamingConfig(streaming);
+            final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming);
             final StreamingParser streamingParser = getStreamingParser(kafkaConfig);
             final ExecutorService executorService = Executors.newCachedThreadPool();
             final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList();


Mime
View raw message