tajo-commits mailing list archives

From jh...@apache.org
Subject tajo git commit: TAJO-1480: Kafka Consumer for kafka storage.
Date Sun, 28 Aug 2016 06:39:28 GMT
Repository: tajo
Updated Branches:
  refs/heads/master fdd2ca208 -> d5ffbe645


TAJO-1480: Kafka Consumer for kafka storage.

Closes #1041

Signed-off-by: Jinho Kim <jhkim@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d5ffbe64
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d5ffbe64
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d5ffbe64

Branch: refs/heads/master
Commit: d5ffbe645c1fe47e24b3ffe2687cecdfc1ada1b4
Parents: fdd2ca2
Author: Byunghwa Yun <combine@combineads.co.kr>
Authored: Sun Aug 28 15:38:09 2016 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Sun Aug 28 15:38:09 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 tajo-project/pom.xml                            |   1 +
 tajo-storage/pom.xml                            |   2 +
 tajo-storage/tajo-storage-kafka/pom.xml         | 195 +++++++++++++++++++
 .../storage/kafka/SimpleConsumerManager.java    | 184 +++++++++++++++++
 .../tajo/storage/kafka/KafkaTestUtil.java       |  56 ++++++
 .../kafka/TestSimpleConsumerManager.java        | 107 ++++++++++
 .../storage/kafka/server/EmbeddedKafka.java     | 126 ++++++++++++
 .../storage/kafka/server/EmbeddedZookeeper.java |  84 ++++++++
 9 files changed, 758 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/d5ffbe64/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9428f13..ec9320e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -406,6 +406,9 @@ Release 0.12.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1480: Kafka Consumer for kafka storage.
+    (Contributed by Byunghwa Yun. Committed by jinho)
+
     TAJO-2156: Create GeoIP functions taking various types instead of INET4 type. (Jongyoung)
 
     TAJO-2130: Remove inet4 data type.

http://git-wip-us.apache.org/repos/asf/tajo/blob/d5ffbe64/tajo-project/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml
index 283bdc3..075e6e7 100644
--- a/tajo-project/pom.xml
+++ b/tajo-project/pom.xml
@@ -40,6 +40,7 @@
     <jersey.version>2.6</jersey.version>
     <jetty.version>6.1.26</jetty.version>
     <parquet.version>1.8.1</parquet.version>
+    <kafka.version>0.10.0.0</kafka.version>
     <tajo.root>${project.parent.relativePath}/..</tajo.root>
     <extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path>
   </properties>

http://git-wip-us.apache.org/repos/asf/tajo/blob/d5ffbe64/tajo-storage/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml
index 4881e2c..396484f 100644
--- a/tajo-storage/pom.xml
+++ b/tajo-storage/pom.xml
@@ -40,6 +40,7 @@
     <module>tajo-storage-jdbc</module>
     <module>tajo-storage-pgsql</module>
     <module>tajo-storage-s3</module>
+    <module>tajo-storage-kafka</module>
   </modules>
 
   <build>
@@ -137,6 +138,7 @@
                       run cp -r ${basedir}/tajo-storage-hdfs/target/tajo-storage-hdfs-${project.version}*.jar .
                       run cp -r ${basedir}/tajo-storage-hbase/target/tajo-storage-hbase-${project.version}*.jar .
                       run cp -r ${basedir}/tajo-storage-s3/target/tajo-storage-s3-${project.version}*.jar .
+                      run cp -r ${basedir}/tajo-storage-kafka/target/tajo-storage-kafka-${project.version}*.jar .
 
                       echo
                       echo "Tajo Storage dist layout available at: ${project.build.directory}/tajo-storage-${project.version}"

http://git-wip-us.apache.org/repos/asf/tajo/blob/d5ffbe64/tajo-storage/tajo-storage-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/pom.xml b/tajo-storage/tajo-storage-kafka/pom.xml
new file mode 100644
index 0000000..136af4a
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/pom.xml
@@ -0,0 +1,195 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>tajo-project</artifactId>
+    <groupId>org.apache.tajo</groupId>
+    <version>0.12.0-SNAPSHOT</version>
+    <relativePath>../../tajo-project</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>tajo-storage-kafka</artifactId>
+  <packaging>jar</packaging>
+  <name>Tajo Kafka Storage</name>
+  <properties>
+    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+  </properties>
+
+  <repositories>
+    <repository>
+      <id>repository.jboss.org</id>
+      <url>https://repository.jboss.org/nexus/content/repositories/releases/
+      </url>
+      <snapshots>
+        <enabled>false</enabled>
+      </snapshots>
+    </repository>
+  </repositories>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.rat</groupId>
+        <artifactId>apache-rat-plugin</artifactId>
+        <configuration>
+          <excludes>
+            <exclude>src/main/resources/*.json</exclude>
+            <exclude>src/test/resources/*.json</exclude>
+          </excludes>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemProperties>
+            <tajo.test.enabled>TRUE</tajo.test.enabled>
+          </systemProperties>
+          <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8 -Dderby.storage.pageSize=1024 -Dderby.stream.error.file=/dev/null</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-catalog-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-plan</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.tajo</groupId>
+      <artifactId>tajo-storage-hdfs</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>${kafka.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+      <version>${kafka.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>3.4.6</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>testing</artifactId>
+      <version>0.88</version>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <id>docs</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-javadoc-plugin</artifactId>
+            <executions>
+              <execution>
+                <!-- build javadoc jars per jar for publishing to maven -->
+                <id>module-javadocs</id>
+                <phase>package</phase>
+                <goals>
+                  <goal>jar</goal>
+                </goals>
+                <configuration>
+                  <destDir>${project.build.directory}</destDir>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
+  <reporting>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-report-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </reporting>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/d5ffbe64/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java
new file mode 100644
index 0000000..dd44ea7
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/main/java/org/apache/tajo/storage/kafka/SimpleConsumerManager.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * SimpleConsumerManager is a Kafka client for KafkaScanner.
+ * One instance is created per partition.
+ */
+public class SimpleConsumerManager implements Closeable {
+  private KafkaConsumer<byte[], byte[]> consumer = null;
+  private TopicPartition partition;
+
+  /**
+   * Creates a SimpleConsumerManager instance.
+   *
+   * @param uri Kafka Tablespace URI, e.g. kafka://localhost:9092,localhost:9091
+   * @param topic topic name
+   * @param partitionId partition id
+   */
+  public SimpleConsumerManager(URI uri, String topic, int partitionId) {
+    this(uri, topic, partitionId, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Creates a SimpleConsumerManager instance.
+   *
+   * @param uri Kafka Tablespace URI, e.g. kafka://localhost:9092,localhost:9091
+   * @param topic topic name
+   * @param partitionId partition id
+   * @param fragmentSize max number of records to poll from Kafka
+   */
+  public SimpleConsumerManager(URI uri, String topic, int partitionId, int fragmentSize) {
+    String clientId = SimpleConsumerManager.createIdentifier("TCons");
+    Properties props = getDefaultProperties(uri, clientId, fragmentSize);
+
+    partition = new TopicPartition(topic, partitionId);
+    consumer = new KafkaConsumer<>(props);
+    consumer.assign(Collections.singletonList(partition));
+  }
+
+  /**
+   * Closes the consumer.
+   */
+  @Override
+  public void close() {
+    if (consumer != null) {
+      consumer.close();
+    }
+    consumer = null;
+  }
+
+  /**
+   * Gets the earliest offset.
+   *
+   * @return the earliest offset
+   */
+  public long getEarliestOffset() {
+    long currentPosition = consumer.position(partition);
+    consumer.seekToBeginning(Collections.singletonList(partition));
+    long earliestPosition = consumer.position(partition);
+    consumer.seek(partition, currentPosition);
+    return earliestPosition;
+  }
+
+  /**
+   * Gets the latest offset.
+   *
+   * @return the latest offset
+   */
+  public long getLatestOffset() {
+    long currentPosition = consumer.position(partition);
+    consumer.seekToEnd(Collections.singletonList(partition));
+    long latestPosition = consumer.position(partition);
+    consumer.seek(partition, currentPosition);
+    return latestPosition;
+  }
+
+  /**
+   * Polls data from Kafka.
+   *
+   * @param offset position of partition to seek.
+   * @param timeout polling timeout.
+   * @return records of topic.
+   */
+  public ConsumerRecords<byte[], byte[]> poll(long offset, long timeout) {
+    consumer.seek(partition, offset);
+
+    return consumer.poll(timeout);
+  }
+
+  /**
+   * Returns the partition information list of a specific topic.
+   *
+   * @param uri Kafka Tablespace URI
+   * @param topic topic name
+   * @return partition information list of the topic
+   * @throws IOException
+   */
+  static List<PartitionInfo> getPartitions(URI uri, String topic) throws IOException {
+    String clientId = SimpleConsumerManager.createIdentifier("TPart");
+    Properties props = getDefaultProperties(uri, clientId, Integer.MAX_VALUE);
+    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)) {
+      return consumer.partitionsFor(topic);
+    }
+  }
+
+  /**
+   * Extracts broker addresses from a Kafka Tablespace URI.
+   * For example, given the URI 'kafka://host1:9092,host2:9092,host3:9092',
+   * <code>extractBroker</code> returns only 'host1:9092,host2:9092,host3:9092'.
+   *
+   * @param uri Kafka Tablespace URI
+   * @return Broker addresses
+   */
+  static String extractBroker(URI uri) {
+    String uriStr = uri.toString();
+    int start = uriStr.indexOf("/") + 2;
+
+    return uriStr.substring(start);
+  }
+
+  /**
+   * Gets the default properties.
+   *
+   * @param uri kafka broker URIs
+   * @param clientId consumer client id
+   * @param fragmentSize max number of records per poll
+   * @return the default properties
+   */
+  private static Properties getDefaultProperties(URI uri, String clientId, int fragmentSize) {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, extractBroker(uri));
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, fragmentSize);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    return props;
+  }
+
+  /**
+   * Creates an identifier for the consumer.
+   * The consumer connects to Kafka using this identifier.
+   *
+   * @param prefix identifier prefix
+   * @return a unique client identifier
+   */
+  private static String createIdentifier(String prefix) {
+    Random r = new Random();
+    return prefix + "_" + r.nextInt(1000000) + "_" + System.currentTimeMillis();
+  }
+}
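
For reference, a minimal usage sketch of the new SimpleConsumerManager (not part of this commit; the broker address and topic name below are hypothetical):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.tajo.storage.kafka.SimpleConsumerManager;

import java.net.URI;

public class ConsumerSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical broker address in the Kafka Tablespace URI form.
    URI uri = URI.create("kafka://localhost:9092");
    // One SimpleConsumerManager per partition; partition 0 of a hypothetical topic here.
    try (SimpleConsumerManager cm = new SimpleConsumerManager(uri, "example_topic", 0)) {
      long start = cm.getEarliestOffset();
      long end = cm.getLatestOffset();
      if (start < end) {
        // Seek to the earliest offset and poll with a one-second timeout.
        for (ConsumerRecord<byte[], byte[]> record : cm.poll(start, 1000)) {
          System.out.println(new String(record.value(), "UTF-8"));
        }
      }
    }
  }
}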

http://git-wip-us.apache.org/repos/asf/tajo/blob/d5ffbe64/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/KafkaTestUtil.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/KafkaTestUtil.java
new file mode 100644
index 0000000..160ebfe
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/KafkaTestUtil.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Collection;
+
+public class KafkaTestUtil {
+  private static final String[] TEST_DATA = { "1|abc|0.2", "2|def|0.4", "3|ghi|0.6", "4|jkl|0.8", "5|mno|1.0" };
+
+  static final String TOPIC_NAME = "test_topic";
+  static final String TEST_JSON_DATA = "{\"col1\":1, \"col2\":\"abc\", \"col3\":0.2}";
+  static final int DEFAULT_TEST_PARTITION_NUM = 3;
+
+  static void sendTestData(Producer<String, String> producer, String topic) throws Exception {
+    for (int i = 0; i < TEST_DATA.length; i++) {
+      sendTestData(producer, topic, TEST_DATA[i]);
+    }
+  }
+
+  static void sendTestData(Producer<String, String> producer, String topic, String data) throws Exception {
+    producer.send(new ProducerRecord<String, String>(topic, data));
+  }
+
+  static boolean equalTestData(Collection<String> receivedDataSet) {
+    if (receivedDataSet.size() != TEST_DATA.length) {
+      return false;
+    }
+
+    for (String td : TEST_DATA) {
+      if (!receivedDataSet.contains(td)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d5ffbe64/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java
new file mode 100644
index 0000000..d780ac4
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/TestSimpleConsumerManager.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka;
+
+import static org.apache.tajo.storage.kafka.KafkaTestUtil.TOPIC_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.tajo.storage.kafka.server.EmbeddedKafka;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TestSimpleConsumerManager {
+
+  private static EmbeddedKafka KAFKA;
+
+  private static URI KAFKA_SERVER_URI;
+
+  /**
+   * Starts up EmbeddedKafka and generates test data.
+   *
+   * @throws Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    KAFKA = EmbeddedKafka.createEmbeddedKafka(2181, 9092);
+    KAFKA.start();
+    KAFKA.createTopic(KafkaTestUtil.DEFAULT_TEST_PARTITION_NUM, 1, TOPIC_NAME);
+    KAFKA_SERVER_URI = URI.create("kafka://" + KAFKA.getConnectString());
+
+    // Load test data.
+    try (Producer<String, String> producer = KAFKA.createProducer(KAFKA.getConnectString())) {
+      KafkaTestUtil.sendTestData(producer, TOPIC_NAME);
+    }
+  }
+
+  /**
+   * Closes EmbeddedKafka.
+   *
+   * @throws Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    KAFKA.close();
+  }
+
+  @Test
+  public void testExtractBroker() {
+    assertEquals("host1:9092,host2:9092,host3:9092",
+        SimpleConsumerManager.extractBroker(URI.create("kafka://host1:9092,host2:9092,host3:9092")));
+  }
+
+  /**
+   * Test for getting topic partitions.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testGetPartitions() throws Exception {
+    int partitionNum = SimpleConsumerManager.getPartitions(KAFKA_SERVER_URI, TOPIC_NAME).size();
+    assertEquals(KafkaTestUtil.DEFAULT_TEST_PARTITION_NUM, partitionNum);
+  }
+
+  // Test fetching data from Kafka.
+  @Test
+  public void testFetchData() throws Exception {
+    Set<String> receivedDataSet = new HashSet<>();
+    for (PartitionInfo partitionInfo : SimpleConsumerManager.getPartitions(KAFKA_SERVER_URI, TOPIC_NAME)) {
+      int partitionId = partitionInfo.partition();
+      try (SimpleConsumerManager cm = new SimpleConsumerManager(KAFKA_SERVER_URI, TOPIC_NAME, partitionId)) {
+        long startOffset = cm.getEarliestOffset();
+        long lastOffset = cm.getLatestOffset();
+        if (startOffset < lastOffset) {
+          for (ConsumerRecord<byte[], byte[]> message : cm.poll(startOffset, Long.MAX_VALUE)) {
+            receivedDataSet.add(new String(message.value(), "UTF-8"));
+          }
+        }
+      }
+    }
+
+    assertTrue(KafkaTestUtil.equalTestData(receivedDataSet));
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/d5ffbe64/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/server/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/server/EmbeddedKafka.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/server/EmbeddedKafka.java
new file mode 100644
index 0000000..90c06e1
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/server/EmbeddedKafka.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka.server;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static io.airlift.testing.FileUtils.deleteRecursively;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.io.Files;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServerStartable;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+public class EmbeddedKafka implements Closeable {
+  private final EmbeddedZookeeper zookeeper;
+  private final int port;
+  private final File kafkaDataDir;
+  private final KafkaServerStartable kafka;
+
+  private final AtomicBoolean started = new AtomicBoolean();
+
+  public static EmbeddedKafka createEmbeddedKafka(int zookeeperPort, int kafkaPort) throws IOException {
+    return new EmbeddedKafka(new EmbeddedZookeeper(zookeeperPort), kafkaPort);
+  }
+
+  EmbeddedKafka(EmbeddedZookeeper zookeeper, int kafkaPort) throws IOException {
+    this.zookeeper = checkNotNull(zookeeper, "zookeeper is null");
+
+    this.port = kafkaPort;
+    this.kafkaDataDir = Files.createTempDir();
+
+    Properties properties = new Properties();
+    properties.setProperty("broker.id", "0");
+    properties.setProperty("host.name", "localhost");
+    properties.setProperty("num.partitions", "2");
+    properties.setProperty("log.flush.interval.messages", "10000");
+    properties.setProperty("log.flush.interval.ms", "1000");
+    properties.setProperty("log.retention.minutes", "60");
+    properties.setProperty("log.segment.bytes", "1048576");
+    properties.setProperty("auto.create.topics.enable", "false");
+    properties.setProperty("zookeeper.connection.timeout.ms", "1000000");
+    properties.setProperty("port", Integer.toString(port));
+    properties.setProperty("log.dirs", kafkaDataDir.getAbsolutePath());
+    properties.setProperty("zookeeper.connect", zookeeper.getConnectString());
+
+    KafkaConfig config = new KafkaConfig(properties);
+    this.kafka = new KafkaServerStartable(config);
+  }
+
+  public void start() throws InterruptedException, IOException {
+    if (!started.getAndSet(true)) {
+      zookeeper.start();
+      kafka.startup();
+    }
+  }
+
+  @Override
+  public void close() {
+    if (started.get()) {
+      kafka.shutdown();
+      kafka.awaitShutdown();
+      zookeeper.close();
+      deleteRecursively(kafkaDataDir);
+    }
+  }
+
+  public String getConnectString() {
+    return kafka.serverConfig().hostName() + ":" + kafka.serverConfig().port();
+  }
+
+  public String getZookeeperConnectString() {
+    return zookeeper.getConnectString();
+  }
+
+  public void createTopic(int partitions, int replication, String topic) {
+    checkState(started.get(), "not started!");
+
+    ZkClient zkClient = new ZkClient(getZookeeperConnectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
+    try {
+      AdminUtils.createTopic(ZkUtils.apply(zkClient, false), topic, partitions, replication, new Properties(),
+          RackAwareMode.Enforced$.MODULE$);
+    } finally {
+      zkClient.close();
+    }
+  }
+
+  public Producer<String, String> createProducer(String connecting) {
+    Properties properties = new Properties();
+    properties.put("key.serializer", StringSerializer.class);
+    properties.put("value.serializer", StringSerializer.class);
+    properties.put("bootstrap.servers", connecting);
+    Producer<String, String> producer = new KafkaProducer<String, String>(properties);
+    return producer;
+  }
+}
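
For reference, a minimal sketch of driving the embedded broker, mirroring what TestSimpleConsumerManager does above (the ports and topic name are hypothetical):

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.tajo.storage.kafka.server.EmbeddedKafka;

public class EmbeddedKafkaSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical ports: ZooKeeper on 2181, Kafka on 9092.
    try (EmbeddedKafka kafka = EmbeddedKafka.createEmbeddedKafka(2181, 9092)) {
      kafka.start();
      // Create a topic with 3 partitions and a replication factor of 1.
      kafka.createTopic(3, 1, "example_topic");
      try (Producer<String, String> producer = kafka.createProducer(kafka.getConnectString())) {
        producer.send(new ProducerRecord<String, String>("example_topic", "hello"));
      }
    }
  }
}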

http://git-wip-us.apache.org/repos/asf/tajo/blob/d5ffbe64/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/server/EmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/server/EmbeddedZookeeper.java b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/server/EmbeddedZookeeper.java
new file mode 100644
index 0000000..892b89f
--- /dev/null
+++ b/tajo-storage/tajo-storage-kafka/src/test/java/org/apache/tajo/storage/kafka/server/EmbeddedZookeeper.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.kafka.server;
+
+import static io.airlift.testing.FileUtils.deleteRecursively;
+
+import org.apache.tajo.util.NetUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.io.Files;
+
+public class EmbeddedZookeeper implements Closeable {
+  private final File zkDataDir;
+  private final ZooKeeperServer zkServer;
+  private final NIOServerCnxnFactory cnxnFactory;
+
+  private final AtomicBoolean started = new AtomicBoolean();
+
+  public EmbeddedZookeeper() throws IOException {
+    this(2181);
+  }
+
+  public EmbeddedZookeeper(int port) throws IOException {
+    zkDataDir = Files.createTempDir();
+    zkServer = new ZooKeeperServer();
+
+    FileTxnSnapLog ftxn = new FileTxnSnapLog(zkDataDir, zkDataDir);
+    zkServer.setTxnLogFactory(ftxn);
+
+    cnxnFactory = new NIOServerCnxnFactory();
+    cnxnFactory.configure(new InetSocketAddress(port), 0);
+  }
+
+  public void start() throws InterruptedException, IOException {
+    if (!started.getAndSet(true)) {
+      cnxnFactory.startup(zkServer);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (started.get()) {
+      cnxnFactory.shutdown();
+      try {
+        cnxnFactory.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+
+      if (zkServer.isRunning()) {
+        zkServer.shutdown();
+      }
+      deleteRecursively(zkDataDir);
+    }
+  }
+
+  public String getConnectString() {
+    return NetUtils.normalizeInetSocketAddress(cnxnFactory.getLocalAddress());
+  }
+}
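
Likewise, a minimal sketch for the standalone ZooKeeper harness (the port is hypothetical; the no-arg constructor defaults to 2181):

import org.apache.tajo.storage.kafka.server.EmbeddedZookeeper;

public class EmbeddedZookeeperSketch {
  public static void main(String[] args) throws Exception {
    try (EmbeddedZookeeper zk = new EmbeddedZookeeper(12181)) {
      zk.start();
      System.out.println("ZooKeeper listening at " + zk.getConnectString());
    }
  }
}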

