ignite-commits mailing list archives

From ntikho...@apache.org
Subject [36/48] ignite git commit: IGNITE-2016: Kafka Connect integration - reflected review comments (avoiding setting same task parameters more than once). - Fixes #335.
Date Mon, 01 Feb 2016 14:27:31 GMT
IGNITE-2016: Kafka Connect integration - reflected review comments (avoiding setting same task parameters more than once). - Fixes #335.

Signed-off-by: shtykh_roman <rshtykh@yahoo.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c92c2747
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c92c2747
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c92c2747

Branch: refs/heads/ignite-2454
Commit: c92c274737391546b3dfe3ccbe527329c462d95f
Parents: 861236a
Author: shtykh_roman <rshtykh@yahoo.com>
Authored: Mon Feb 1 10:35:02 2016 +0900
Committer: shtykh_roman <rshtykh@yahoo.com>
Committed: Mon Feb 1 10:35:02 2016 +0900

----------------------------------------------------------------------
 modules/kafka/README.txt                        | 111 +++++-
 modules/kafka/pom.xml                           |  69 ++--
 .../ignite/stream/kafka/KafkaStreamer.java      |   2 +-
 .../kafka/connect/IgniteSinkConnector.java      |  91 +++++
 .../kafka/connect/IgniteSinkConstants.java      |  38 ++
 .../stream/kafka/connect/IgniteSinkTask.java    | 165 ++++++++
 .../kafka/IgniteKafkaStreamerSelfTestSuite.java |   9 +-
 .../stream/kafka/KafkaEmbeddedBroker.java       | 387 -------------------
 .../kafka/KafkaIgniteStreamerSelfTest.java      |  13 +-
 .../ignite/stream/kafka/SimplePartitioner.java  |  53 ---
 .../ignite/stream/kafka/TestKafkaBroker.java    | 237 ++++++++++++
 .../kafka/connect/IgniteSinkConnectorTest.java  | 250 ++++++++++++
 .../kafka/src/test/resources/example-ignite.xml |  71 ++++
 parent/pom.xml                                  |   9 +-
 14 files changed, 1011 insertions(+), 494 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/README.txt
----------------------------------------------------------------------
diff --git a/modules/kafka/README.txt b/modules/kafka/README.txt
index 1eaf861..f4e56bd 100644
--- a/modules/kafka/README.txt
+++ b/modules/kafka/README.txt
@@ -1,16 +1,17 @@
 Apache Ignite Kafka Streamer Module
-------------------------
+-----------------------------------
 
 Apache Ignite Kafka Streamer module provides streaming from Kafka to Ignite cache.
 
-To enable Kafka Streamer module when starting a standalone node, move 'optional/ignite-Kafka' folder to
-'libs' folder before running 'ignite.{sh|bat}' script. The content of the module folder will
-be added to classpath in this case.
+There are two ways this can be achieved:
+- importing Kafka Streamer module in your Maven project and instantiating KafkaStreamer for data streaming;
+- using Kafka Connect functionality.
 
-Importing Ignite Kafka Streamer Module In Maven Project
--------------------------------------
+Below are the details.
 
-If you are using Maven to manage dependencies of your project, you can add JCL module
+## Importing Ignite Kafka Streamer Module In Maven Project
+
+If you are using Maven to manage dependencies of your project, you can add Kafka module
 dependency like this (replace '${ignite.version}' with actual Ignite version you are
 interested in):
 
@@ -30,3 +31,99 @@ interested in):
     </dependencies>
     ...
 </project>
+
+
+## Streaming Data via Kafka Connect
+
+Sink Connector will help you export data from Kafka to Ignite cache. It polls data from Kafka topics and writes it to the user-specified cache.
+For more information on Kafka Connect, see [Kafka Documentation](http://kafka.apache.org/documentation.html#connect).
+
+Connector can be found in 'optional/ignite-kafka'. It and its dependencies have to be on the classpath of a running Kafka instance,
+as described in the following subsection.
+
+### Setting up and Running
+
+1. Put the following jar files on Kafka's classpath
+- ignite-kafka-connect-x.x.x-SNAPSHOT.jar
+- ignite-core-x.x.x-SNAPSHOT.jar
+- cache-api-1.0.0.jar
+- ignite-spring-1.5.0-SNAPSHOT.jar
+- spring-aop-4.1.0.RELEASE.jar
+- spring-beans-4.1.0.RELEASE.jar
+- spring-context-4.1.0.RELEASE.jar
+- spring-core-4.1.0.RELEASE.jar
+- spring-expression-4.1.0.RELEASE.jar
+- commons-logging-1.1.1.jar
+
+2. Prepare worker configurations, e.g.,
+```
+bootstrap.servers=localhost:9092
+
+key.converter=org.apache.kafka.connect.storage.StringConverter
+value.converter=org.apache.kafka.connect.storage.StringConverter
+key.converter.schemas.enable=false
+value.converter.schemas.enable=false
+
+internal.key.converter=org.apache.kafka.connect.storage.StringConverter
+internal.value.converter=org.apache.kafka.connect.storage.StringConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
+
+offset.storage.file.filename=/tmp/connect.offsets
+offset.flush.interval.ms=10000
+```
+
+3. Prepare connector configurations, e.g.,
+```
+# connector
+name=string-ignite-connector
+connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
+tasks.max=2
+topics=testTopic1,testTopic2
+
+# cache
+cacheName=cache1
+cacheAllowOverwrite=true
+igniteCfg=/some-path/ignite.xml
+```
+where 'cacheName' is the name of a cache defined in '/some-path/ignite.xml'; data from 'testTopic1,testTopic2'
+will be pulled and stored in it. Set 'cacheAllowOverwrite' to true to enable overwriting existing values in the cache.
+You can also set 'cachePerNodeDataSize' and 'cachePerNodeParOps' to adjust the per-node buffer size and the maximum number
+of parallel stream operations for a single node.
+
+See example-ignite.xml in tests for a simple cache configuration file example.
+
+4. Start connector, for instance, as follows,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+## Checking the Flow
+
+To perform a very basic functionality check, do the following:
+
+1. Start Zookeeper
+```
+bin/zookeeper-server-start.sh config/zookeeper.properties
+```
+
+2. Start Kafka server
+```
+bin/kafka-server-start.sh config/server.properties
+```
+
+3. Provide some data input to the Kafka server
+```
+bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=,
+k1,v1
+```
+
+4. Start the connector. For example,
+```
+./bin/connect-standalone.sh myconfig/connect-standalone.properties myconfig/ignite-connector.properties
+```
+
+5. Check the value is in the cache. For example, via REST,
+```
+http://node1:8080/ignite?cmd=size&cacheName=cache1
+```
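
Editorial note: the README above lists connector properties, and IgniteSinkConnector.start() later in this commit rejects a configuration in which 'topics', 'cacheName' or 'igniteCfg' is missing or empty. A minimal sketch of an equivalent pre-flight check is shown below. It uses only the JDK, and the class name `ConnectorConfigCheck` and its method are hypothetical helpers for illustration, not part of Ignite or Kafka.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical helper for illustration; not part of the Ignite or Kafka code base. */
final class ConnectorConfigCheck {
    /** Keys that the connector in this commit requires to be non-empty. */
    static final String[] REQUIRED_KEYS = {"topics", "cacheName", "igniteCfg"};

    /** Returns the required keys that are absent or blank in the given properties text. */
    static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();

        try {
            props.load(new StringReader(propertiesText));
        }
        catch (IOException e) {
            // Not expected for an in-memory reader; rethrown unchecked to keep the API simple.
            throw new UncheckedIOException(e);
        }

        List<String> missing = new ArrayList<>();

        for (String key : REQUIRED_KEYS) {
            String val = props.getProperty(key);

            if (val == null || val.trim().isEmpty())
                missing.add(key);
        }

        return missing;
    }

    public static void main(String[] args) {
        String cfg = "# connector\n"
            + "name=string-ignite-connector\n"
            + "topics=testTopic1,testTopic2\n"
            + "# cache\n"
            + "cacheName=cache1\n";

        // 'igniteCfg' is deliberately left out, so it is reported as missing.
        System.out.println(missingKeys(cfg));
    }
}
```

Running such a check before launching connect-standalone.sh would surface a missing key earlier than the ConnectException thrown at connector start.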

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/pom.xml b/modules/kafka/pom.xml
index e00b190..0ac0487 100644
--- a/modules/kafka/pom.xml
+++ b/modules/kafka/pom.xml
@@ -20,7 +20,8 @@
 <!--
     POM file.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -43,48 +44,28 @@
 
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
+            <artifactId>kafka_2.11</artifactId>
             <version>${kafka.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.sun.jmx</groupId>
-                    <artifactId>jmxri</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.sun.jdmk</groupId>
-                    <artifactId>jmxtools</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>net.sf.jopt-simple</groupId>
-                    <artifactId>jopt-simple</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-simple</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.zookeeper</groupId>
-            <artifactId>zookeeper</artifactId>
-            <version>${zookeeper.version}</version>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-api</artifactId>
+            <version>${kafka.version}</version>
         </dependency>
 
         <dependency>
-            <groupId>org.apache.ignite</groupId>
-            <artifactId>ignite-log4j</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>connect-runtime</artifactId>
+            <version>${kafka.version}</version>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
-            <groupId>org.ow2.asm</groupId>
-            <artifactId>asm-all</artifactId>
-            <version>${asm.version}</version>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator.version}</version>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
@@ -96,11 +77,33 @@
 
         <dependency>
             <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>${easymock.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-core</artifactId>
             <version>${project.version}</version>
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
index cbc5b1b..487c369 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/KafkaStreamer.java
@@ -224,4 +224,4 @@ public class KafkaStreamer<T, K, V> extends StreamAdapter<T, K, V> {
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
new file mode 100644
index 0000000..9385920
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnector.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+/**
+ * Sink connector to manage sink tasks that transfer Kafka data to Ignite grid.
+ */
+public class IgniteSinkConnector extends SinkConnector {
+    /** Sink properties. */
+    private Map<String, String> configProps;
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return AppInfoParser.getVersion();
+    }
+
+    /**
+     * A sink lifecycle method. Validates grid-specific sink properties.
+     *
+     * @param props Sink properties.
+     */
+    @Override public void start(Map<String, String> props) {
+        configProps = props;
+
+        try {
+            A.notNullOrEmpty(configProps.get(SinkConnector.TOPICS_CONFIG), "topics");
+            A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_NAME), "cache name");
+            A.notNullOrEmpty(configProps.get(IgniteSinkConstants.CACHE_CFG_PATH), "path to cache config file");
+        }
+        catch (IllegalArgumentException e) {
+            throw new ConnectException("Cannot start IgniteSinkConnector due to configuration error", e);
+        }
+    }
+
+    /**
+     * Obtains a sink task class to be instantiated for feeding data into grid.
+     *
+     * @return IgniteSinkTask class.
+     */
+    @Override public Class<? extends Task> taskClass() {
+        return IgniteSinkTask.class;
+    }
+
+    /**
+     * Builds each config for <tt>maxTasks</tt> tasks.
+     *
+     * @param maxTasks Max number of tasks.
+     * @return Task configs.
+     */
+    @Override public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> taskConfigs = new ArrayList<>();
+        Map<String, String> taskProps = new HashMap<>();
+
+        taskProps.putAll(configProps);
+
+        for (int i = 0; i < maxTasks; i++)
+            taskConfigs.add(taskProps);
+
+        return taskConfigs;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        // No-op.
+    }
+}
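
Editorial note: taskConfigs() above adds the same `taskProps` map instance to the list once per task, which is fine as long as every consumer treats the map as read-only. A minimal sketch of a variant that hands each task its own copy is shown below. The class `TaskConfigSketch` is hypothetical and uses only the JDK; whether Kafka Connect itself copies the maps is not something this commit shows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the connector's taskConfigs() logic; JDK only. */
final class TaskConfigSketch {
    /** Builds maxTasks independent copies of the connector properties. */
    static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps, int maxTasks) {
        List<Map<String, String>> cfgs = new ArrayList<>(maxTasks);

        for (int i = 0; i < maxTasks; i++)
            cfgs.add(new HashMap<>(connectorProps)); // Fresh copy per task.

        return cfgs;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();

        props.put("cacheName", "cache1");
        props.put("igniteCfg", "/some-path/ignite.xml");

        List<Map<String, String>> cfgs = taskConfigs(props, 2);

        // Mutating one task's config does not leak into the other.
        cfgs.get(0).put("cacheName", "changed");

        System.out.println(cfgs.get(1).get("cacheName"));
    }
}
```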

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
new file mode 100644
index 0000000..7680d96
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConstants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+/**
+ * Sink configuration strings.
+ */
+public class IgniteSinkConstants {
+    /** Ignite configuration file path. */
+    public static final String CACHE_CFG_PATH = "igniteCfg";
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "cacheName";
+
+    /** Flag to enable overwriting existing values in cache. */
+    public static final String CACHE_ALLOW_OVERWRITE = "cacheAllowOverwrite";
+
+    /** Size of per-node buffer before data is sent to remote node. */
+    public static final String CACHE_PER_NODE_DATA_SIZE = "cachePerNodeDataSize";
+
+    /** Maximum number of parallel stream operations per node. */
+    public static final String CACHE_PER_NODE_PAR_OPS = "cachePerNodeParOps";
+}
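
Editorial note: of the five keys above, 'igniteCfg' and 'cacheName' are mandatory, while the remaining three are applied by IgniteSinkTask.start() only when present. A minimal sketch of that "apply only if set" parsing is shown below. `OptionalSinkSettings` is a hypothetical class using only the JDK; a null field stands for "key absent, keep the streamer default".

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical holder for the optional sink settings; not part of the commit. */
final class OptionalSinkSettings {
    /** Null means the key was absent and the default should be kept. */
    final Boolean allowOverwrite;
    final Integer perNodeBufferSize;
    final Integer perNodeParallelOps;

    private OptionalSinkSettings(Boolean allowOverwrite, Integer bufSize, Integer parOps) {
        this.allowOverwrite = allowOverwrite;
        this.perNodeBufferSize = bufSize;
        this.perNodeParallelOps = parOps;
    }

    /** Parses the optional keys; a malformed number raises NumberFormatException. */
    static OptionalSinkSettings from(Map<String, String> props) {
        String overwrite = props.get("cacheAllowOverwrite");

        return new OptionalSinkSettings(
            overwrite == null ? null : Boolean.valueOf(overwrite.trim()),
            intOrNull(props, "cachePerNodeDataSize"),
            intOrNull(props, "cachePerNodeParOps"));
    }

    private static Integer intOrNull(Map<String, String> props, String key) {
        String val = props.get(key);

        return val == null ? null : Integer.valueOf(val.trim());
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();

        props.put("cacheAllowOverwrite", "true");
        props.put("cachePerNodeDataSize", "512");

        OptionalSinkSettings s = from(props);

        System.out.println(s.allowOverwrite + " " + s.perNodeBufferSize + " " + s.perNodeParallelOps);
    }
}
```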

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
new file mode 100644
index 0000000..3d9a00d
--- /dev/null
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSinkTask.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.Ignition;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Task to consume sequences of SinkRecords and write data to grid.
+ */
+public class IgniteSinkTask extends SinkTask {
+    /** Logger. */
+    private static final Logger log = LoggerFactory.getLogger(IgniteSinkTask.class);
+
+    /** Flag for stopped state. */
+    private static volatile boolean stopped = true;
+
+    /** Ignite grid configuration file. */
+    private static String igniteConfigFile;
+
+    /** Cache name. */
+    private static String cacheName;
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return new IgniteSinkConnector().version();
+    }
+
+    /**
+     * Initializes grid client from configPath.
+     *
+     * @param props Task properties.
+     */
+    @Override public void start(Map<String, String> props) {
+        // Each task has the same parameters -- avoid setting more than once.
+        if (cacheName != null)
+            return;
+
+        cacheName = props.get(IgniteSinkConstants.CACHE_NAME);
+        igniteConfigFile = props.get(IgniteSinkConstants.CACHE_CFG_PATH);
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE))
+            StreamerContext.getStreamer().allowOverwrite(
+                Boolean.parseBoolean(props.get(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE)));
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE))
+            StreamerContext.getStreamer().perNodeBufferSize(
+                Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_DATA_SIZE)));
+
+        if (props.containsKey(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS))
+            StreamerContext.getStreamer().perNodeParallelOperations(
+                Integer.parseInt(props.get(IgniteSinkConstants.CACHE_PER_NODE_PAR_OPS)));
+
+        stopped = false;
+    }
+
+    /**
+     * Buffers records.
+     *
+     * @param records Records to inject into grid.
+     */
+    @SuppressWarnings("unchecked")
+    @Override public void put(Collection<SinkRecord> records) {
+        try {
+            for (SinkRecord record : records) {
+                if (record.key() != null) {
+                    // Data is flushed asynchronously when CACHE_PER_NODE_DATA_SIZE is reached.
+                    StreamerContext.getStreamer().addData(record.key(), record.value());
+                }
+                else {
+                    log.error("Failed to stream a record with null key!");
+                }
+
+            }
+        }
+        catch (ConnectException e) {
+            log.error("Failed adding record", e);
+
+            throw new ConnectException(e);
+        }
+    }
+
+    /**
+     * Pushes buffered data to grid. Flush interval is configured by worker configurations.
+     *
+     * @param offsets Offset information.
+     */
+    @Override public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        if (stopped)
+            return;
+
+        StreamerContext.getStreamer().flush();
+    }
+
+    /**
+     * Stops the grid client.
+     */
+    @Override public void stop() {
+        if (stopped)
+            return;
+
+        stopped = true;
+
+        StreamerContext.getStreamer().close();
+        StreamerContext.getIgnite().close();
+    }
+
+    /**
+     * Streamer context initializing grid and data streamer instances on demand.
+     */
+    public static class StreamerContext {
+        /** Constructor. */
+        private StreamerContext() {
+        }
+
+        /** Instance holder. */
+        private static class Holder {
+            private static final Ignite IGNITE = Ignition.start(igniteConfigFile);
+            private static final IgniteDataStreamer STREAMER = IGNITE.dataStreamer(cacheName);
+        }
+
+        /**
+         * Obtains grid instance.
+         *
+         * @return Grid instance.
+         */
+        public static Ignite getIgnite() {
+            return Holder.IGNITE;
+        }
+
+        /**
+         * Obtains data streamer instance.
+         *
+         * @return Data streamer instance.
+         */
+        public static IgniteDataStreamer getStreamer() {
+            return Holder.STREAMER;
+        }
+    }
+}
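
Editorial note: StreamerContext above relies on the initialization-on-demand holder idiom. The nested Holder class is initialized on first access, at which point it reads the static fields `igniteConfigFile` and `cacheName`, and it is never initialized again in the same class loader. A minimal JDK-only sketch of that behavior is shown below. `LazyHolderSketch` and its string "resource" are hypothetical stand-ins for the Ignite instance.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of the holder idiom; a String stands in for the grid instance. */
final class LazyHolderSketch {
    /** Counts how many times the expensive resource was created. */
    static final AtomicInteger INIT_COUNT = new AtomicInteger();

    /** Must be set before the first call to resource(), like igniteConfigFile in the task. */
    static String configPath;

    /** Initialized by the JVM on first access only. */
    private static class Holder {
        static final String RESOURCE = open(configPath);
    }

    private static String open(String path) {
        INIT_COUNT.incrementAndGet();

        return "resource(" + path + ")";
    }

    static String resource() {
        return Holder.RESOURCE;
    }

    public static void main(String[] args) {
        configPath = "/some-path/ignite.xml";

        String first = resource();

        // Changing the path afterwards has no effect: Holder is already initialized.
        configPath = "/other-path/ignite.xml";

        String second = resource();

        System.out.println(first == second);
        System.out.println(INIT_COUNT.get());
    }
}
```

One consequence of this design, as far as the diff shows: once stop() has closed the streamer and the grid, a later start() in the same JVM returns early because `cacheName` is already set, and the holder would still hand out the closed instances.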

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
index 9115ab4..731f540 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/IgniteKafkaStreamerSelfTestSuite.java
@@ -18,9 +18,10 @@
 package org.apache.ignite.stream.kafka;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.stream.kafka.connect.IgniteSinkConnectorTest;
 
 /**
- * Apache Kafka streamer tests.
+ * Apache Kafka streamers tests.
  */
 public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
     /**
@@ -30,8 +31,12 @@ public class IgniteKafkaStreamerSelfTestSuite extends TestSuite {
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("Apache Kafka streamer Test Suite");
 
+        // Kafka streamer.
         suite.addTest(new TestSuite(KafkaIgniteStreamerSelfTest.class));
 
+        // Kafka streamer via Connect API.
+        suite.addTest(new TestSuite(IgniteSinkConnectorTest.class));
+
         return suite;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
deleted file mode 100644
index 5e7cee7..0000000
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaEmbeddedBroker.java
+++ /dev/null
@@ -1,387 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.kafka;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeoutException;
-import kafka.admin.AdminUtils;
-import kafka.api.LeaderAndIsr;
-import kafka.api.PartitionStateInfo;
-import kafka.api.Request;
-import kafka.producer.KeyedMessage;
-import kafka.producer.Producer;
-import kafka.producer.ProducerConfig;
-import kafka.serializer.StringEncoder;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.SystemTime$;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import org.I0Itec.zkclient.ZkClient;
-import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-
-/**
- * Kafka Embedded Broker.
- */
-public class KafkaEmbeddedBroker {
-    /** Default ZooKeeper host. */
-    private static final String ZK_HOST = "localhost";
-
-    /** Broker port. */
-    private static final int BROKER_PORT = 9092;
-
-    /** ZooKeeper connection timeout. */
-    private static final int ZK_CONNECTION_TIMEOUT = 6000;
-
-    /** ZooKeeper session timeout. */
-    private static final int ZK_SESSION_TIMEOUT = 6000;
-
-    /** ZooKeeper port. */
-    private static int zkPort = 0;
-
-    /** Is ZooKeeper ready. */
-    private boolean zkReady;
-
-    /** Kafka config. */
-    private KafkaConfig brokerCfg;
-
-    /** Kafka server. */
-    private KafkaServer kafkaSrv;
-
-    /** ZooKeeper client. */
-    private ZkClient zkClient;
-
-    /** Embedded ZooKeeper. */
-    private EmbeddedZooKeeper zooKeeper;
-
-    /**
-     * Creates an embedded Kafka broker.
-     */
-    public KafkaEmbeddedBroker() {
-        try {
-            setupEmbeddedZooKeeper();
-
-            setupEmbeddedKafkaServer();
-        }
-        catch (IOException | InterruptedException e) {
-            throw new RuntimeException("Failed to start Kafka broker " + e);
-        }
-    }
-
-    /**
-     * @return ZooKeeper address.
-     */
-    public static String getZKAddress() {
-        return ZK_HOST + ':' + zkPort;
-    }
-
-    /**
-     * Creates a Topic.
-     *
-     * @param topic Topic name.
-     * @param partitions Number of partitions for the topic.
-     * @param replicationFactor Replication factor.
-     * @throws TimeoutException If operation is timed out.
-     * @throws InterruptedException If interrupted.
-     */
-    public void createTopic(String topic, int partitions, int replicationFactor)
-        throws TimeoutException, InterruptedException {
-        AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, new Properties());
-
-        waitUntilMetadataIsPropagated(topic, 0, 10000, 100);
-    }
-
-    /**
-     * Sends message to Kafka broker.
-     *
-     * @param keyedMessages List of keyed messages.
-     * @return Producer used to send the message.
-     */
-    public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
-        Producer<String, String> producer = new Producer<>(getProducerConfig());
-
-        producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
-
-        return producer;
-    }
-
-    /**
-     * Shuts down Kafka broker.
-     */
-    public void shutdown() {
-        zkReady = false;
-
-        if (kafkaSrv != null)
-            kafkaSrv.shutdown();
-
-        List<String> logDirs = scala.collection.JavaConversions.asJavaList(brokerCfg.logDirs());
-
-        for (String logDir : logDirs)
-            U.delete(new File(logDir));
-
-        if (zkClient != null) {
-            zkClient.close();
-
-            zkClient = null;
-        }
-
-        if (zooKeeper != null) {
-
-            try {
-                zooKeeper.shutdown();
-            }
-            catch (IOException e) {
-                // No-op.
-            }
-
-            zooKeeper = null;
-        }
-    }
-
-    /**
-     * @return ZooKeeper client.
-     */
-    private ZkClient getZkClient() {
-        A.ensure(zkReady, "Zookeeper not setup yet");
-        A.notNull(zkClient, "Zookeeper client is not yet initialized");
-
-        return zkClient;
-    }
-
-    /**
-     * Checks if topic metadata is propagated.
-     *
-     * @param topic Topic name.
-     * @param part Partition.
-     * @return {@code True} if propagated, otherwise {@code false}.
-     */
-    private boolean isMetadataPropagated(String topic, int part) {
-        scala.Option<PartitionStateInfo> partStateOption =
-            kafkaSrv.apis().metadataCache().getPartitionInfo(topic, part);
-
-        if (!partStateOption.isDefined())
-            return false;
-
-        PartitionStateInfo partState = partStateOption.get();
-
-        LeaderAndIsr LeaderAndIsr = partState.leaderIsrAndControllerEpoch().leaderAndIsr();
-
-        return ZkUtils.getLeaderForPartition(getZkClient(), topic, part) != null &&
-            Request.isValidBrokerId(LeaderAndIsr.leader()) && LeaderAndIsr.isr().size() >= 1;
-    }
-
-    /**
-     * Waits until metadata is propagated.
-     *
-     * @param topic Topic name.
-     * @param part Partition.
-     * @param timeout Timeout value in millis.
-     * @param interval Interval in millis to sleep.
-     * @throws TimeoutException If operation is timed out.
-     * @throws InterruptedException If interrupted.
-     */
-    private void waitUntilMetadataIsPropagated(String topic, int part, long timeout, long interval)
-        throws TimeoutException, InterruptedException {
-        int attempt = 1;
-
-        long startTime = System.currentTimeMillis();
-
-        while (true) {
-            if (isMetadataPropagated(topic, part))
-                return;
-
-            long duration = System.currentTimeMillis() - startTime;
-
-            if (duration < timeout)
-                Thread.sleep(interval);
-            else
-                throw new TimeoutException("Metadata propagation is timed out, attempt " + attempt);
-
-            attempt++;
-        }
-    }
-
-    /**
-     * Sets up embedded Kafka server.
-     *
-     * @throws IOException If failed.
-     */
-    private void setupEmbeddedKafkaServer() throws IOException {
-        A.ensure(zkReady, "Zookeeper should be setup before hand");
-
-        brokerCfg = new KafkaConfig(getBrokerConfig());
-
-        kafkaSrv = new KafkaServer(brokerCfg, SystemTime$.MODULE$);
-
-        kafkaSrv.startup();
-    }
-
-    /**
-     * Sets up embedded ZooKeeper.
-     *
-     * @throws IOException If failed.
-     * @throws InterruptedException If interrupted.
-     */
-    private void setupEmbeddedZooKeeper() throws IOException, InterruptedException {
-        EmbeddedZooKeeper zooKeeper = new EmbeddedZooKeeper(ZK_HOST, zkPort);
-
-        zooKeeper.startup();
-
-        zkPort = zooKeeper.getActualPort();
-
-        zkClient = new ZkClient(getZKAddress(), ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT, ZKStringSerializer$.MODULE$);
-
-        zkReady = true;
-    }
-
-    /**
-     * @return Kafka broker address.
-     */
-    private static String getBrokerAddress() {
-        return ZK_HOST + ':' + BROKER_PORT;
-    }
-
-    /**
-     * Gets Kafka broker config.
-     *
-     * @return Kafka broker config.
-     * @throws IOException If failed.
-     */
-    private static Properties getBrokerConfig() throws IOException {
-        Properties props = new Properties();
-
-        props.put("broker.id", "0");
-        props.put("host.name", ZK_HOST);
-        props.put("port", "" + BROKER_PORT);
-        props.put("log.dir", createTempDir("_cfg").getAbsolutePath());
-        props.put("zookeeper.connect", getZKAddress());
-        props.put("log.flush.interval.messages", "1");
-        props.put("replica.socket.timeout.ms", "1500");
-
-        return props;
-    }
-
-    /**
-     * @return Kafka Producer config.
-     */
-    private static ProducerConfig getProducerConfig() {
-        Properties props = new Properties();
-
-        props.put("metadata.broker.list", getBrokerAddress());
-        props.put("serializer.class", StringEncoder.class.getName());
-        props.put("key.serializer.class", StringEncoder.class.getName());
-        props.put("partitioner.class", SimplePartitioner.class.getName());
-
-        return new ProducerConfig(props);
-    }
-
-    /**
-     * Creates temp directory.
-     *
-     * @param prefix Prefix.
-     * @return Created file.
-     * @throws IOException If failed.
-     */
-    private static File createTempDir( String prefix) throws IOException {
-        Path path = Files.createTempDirectory(prefix);
-
-        return path.toFile();
-    }
-
-    /**
-     * Creates embedded ZooKeeper.
-     */
-    private static class EmbeddedZooKeeper {
-        /** Default ZooKeeper host. */
-        private final String zkHost;
-
-        /** Default ZooKeeper port. */
-        private final int zkPort;
-
-        /** NIO context factory. */
-        private NIOServerCnxnFactory factory;
-
-        /** Snapshot directory. */
-        private File snapshotDir;
-
-        /** Log directory. */
-        private File logDir;
-
-        /**
-         * Creates an embedded ZooKeeper.
-         *
-         * @param zkHost ZooKeeper host.
-         * @param zkPort ZooKeeper port.
-         */
-        EmbeddedZooKeeper(String zkHost, int zkPort) {
-            this.zkHost = zkHost;
-            this.zkPort = zkPort;
-        }
-
-        /**
-         * Starts up ZooKeeper.
-         *
-         * @throws IOException If failed.
-         * @throws InterruptedException If interrupted.
-         */
-        void startup() throws IOException, InterruptedException {
-            snapshotDir = createTempDir("_ss");
-
-            logDir = createTempDir("_log");
-
-            ZooKeeperServer zkSrv = new ZooKeeperServer(snapshotDir, logDir, 500);
-
-            factory = new NIOServerCnxnFactory();
-
-            factory.configure(new InetSocketAddress(zkHost, zkPort), 16);
-
-            factory.startup(zkSrv);
-        }
-
-        /**
-         * @return Actual port ZooKeeper is started.
-         */
-        int getActualPort() {
-            return factory.getLocalPort();
-        }
-
-        /**
-         * Shuts down ZooKeeper.
-         *
-         * @throws IOException If failed.
-         */
-        void shutdown() throws IOException {
-            if (factory != null) {
-                factory.shutdown();
-
-                U.delete(snapshotDir);
-
-                U.delete(logDir);
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
index 927ba3d..829c877 100644
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/KafkaIgniteStreamerSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import kafka.consumer.ConsumerConfig;
 import kafka.producer.KeyedMessage;
@@ -40,14 +41,13 @@ import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
-import static org.apache.ignite.stream.kafka.KafkaEmbeddedBroker.getZKAddress;
 
 /**
  * Tests {@link KafkaStreamer}.
  */
 public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
     /** Embedded Kafka. */
-    private KafkaEmbeddedBroker embeddedBroker;
+    private TestKafkaBroker embeddedBroker;
 
     /** Count. */
     private static final int CNT = 100;
@@ -77,7 +77,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
     @Override protected void beforeTest() throws Exception {
         grid().<Integer, String>getOrCreateCache(defaultCacheConfiguration());
 
-        embeddedBroker = new KafkaEmbeddedBroker();
+        embeddedBroker = new TestKafkaBroker();
     }
 
     /** {@inheritDoc} */
@@ -176,7 +176,7 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
             kafkaStmr.setThreads(4);
 
             // Set the consumer configuration.
-            kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(getZKAddress(), "groupX"));
+            kafkaStmr.setConsumerConfig(createDefaultConsumerConfig(embeddedBroker.getZookeeperAddress(), "groupX"));
 
             // Set the decoders.
             StringDecoder strDecoder = new StringDecoder(new VerifiableProperties());
@@ -199,7 +199,8 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
 
             ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
 
-            latch.await();
+            // Checks that all events are processed within 10 seconds.
+            assertTrue(latch.await(10, TimeUnit.SECONDS));
 
             for (Map.Entry<String, String> entry : keyValMap.entrySet())
                 assertEquals(entry.getValue(), cache.get(entry.getKey()));
@@ -232,4 +233,4 @@ public class KafkaIgniteStreamerSelfTest extends GridCommonAbstractTest {
 
         return new ConsumerConfig(props);
     }
-}
\ No newline at end of file
+}
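
Note on the bounded wait introduced above: replacing latch.await() with assertTrue(latch.await(10, TimeUnit.SECONDS)) turns a lost event into a test failure instead of a hang. The following is a minimal sketch of that pattern using only the JDK; the class and method names (BoundedLatchWait, awaitEvents) are hypothetical and are not part of the commit.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BoundedLatchWait {
    /**
     * Counts a latch of size {@code expected} down {@code delivered} times on a
     * background thread, then waits at most {@code timeoutMs} milliseconds.
     *
     * @return {@code true} if the latch reached zero before the timeout.
     */
    public static boolean awaitEvents(int expected, int delivered, long timeoutMs) {
        CountDownLatch latch = new CountDownLatch(expected);

        // Stands in for the cache event listener that calls countDown() per put.
        Thread producer = new Thread(() -> {
            for (int i = 0; i < delivered; i++)
                latch.countDown();
        });

        producer.start();

        try {
            producer.join();

            // Bounded wait: returns false on timeout instead of blocking forever.
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitEvents(100, 100, 1000)); // prints "true"
        System.out.println(awaitEvents(100, 99, 200));   // prints "false" after the timeout
    }
}
```

With the unbounded latch.await(), the second call would never return, which is the situation the test change avoids.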

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
deleted file mode 100644
index b49bebe..0000000
--- a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/SimplePartitioner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.kafka;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * Simple partitioner for Kafka.
- */
-@SuppressWarnings("UnusedDeclaration")
-public class SimplePartitioner implements Partitioner {
-    /**
-     * Constructs instance.
-     *
-     * @param props Properties.
-     */
-    public SimplePartitioner(VerifiableProperties props) {
-        // No-op.
-    }
-
-    /**
-     * Partitions the key based on the key value.
-     *
-     * @param key Key.
-     * @param partSize Partition size.
-     * @return partition Partition.
-     */
-    public int partition(Object key, int partSize) {
-        String keyStr = (String)key;
-
-        String[] keyValues = keyStr.split("\\.");
-
-        Integer intKey = Integer.parseInt(keyValues[3]);
-
-        return intKey > 0 ? intKey % partSize : 0;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
new file mode 100644
index 0000000..70acb78
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/TestKafkaBroker.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeoutException;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Producer;
+import kafka.producer.ProducerConfig;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.SystemTime$;
+import kafka.utils.TestUtils;
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+import org.apache.curator.test.TestingServer;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import scala.Tuple2;
+
+/**
+ * Kafka Test Broker.
+ */
+public class TestKafkaBroker {
+    /** ZooKeeper connection timeout. */
+    private static final int ZK_CONNECTION_TIMEOUT = 6000;
+
+    /** ZooKeeper session timeout. */
+    private static final int ZK_SESSION_TIMEOUT = 6000;
+
+    /** ZooKeeper port. */
+    private static final int ZK_PORT = 21811;
+
+    /** Broker host. */
+    private static final String BROKER_HOST = "localhost";
+
+    /** Broker port. */
+    private static final int BROKER_PORT = 9092;
+
+    /** Kafka config. */
+    private KafkaConfig kafkaCfg;
+
+    /** Kafka server. */
+    private KafkaServer kafkaSrv;
+
+    /** ZooKeeper. */
+    private TestingServer zkServer;
+
+    /** Kafka ZooKeeper utils. */
+    private ZkUtils zkUtils;
+
+    /**
+     * Kafka broker constructor.
+     */
+    public TestKafkaBroker() {
+        try {
+            setupZooKeeper();
+
+            setupKafkaServer();
+        }
+        catch (Exception e) {
+            throw new RuntimeException("Failed to start Kafka: " + e, e);
+        }
+    }
+
+    /**
+     * Creates a topic.
+     *
+     * @param topic Topic name.
+     * @param partitions Number of partitions for the topic.
+     * @param replicationFactor Replication factor.
+     * @throws TimeoutException If operation is timed out.
+     * @throws InterruptedException If interrupted.
+     */
+    public void createTopic(String topic, int partitions, int replicationFactor)
+        throws TimeoutException, InterruptedException {
+        List<KafkaServer> servers = new ArrayList<>();
+
+        servers.add(kafkaSrv);
+
+        TestUtils.createTopic(zkUtils, topic, partitions, replicationFactor,
+            scala.collection.JavaConversions.asScalaBuffer(servers), new Properties());
+    }
+
+    /**
+     * Sends messages to the Kafka broker.
+     *
+     * @param keyedMessages List of keyed messages.
+     * @return Producer used to send the messages.
+     */
+    public Producer<String, String> sendMessages(List<KeyedMessage<String, String>> keyedMessages) {
+        Producer<String, String> producer = new Producer<>(getProducerConfig());
+
+        producer.send(scala.collection.JavaConversions.asScalaBuffer(keyedMessages));
+
+        return producer;
+    }
+
+    /**
+     * Shuts down test Kafka broker.
+     */
+    public void shutdown() {
+        if (zkUtils != null)
+            zkUtils.close();
+
+        if (kafkaSrv != null)
+            kafkaSrv.shutdown();
+
+        if (zkServer != null) {
+            try {
+                zkServer.stop();
+            }
+            catch (IOException e) {
+                // No-op.
+            }
+        }
+
+        List<String> logDirs = scala.collection.JavaConversions.seqAsJavaList(kafkaCfg.logDirs());
+
+        for (String logDir : logDirs)
+            U.delete(new File(logDir));
+    }
+
+    /**
+     * Sets up test Kafka broker.
+     *
+     * @throws IOException If failed.
+     */
+    private void setupKafkaServer() throws IOException {
+        kafkaCfg = new KafkaConfig(getKafkaConfig());
+
+        kafkaSrv = TestUtils.createServer(kafkaCfg, SystemTime$.MODULE$);
+
+        kafkaSrv.startup();
+    }
+
+    /**
+     * Sets up ZooKeeper test server.
+     *
+     * @throws Exception If failed.
+     */
+    private void setupZooKeeper() throws Exception {
+        zkServer = new TestingServer(ZK_PORT, true);
+
+        Tuple2<ZkClient, ZkConnection> zkTuple = ZkUtils.createZkClientAndConnection(zkServer.getConnectString(),
+            ZK_SESSION_TIMEOUT, ZK_CONNECTION_TIMEOUT);
+
+        zkUtils = new ZkUtils(zkTuple._1(), zkTuple._2(), false);
+    }
+
+    /**
+     * Obtains Kafka config.
+     *
+     * @return Kafka config.
+     * @throws IOException If failed.
+     */
+    private Properties getKafkaConfig() throws IOException {
+        Properties props = new Properties();
+
+        props.put("broker.id", "0");
+        props.put("zookeeper.connect", zkServer.getConnectString());
+        props.put("host.name", BROKER_HOST);
+        props.put("port", BROKER_PORT);
+        props.put("offsets.topic.replication.factor", "1");
+        props.put("log.dir", createTmpDir("_cfg").getAbsolutePath());
+        props.put("log.flush.interval.messages", "1");
+
+        return props;
+    }
+
+    /**
+     * Obtains broker address.
+     *
+     * @return Kafka broker address.
+     */
+    public String getBrokerAddress() {
+        return BROKER_HOST + ":" + BROKER_PORT;
+    }
+
+    /**
+     * Obtains ZooKeeper address.
+     *
+     * @return ZooKeeper address.
+     */
+    public String getZookeeperAddress() {
+        return BROKER_HOST + ":" + ZK_PORT;
+    }
+
+    /**
+     * Obtains producer config.
+     *
+     * @return Kafka Producer config.
+     */
+    private ProducerConfig getProducerConfig() {
+        Properties props = new Properties();
+
+        props.put("metadata.broker.list", getBrokerAddress());
+        props.put("bootstrap.servers", getBrokerAddress());
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+
+        return new ProducerConfig(props);
+    }
+
+    /**
+     * Creates temporary directory.
+     *
+     * @param prefix Prefix.
+     * @return Created file.
+     * @throws IOException If failed.
+     */
+    private static File createTmpDir(String prefix) throws IOException {
+        Path path = Files.createTempDirectory(prefix);
+
+        return path.toFile();
+    }
+}
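
Note on log directory cleanup: TestKafkaBroker creates its log directory with Files.createTempDirectory() and removes it in shutdown() through Ignite's internal U.delete(). The sketch below shows the same create-then-delete lifecycle with JDK calls only. It is an illustration under that assumption, not the commit's implementation; TmpDirLifecycle and deleteRecursively are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

class TmpDirLifecycle {
    /** Creates a temporary directory with the given prefix. */
    public static Path createTmpDir(String prefix) {
        try {
            return Files.createTempDirectory(prefix);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes a directory tree, visiting children before their parents. */
    public static void deleteRecursively(Path root) {
        if (!Files.exists(root))
            return;

        try (Stream<Path> paths = Files.walk(root)) {
            // Reverse order puts nested entries ahead of the directories that contain them.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                }
                catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = createTmpDir("_cfg");

        // Stand-in for a log segment written by the broker.
        Files.createFile(dir.resolve("00000000.log"));

        deleteRecursively(dir);

        System.out.println(Files.exists(dir)); // prints "false"
    }
}
```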

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
new file mode 100644
index 0000000..a8583d0
--- /dev/null
+++ b/modules/kafka/src/test/java/org/apache/ignite/stream/kafka/connect/IgniteSinkConnectorTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.kafka.connect;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import kafka.producer.KeyedMessage;
+import kafka.producer.Producer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.stream.kafka.TestKafkaBroker;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.Worker;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.connect.storage.OffsetBackingStore;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.mock;
+
+/**
+ * Tests for {@link IgniteSinkConnector}.
+ */
+public class IgniteSinkConnectorTest extends GridCommonAbstractTest {
+    /** Number of input messages. */
+    private static final int EVENT_CNT = 10000;
+
+    /** Cache name. */
+    private static final String CACHE_NAME = "testCache";
+
+    /** Test topics. */
+    private static final String[] TOPICS = {"test1", "test2"};
+
+    /** Number of Kafka partitions per topic. */
+    private static final int PARTITIONS = 3;
+
+    /** Kafka replication factor. */
+    private static final int REPLICATION_FACTOR = 1;
+
+    /** Test Kafka broker. */
+    private TestKafkaBroker kafkaBroker;
+
+    /** Worker to run tasks. */
+    private Worker worker;
+
+    /** Workers' herder. */
+    private Herder herder;
+
+    /** Ignite server node. */
+    private Ignite grid;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTest() throws Exception {
+        IgniteConfiguration cfg = loadConfiguration("modules/kafka/src/test/resources/example-ignite.xml");
+
+        cfg.setClientMode(false);
+
+        grid = startGrid("igniteServerNode", cfg);
+
+        kafkaBroker = new TestKafkaBroker();
+
+        for (String topic : TOPICS)
+            kafkaBroker.createTopic(topic, PARTITIONS, REPLICATION_FACTOR);
+
+        WorkerConfig workerConfig = new StandaloneConfig(makeWorkerProps());
+
+        OffsetBackingStore offsetBackingStore = mock(OffsetBackingStore.class);
+        offsetBackingStore.configure(anyObject(Map.class));
+
+        worker = new Worker(workerConfig, offsetBackingStore);
+        worker.start();
+
+        herder = new StandaloneHerder(worker);
+        herder.start();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        herder.stop();
+
+        worker.stop();
+
+        kafkaBroker.shutdown();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Tests the whole data flow from injecting data to Kafka to transferring it to the grid. It reads from two
+     * specified Kafka topics, because a sink task can read from multiple topics.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSinkPuts() throws Exception {
+        Map<String, String> sinkProps = makeSinkProps(Utils.join(TOPICS, ","));
+
+        FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+            @Override
+            public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
+                if (error != null)
+                    throw new RuntimeException("Failed to create a job!", error);
+            }
+        });
+
+        herder.putConnectorConfig(
+            sinkProps.get(ConnectorConfig.NAME_CONFIG),
+            sinkProps, false, cb);
+
+        cb.get();
+
+        final CountDownLatch latch = new CountDownLatch(EVENT_CNT * TOPICS.length);
+
+        final IgnitePredicate<Event> putLsnr = new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                assert evt != null;
+
+                latch.countDown();
+
+                return true;
+            }
+        };
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).localListen(putLsnr, EVT_CACHE_OBJECT_PUT);
+
+        IgniteCache<String, String> cache = grid.cache(CACHE_NAME);
+
+        assertEquals(0, cache.size(CachePeekMode.PRIMARY));
+
+        Map<String, String> keyValMap = new HashMap<>(EVENT_CNT * TOPICS.length);
+
+        // Produces events for each of the specified topics.
+        for (String topic : TOPICS)
+            keyValMap.putAll(produceStream(topic));
+
+        // Checks that all events are processed within 10 seconds.
+        assertTrue(latch.await(10, TimeUnit.SECONDS));
+
+        grid.events(grid.cluster().forCacheNodes(CACHE_NAME)).stopLocalListen(putLsnr);
+
+        // Checks that each event was processed properly.
+        for (Map.Entry<String, String> entry : keyValMap.entrySet())
+            assertEquals(entry.getValue(), cache.get(entry.getKey()));
+
+        assertEquals(EVENT_CNT * TOPICS.length, cache.size(CachePeekMode.PRIMARY));
+    }
+
+    /**
+     * Sends messages to Kafka.
+     *
+     * @param topic Topic name.
+     * @return Map of key value messages.
+     */
+    private Map<String, String> produceStream(String topic) {
+        List<KeyedMessage<String, String>> messages = new ArrayList<>(EVENT_CNT);
+
+        Map<String, String> keyValMap = new HashMap<>();
+
+        for (int evt = 0; evt < EVENT_CNT; evt++) {
+            long runtime = System.currentTimeMillis();
+
+            String key = topic + "_" + String.valueOf(evt);
+            String msg = runtime + String.valueOf(evt);
+
+            messages.add(new KeyedMessage<>(topic, key, msg));
+
+            keyValMap.put(key, msg);
+        }
+
+        Producer<String, String> producer = kafkaBroker.sendMessages(messages);
+
+        producer.close();
+
+        return keyValMap;
+    }
+
+    /**
+     * Creates properties for test sink connector.
+     *
+     * @param topics Topics.
+     * @return Test sink connector properties.
+     */
+    private Map<String, String> makeSinkProps(String topics) {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(ConnectorConfig.TOPICS_CONFIG, topics);
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "2");
+        props.put(ConnectorConfig.NAME_CONFIG, "test-connector");
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, IgniteSinkConnector.class.getName());
+        props.put(IgniteSinkConstants.CACHE_NAME, "testCache");
+        props.put(IgniteSinkConstants.CACHE_ALLOW_OVERWRITE, "true");
+        props.put(IgniteSinkConstants.CACHE_CFG_PATH, "example-ignite.xml");
+
+        return props;
+    }
+
+    /**
+     * Creates properties for Kafka Connect workers.
+     *
+     * @return Worker configurations.
+     */
+    private Map<String, String> makeWorkerProps() {
+        Map<String, String> props = new HashMap<>();
+
+        props.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put("internal.key.converter.schemas.enable", "false");
+        props.put("internal.value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        props.put("key.converter.schemas.enable", "false");
+        props.put("value.converter.schemas.enable", "false");
+        props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerAddress());
+        // Fast flushing for testing.
+        props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
+
+        return props;
+    }
+}
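
Note on the expected entry count: testSinkPuts() sizes its latch and its final cache assertion as EVENT_CNT * TOPICS.length. That only holds because produceStream() prefixes every key with the topic name, so keys from different topics never collide. The sketch below reproduces the key and value construction without Kafka to show the arithmetic; ExpectedEntries, entriesFor and allEntries are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class ExpectedEntries {
    /** Builds the key/value pairs for one topic, mirroring produceStream() without a producer. */
    public static Map<String, String> entriesFor(String topic, int eventCnt) {
        Map<String, String> keyValMap = new HashMap<>();

        for (int evt = 0; evt < eventCnt; evt++) {
            // The topic prefix keeps keys unique across topics.
            String key = topic + "_" + evt;
            String msg = System.currentTimeMillis() + String.valueOf(evt);

            keyValMap.put(key, msg);
        }

        return keyValMap;
    }

    /** Merges the entries of all topics, as the test does with keyValMap.putAll(). */
    public static Map<String, String> allEntries(String[] topics, int eventCnt) {
        Map<String, String> all = new HashMap<>(eventCnt * topics.length);

        for (String topic : topics)
            all.putAll(entriesFor(topic, eventCnt));

        return all;
    }

    public static void main(String[] args) {
        String[] topics = {"test1", "test2"};

        System.out.println(allEntries(topics, 10000).size()); // prints "20000"
    }
}
```

If the keys were only the event index, the second topic would overwrite the first and the cache would hold EVENT_CNT entries, so the final size assertion would fail.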

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/modules/kafka/src/test/resources/example-ignite.xml
----------------------------------------------------------------------
diff --git a/modules/kafka/src/test/resources/example-ignite.xml b/modules/kafka/src/test/resources/example-ignite.xml
new file mode 100644
index 0000000..fbb05d3
--- /dev/null
+++ b/modules/kafka/src/test/resources/example-ignite.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    Ignite configuration with defaults, plus client mode and cache events enabled.
+    Used for testing IgniteSink, which runs Ignite in client mode.
+-->
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans
+        http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util
+        http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <!-- Enable client mode. -->
+        <property name="clientMode" value="true"/>
+
+        <!-- Cache accessed from IgniteSink. -->
+        <property name="cacheConfiguration">
+            <list>
+                <!-- Partitioned cache example configuration, with settings matching those of the server nodes. -->
+                <bean class="org.apache.ignite.configuration.CacheConfiguration">
+                    <property name="atomicityMode" value="ATOMIC"/>
+
+                    <property name="name" value="testCache"/>
+                </bean>
+            </list>
+        </property>
+
+        <!-- Enable cache events. -->
+        <property name="includeEventTypes">
+            <list>
+                <!-- Cache events (only EVT_CACHE_OBJECT_PUT for tests). -->
+                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
+            </list>
+        </property>
+
+        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <property name="addresses">
+                            <list>
+                                <value>127.0.0.1:47500</value>
+                            </list>
+                        </property>
+                    </bean>
+                </property>
+            </bean>
+        </property>
+    </bean>
+</beans>

http://git-wip-us.apache.org/repos/asf/ignite/blob/c92c2747/parent/pom.xml
----------------------------------------------------------------------
diff --git a/parent/pom.xml b/parent/pom.xml
index 8f02fec..21d8c69 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -61,6 +61,7 @@
         <commons.lang.version>2.6</commons.lang.version>
         <cron4j.version>2.2.5</cron4j.version>
         <curator.version>2.9.1</curator.version>
+        <easymock.version>3.4</easymock.version>
         <ezmorph.bundle.version>1.0.6_1</ezmorph.bundle.version>
         <ezmorph.version>1.0.6</ezmorph.version>
         <flume.ng.version>1.6.0</flume.ng.version>
@@ -82,11 +83,9 @@
         <jsonlib.bundle.version>2.4_1</jsonlib.bundle.version>
         <jsonlib.version>2.4</jsonlib.version>
         <jtidy.version>r938</jtidy.version>
-        <kafka.bundle.version>0.8.2.1_1</kafka.bundle.version>
-        <kafka.clients.bundle.version>0.8.2.0_1</kafka.clients.bundle.version>
-        <kafka.clients.version>0.8.2.0</kafka.clients.version>
-        <kafka.version>0.8.2.1</kafka.version>
-        <kafka.version>0.8.2.1</kafka.version>
+        <kafka.bundle.version>0.9.0.0_1</kafka.bundle.version>
+        <kafka.clients.bundle.version>0.9.0.0_1</kafka.clients.bundle.version>
+        <kafka.version>0.9.0.0</kafka.version>
         <karaf.version>4.0.2</karaf.version>
         <lucene.bundle.version>3.5.0_1</lucene.bundle.version>
         <lucene.version>3.5.0</lucene.version>

