kylin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From liy...@apache.org
Subject [02/34] incubator-kylin git commit: implement update offset for one single streaming
Date Fri, 10 Apr 2015 22:47:39 GMT
implement update offset for one single streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/c212109d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/c212109d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/c212109d

Branch: refs/heads/streaming-localdict
Commit: c212109dc86f1a5af5d6c0982e3b8133c45ad7c5
Parents: 6baa620
Author: qianhao.zhou <qianzhou@ebay.com>
Authored: Tue Apr 7 14:53:13 2015 +0800
Committer: qianhao.zhou <qianzhou@ebay.com>
Committed: Tue Apr 7 14:53:13 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/persistence/ResourceStore.java |   3 +-
 .../kylin/job/streaming/StreamingBootstrap.java |   8 +-
 .../apache/kylin/streaming/StreamManager.java   | 123 ----------------
 .../kylin/streaming/StreamingManager.java       | 146 +++++++++++++++++++
 .../invertedindex/IIStreamBuilder.java          |   8 +-
 .../kylin/streaming/KafkaConsumerTest.java      |   2 +-
 .../kylin/streaming/KafkaRequesterTest.java     |   2 +-
 .../kylin/streaming/OneOffStreamProducer.java   |   2 +-
 .../kylin/streaming/StreamManagerTest.java      |  69 ---------
 .../kylin/streaming/StreamingManagerTest.java   |  93 ++++++++++++
 10 files changed, 251 insertions(+), 205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c212109d/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
index 77149b3..96153fd 100644
--- a/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
+++ b/common/src/main/java/org/apache/kylin/common/persistence/ResourceStore.java
@@ -51,7 +51,8 @@ abstract public class ResourceStore {
     public static final String TABLE_EXD_RESOURCE_ROOT = "/table_exd";
     public static final String TABLE_RESOURCE_ROOT = "/table";
     public static final String HYBRID_RESOURCE_ROOT = "/hybrid";
-    public static final String STREAM_RESOURCE_ROOT = "/streaming";
+    public static final String STREAMING_RESOURCE_ROOT = "/streaming";
+    public static final String STREAMING_OUTPUT_RESOURCE_ROOT = "/streaming_output";
 
     private static ConcurrentHashMap<KylinConfig, ResourceStore> CACHE = new ConcurrentHashMap<KylinConfig,
ResourceStore>();
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c212109d/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index ce66d51..3f7665c 100644
--- a/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ b/job/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -67,7 +67,7 @@ public class StreamingBootstrap {
     private static Logger logger = LoggerFactory.getLogger(StreamingBootstrap.class);
 
     private KylinConfig kylinConfig;
-    private StreamManager streamManager;
+    private StreamingManager streamingManager;
     private IIManager iiManager;
 
     private Map<String, KafkaConsumer> kafkaConsumers = Maps.newConcurrentMap();
@@ -85,7 +85,7 @@ public class StreamingBootstrap {
 
     private StreamingBootstrap(KylinConfig kylinConfig) {
         this.kylinConfig = kylinConfig;
-        this.streamManager = StreamManager.getInstance(kylinConfig);
+        this.streamingManager = StreamingManager.getInstance(kylinConfig);
         this.iiManager = IIManager.getInstance(kylinConfig);
     }
 
@@ -105,7 +105,7 @@ public class StreamingBootstrap {
     }
 
     public void start(String streaming, int partitionId) throws Exception {
-        final KafkaConfig kafkaConfig = streamManager.getKafkaConfig(streaming);
+        final KafkaConfig kafkaConfig = streamingManager.getKafkaConfig(streaming);
         Preconditions.checkArgument(kafkaConfig != null, "cannot find kafka config:" + streaming);
         final IIInstance ii = iiManager.getII(kafkaConfig.getIiName());
         Preconditions.checkNotNull(ii, "cannot find ii name:" + kafkaConfig.getIiName());
@@ -119,7 +119,7 @@ public class StreamingBootstrap {
         final Broker leadBroker = getLeadBroker(kafkaConfig, partitionId);
         Preconditions.checkState(leadBroker != null, "cannot find lead broker");
         final long earliestOffset = KafkaRequester.getLastOffset(kafkaConfig.getTopic(),
partitionId, OffsetRequest.EarliestTime(), leadBroker, kafkaConfig);
-        long streamOffset = streamManager.getOffset(streaming, partitionId);
+        long streamOffset = streamingManager.getOffset(streaming, partitionId);
         logger.info("offset from ii desc is " + streamOffset);
         logger.info("offset from KafkaRequester is " + earliestOffset);
         if (streamOffset < earliestOffset) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c212109d/streaming/src/main/java/org/apache/kylin/streaming/StreamManager.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamManager.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamManager.java
deleted file mode 100644
index 8f717a3..0000000
--- a/streaming/src/main/java/org/apache/kylin/streaming/StreamManager.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Created by qianzhou on 3/25/15.
- */
-public class StreamManager {
-
-    private static final Logger logger = LoggerFactory.getLogger(StreamManager.class);
-
-    // static cached instances
-    private static final ConcurrentHashMap<KylinConfig, StreamManager> CACHE = new
ConcurrentHashMap<KylinConfig, StreamManager>();
-
-    private KylinConfig config;
-
-    private StreamManager(KylinConfig config) {
-        this.config = config;
-    }
-
-    private ResourceStore getStore() {
-        return ResourceStore.getStore(this.config);
-    }
-
-
-    public static StreamManager getInstance(KylinConfig config) {
-        StreamManager r = CACHE.get(config);
-        if (r != null) {
-            return r;
-        }
-
-        synchronized (StreamManager.class) {
-            r = CACHE.get(config);
-            if (r != null) {
-                return r;
-            }
-            r = new StreamManager(config);
-            CACHE.put(config, r);
-            if (CACHE.size() > 1) {
-                logger.warn("More than one cubemanager singleton exist");
-            }
-            return r;
-        }
-    }
-
-    private boolean checkExistence(String name) {
-        return true;
-    }
-
-    private String formatResPath(String name) {
-        return ResourceStore.STREAM_RESOURCE_ROOT + "/" + name + ".json";
-    }
-
-
-    public boolean createOrUpdateKafkaConfig(String name, KafkaConfig config) {
-        try {
-            getStore().putResource(formatResPath(name), config, KafkaConfig.SERIALIZER);
-            return true;
-        } catch (IOException e) {
-            logger.error("error save resource name:" + name, e);
-            return false;
-        }
-    }
-
-    public KafkaConfig getKafkaConfig(String name) {
-        try {
-            return getStore().getResource(formatResPath(name), KafkaConfig.class, KafkaConfig.SERIALIZER);
-        } catch (IOException e) {
-            logger.error("error get resource name:" + name, e);
-            return null;
-        }
-    }
-
-    public long getOffset(String streaming, int partition) {
-        return 0;
-    }
-
-    public void updateOffset(String streaming, int partition, long offset) {
-
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c212109d/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
new file mode 100644
index 0000000..86338b8
--- /dev/null
+++ b/streaming/src/main/java/org/apache/kylin/streaming/StreamingManager.java
@@ -0,0 +1,146 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import com.google.common.base.Preconditions;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created by qianzhou on 3/25/15.
+ */
+public class StreamingManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamingManager.class);
+
+    // static cached instances
+    private static final ConcurrentHashMap<KylinConfig, StreamingManager> CACHE = new
ConcurrentHashMap<KylinConfig, StreamingManager>();
+
+    private KylinConfig config;
+
+    private StreamingManager(KylinConfig config) {
+        this.config = config;
+    }
+
+    private ResourceStore getStore() {
+        return ResourceStore.getStore(this.config);
+    }
+
+
+    public static StreamingManager getInstance(KylinConfig config) {
+        StreamingManager r = CACHE.get(config);
+        if (r != null) {
+            return r;
+        }
+
+        synchronized (StreamingManager.class) {
+            r = CACHE.get(config);
+            if (r != null) {
+                return r;
+            }
+            r = new StreamingManager(config);
+            CACHE.put(config, r);
+            if (CACHE.size() > 1) {
+                logger.warn("More than one cubemanager singleton exist");
+            }
+            return r;
+        }
+    }
+
+    private boolean checkExistence(String name) {
+        return true;
+    }
+
+    private String formatStreamingConfigPath(String name) {
+        return ResourceStore.STREAMING_RESOURCE_ROOT + "/" + name + ".json";
+    }
+
+    private String formatStreamingOutputPath(String streaming, int partition) {
+        return ResourceStore.STREAMING_OUTPUT_RESOURCE_ROOT + "/" + streaming + "_" + partition
+ ".json";
+    }
+
+
+    public boolean createOrUpdateKafkaConfig(String name, KafkaConfig config) {
+        try {
+            getStore().putResource(formatStreamingConfigPath(name), config, KafkaConfig.SERIALIZER);
+            return true;
+        } catch (IOException e) {
+            logger.error("error save resource name:" + name, e);
+            throw new RuntimeException("error save resource name:" + name, e);
+        }
+    }
+
+    public KafkaConfig getKafkaConfig(String name) {
+        try {
+            return getStore().getResource(formatStreamingConfigPath(name), KafkaConfig.class,
KafkaConfig.SERIALIZER);
+        } catch (IOException e) {
+            logger.error("error get resource name:" + name, e);
+            throw new RuntimeException("error get resource name:" + name, e);
+        }
+    }
+
+    public long getOffset(String streaming, int partition) {
+        final String resPath = formatStreamingOutputPath(streaming, partition);
+        try {
+            final InputStream inputStream = getStore().getResource(resPath);
+            if (inputStream == null) {
+                return 0;
+            } else {
+                final BufferedReader br = new BufferedReader(new InputStreamReader(inputStream));
+                return Long.parseLong(br.readLine());
+            }
+        } catch (Exception e) {
+            logger.error("error get offset, path:" + resPath, e);
+            throw new RuntimeException("error get offset, path:" + resPath, e);
+        }
+    }
+
+    public void updateOffset(String streaming, int partition, long offset) {
+        Preconditions.checkArgument(offset >= 0, "offset cannot be smaller than 0");
+        final String resPath = formatStreamingOutputPath(streaming, partition);
+        try {
+            getStore().putResource(resPath, new ByteArrayInputStream(Long.valueOf(offset).toString().getBytes()),
getStore().getResourceTimestamp(resPath));
+        } catch (IOException e) {
+            logger.error("error update offset, path:" + resPath, e);
+            throw new RuntimeException("error update offset, path:" + resPath, e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c212109d/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
----------------------------------------------------------------------
diff --git a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
index e966787..e4b81f1 100644
--- a/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
+++ b/streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
@@ -41,15 +41,13 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.Slice;
 import org.apache.kylin.invertedindex.model.IIDesc;
 import org.apache.kylin.invertedindex.model.IIKeyValueCodec;
 import org.apache.kylin.invertedindex.model.IIRow;
 import org.apache.kylin.streaming.Stream;
 import org.apache.kylin.streaming.StreamBuilder;
-import org.apache.kylin.streaming.StreamManager;
+import org.apache.kylin.streaming.StreamingManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -128,9 +126,9 @@ public class IIStreamBuilder extends StreamBuilder {
     }
 
     private void submitOffset(long offset) {
-        StreamManager streamManager = StreamManager.getInstance(KylinConfig.getInstanceFromEnv());
+        StreamingManager streamingManager = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv());
         try {
-            streamManager.updateOffset(streaming, partitionId, offset);
+            streamingManager.updateOffset(streaming, partitionId, offset);
             logger.info("submit offset:" + offset);
         } catch (Exception e) {
             logger.warn("error submit offset: " + offset + " retrying", e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c212109d/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
index 337dfc7..c0c67a5 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
@@ -65,7 +65,7 @@ public class KafkaConsumerTest extends KafkaBaseTest {
     public void before() throws IOException {
         producer = new OneOffStreamProducer(TOTAL_SEND_COUNT);
         producer.start();
-        kafkaConfig = StreamManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
     }
 
     @After

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c212109d/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java b/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java
index 3166914..fb6ed84 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/KafkaRequesterTest.java
@@ -53,7 +53,7 @@ public class KafkaRequesterTest extends KafkaBaseTest {
 
     @Before
     public void before() {
-        kafkaConfig = StreamManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
     }
 
     @AfterClass

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c212109d/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
index 1f45cdb..dcd9642 100644
--- a/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
+++ b/streaming/src/test/java/org/apache/kylin/streaming/OneOffStreamProducer.java
@@ -70,7 +70,7 @@ public class OneOffStreamProducer {
     public void start() throws IOException {
         final Properties properties = new Properties();
         properties.load(ClassLoader.getSystemResourceAsStream("kafka_streaming_test/kafka.properties"));
-        final KafkaConfig kafkaConfig = StreamManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
+        final KafkaConfig kafkaConfig = StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig("kafka_test");
 
         Properties props = new Properties();
         props.put("metadata.broker.list", StringUtils.join(Iterators.transform(kafkaConfig.getBrokers().iterator(),
new Function<Broker, String>() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c212109d/streaming/src/test/java/org/apache/kylin/streaming/StreamManagerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/StreamManagerTest.java b/streaming/src/test/java/org/apache/kylin/streaming/StreamManagerTest.java
deleted file mode 100644
index d5280d6..0000000
--- a/streaming/src/test/java/org/apache/kylin/streaming/StreamManagerTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.streaming;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import static junit.framework.TestCase.assertNotNull;
-
-/**
- * Created by qianzhou on 3/25/15.
- */
-public class StreamManagerTest extends LocalFileMetadataTestCase {
-
-    private KylinConfig kylinConfig;
-    private StreamManager streamManager;
-
-    @Before
-    public void before() {
-        this.createTestMetadata();
-        kylinConfig = KylinConfig.getInstanceFromEnv();
-        streamManager = StreamManager.getInstance(kylinConfig);
-    }
-
-    @After
-    public void after() {
-        this.cleanupTestMetadata();
-    }
-
-    @Test
-    public void test() {
-        assertNotNull(streamManager.getKafkaConfig("kafka_test"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/c212109d/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
b/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
new file mode 100644
index 0000000..b58dbfd
--- /dev/null
+++ b/streaming/src/test/java/org/apache/kylin/streaming/StreamingManagerTest.java
@@ -0,0 +1,93 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.streaming;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.TestCase.assertNotNull;
+import static org.junit.Assert.fail;
+
+/**
+ * Created by qianzhou on 3/25/15.
+ */
+public class StreamingManagerTest extends LocalFileMetadataTestCase {
+
+    private KylinConfig kylinConfig;
+    private StreamingManager streamingManager;
+
+    @Before
+    public void before() {
+        this.createTestMetadata();
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        streamingManager = StreamingManager.getInstance(kylinConfig);
+    }
+
+    @After
+    public void after() {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void test() {
+        assertNotNull(streamingManager.getKafkaConfig("kafka_test"));
+    }
+
+    @Test
+    public void testOffset() {
+        final String streaming = "kafka_test";
+        final int partition = 0;
+        assertEquals(0, streamingManager.getOffset(streaming, partition));
+
+        try {
+            updateOffsetAndCompare(streaming, partition, -1);
+            fail("offset cannot be smaller than 0");
+        } catch (IllegalArgumentException e) {
+        }
+        updateOffsetAndCompare(streaming, partition, 1000);
+        updateOffsetAndCompare(streaming, partition, 800);
+        updateOffsetAndCompare(streaming, partition, 2000);
+
+    }
+
+    private void updateOffsetAndCompare(String streaming, int partition, long offset) {
+        streamingManager.updateOffset(streaming, partition, offset);
+        assertEquals(offset, streamingManager.getOffset(streaming, partition));
+    }
+}


Mime
View raw message