camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [05/14] camel git commit: Refactored embedded test infrastructure
Date Fri, 03 Mar 2017 19:36:28 GMT
Refactored embedded test infrastructure


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4b7f0c75
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4b7f0c75
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4b7f0c75

Branch: refs/heads/master
Commit: 4b7f0c75a0a723160d8b5c6fad52c33eb4019fbd
Parents: e37bbb3
Author: Jakub Korab <jakub.korab@gmail.com>
Authored: Tue Feb 28 17:23:50 2017 +0000
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri Mar 3 20:03:08 2017 +0100

----------------------------------------------------------------------
 .../component/kafka/BaseEmbeddedKafkaTest.java  |  31 ++-
 .../KafkaConsumerOffsetRepositoryEmptyTest.java |   2 +-
 ...KafkaConsumerOffsetRepositoryResumeTest.java |   3 +-
 .../kafka/embedded/EmbeddedKafkaBroker.java     | 128 ++++++++++++
 .../kafka/embedded/EmbeddedKafkaCluster.java    | 198 -------------------
 .../kafka/embedded/EmbeddedZookeeper.java       |  25 +--
 .../kafka/KafkaIdempotentRepositoryTest.java    |  37 +++-
 7 files changed, 188 insertions(+), 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4b7f0c75/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
index a6cf073..79969f5 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/BaseEmbeddedKafkaTest.java
@@ -17,7 +17,7 @@
 package org.apache.camel.component.kafka;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.component.kafka.embedded.EmbeddedKafkaCluster;
+import org.apache.camel.component.kafka.embedded.EmbeddedKafkaBroker;
 import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
 import org.apache.camel.component.properties.PropertiesComponent;
 import org.apache.camel.impl.JndiRegistry;
@@ -35,30 +35,27 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
 
     static final Logger LOG = LoggerFactory.getLogger(BaseEmbeddedKafkaTest.class);
 
-    // start from somewhere in the 23xxx range
-    private static volatile int zookeeperPort = AvailablePortFinder.getNextAvailable(23000);
-
     @ClassRule
-    public static EmbeddedZookeeper embeddedZookeeper = new EmbeddedZookeeper(zookeeperPort);
-
-    private static volatile int kafkaPort = AvailablePortFinder.getNextAvailable(24000);
+    public static EmbeddedZookeeper zookeeper = new EmbeddedZookeeper(
+            AvailablePortFinder.getNextAvailable(23000));
 
     @ClassRule
-    public static EmbeddedKafkaCluster embeddedKafkaCluster =
-            new EmbeddedKafkaCluster(embeddedZookeeper.getConnection(),
-                    new Properties(), kafkaPort);
+    public static EmbeddedKafkaBroker kafkaBroker =
+            new EmbeddedKafkaBroker(0,
+                    AvailablePortFinder.getNextAvailable(24000),
+                    zookeeper.getConnection(),
+                    new Properties());
 
     @BeforeClass
     public static void beforeClass() {
-        LOG.info("### Embedded Zookeeper connection: " + embeddedZookeeper.getConnection());
-        LOG.info("### Embedded Kafka cluster broker list: " + embeddedKafkaCluster.getBrokerList());
+        LOG.info("### Embedded Zookeeper connection: " + zookeeper.getConnection());
+        LOG.info("### Embedded Kafka cluster broker list: " + kafkaBroker.getBrokerList());
     }
 
     protected Properties getDefaultProperties() {
         Properties props = new Properties();
-        int kafkaPort = getKafkaPort();
-        LOG.info("Connecting to Kafka port {}", kafkaPort);
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:" + kafkaPort);
+        LOG.info("Connecting to Kafka port {}", kafkaBroker.getPort());
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker.getBrokerList());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_SERIALIZER);
         props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaConstants.KAFKA_DEFAULT_PARTITIONER);
@@ -90,11 +87,11 @@ public class BaseEmbeddedKafkaTest extends CamelTestSupport {
     }
 
     protected static int getZookeeperPort() {
-        return zookeeperPort;
+        return zookeeper.getPort();
     }
 
     protected static int getKafkaPort() {
-        return kafkaPort;
+        return kafkaBroker.getPort();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/4b7f0c75/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
index 72e8f14..73ec9e6 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryEmptyTest.java
@@ -41,7 +41,7 @@ public class KafkaConsumerOffsetRepositoryEmptyTest extends BaseEmbeddedKafkaTes
     @Override
     protected void doPreSetup() throws Exception {
         // Create the topic with 2 partitions + send 10 messages (5 in each partitions)
-        embeddedKafkaCluster.createTopic(TOPIC, 2);
+        kafkaBroker.createTopic(TOPIC, 2);
 
         Properties props = getDefaultProperties();
         producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

http://git-wip-us.apache.org/repos/asf/camel/blob/4b7f0c75/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
index 793535e..3b08dea 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerOffsetRepositoryResumeTest.java
@@ -23,7 +23,6 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.JndiRegistry;
 import org.apache.camel.impl.MemoryStateRepository;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.junit.After;
 import org.junit.Test;
@@ -44,7 +43,7 @@ public class KafkaConsumerOffsetRepositoryResumeTest extends BaseEmbeddedKafkaTe
         producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
 
         // Create the topic with 2 partitions + send 10 messages (5 in each partitions)
-        embeddedKafkaCluster.createTopic(TOPIC, 2);
+        kafkaBroker.createTopic(TOPIC, 2);
         for (int i = 0; i < 10; i++) {
             producer.send(new ProducerRecord<>(TOPIC, i % 2, "key", "message-" + i));
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/4b7f0c75/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
new file mode 100644
index 0000000..8ab50ac
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.embedded;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.ZkUtils;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.mutable.Buffer;
+
+import static org.apache.camel.component.kafka.embedded.TestUtils.*;
+
+public class EmbeddedKafkaBroker extends ExternalResource {
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private final Integer brokerId;
+    private final Integer port;
+    private final String zkConnection;
+    private final Properties baseProperties;
+
+    private final String brokerList;
+
+    private KafkaServer kafkaServer;
+    private File logDir;
+
+    public EmbeddedKafkaBroker(int brokerId, String zkConnection) {
+        this(brokerId, AvailablePortFinder.getNextAvailable(), zkConnection, new Properties());
+    }
+
+    public EmbeddedKafkaBroker(int brokerId, int port, String zkConnection, Properties baseProperties) {
+        this.brokerId = brokerId;
+        this.port = port;
+        this.zkConnection = zkConnection;
+        this.baseProperties = baseProperties;
+
+        this.brokerList = "localhost:" + this.port;
+    }
+
+    public ZkUtils getZkUtils() {
+        return kafkaServer.zkUtils();
+    }
+
+    public void createTopic(String topic, int partitionCount) {
+        log.info("createTopic");
+        AdminUtils.createTopic(getZkUtils(), topic, partitionCount, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
+    }
+
+    public void before() {
+        log.info("before");
+        logDir = constructTempDir(perTest("kafka-log"));
+
+        Properties properties = new Properties();
+        properties.putAll(baseProperties);
+        properties.setProperty("zookeeper.connect", zkConnection);
+        properties.setProperty("broker.id", brokerId.toString());
+        properties.setProperty("host.name", "localhost");
+        properties.setProperty("port", Integer.toString(port));
+        properties.setProperty("log.dir", logDir.getAbsolutePath());
+        properties.setProperty("num.partitions", String.valueOf(1));
+        properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE));
+        log.info("log directory: " + logDir.getAbsolutePath());
+        properties.setProperty("log.flush.interval.messages", String.valueOf(1));
+
+        kafkaServer = startBroker(properties);
+    }
+
+
+    private KafkaServer startBroker(Properties props) {
+        List<KafkaMetricsReporter> kmrList = new ArrayList<>();
+        Buffer<KafkaMetricsReporter> metricsList = scala.collection.JavaConversions.asScalaBuffer(kmrList);
+        KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime(), Option.<String>empty(), metricsList);
+        server.startup();
+        return server;
+    }
+
+    public String getBrokerList() {
+        return brokerList;
+    }
+
+    public Integer getPort() {
+        return port;
+    }
+
+    public void after() {
+        kafkaServer.shutdown();
+        try {
+            TestUtils.deleteFile(logDir);
+        } catch (FileNotFoundException e) {
+            log.info("Could not delete {} - not found", logDir.getAbsolutePath());
+        }
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("EmbeddedKafkaBroker{");
+        sb.append("brokerList='").append(brokerList).append('\'');
+        sb.append('}');
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4b7f0c75/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
deleted file mode 100644
index 5f5fc8e..0000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaCluster.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka.embedded;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-
-import kafka.admin.AdminUtils;
-import kafka.admin.RackAwareMode;
-import kafka.metrics.KafkaMetricsReporter;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.ZkUtils;
-import org.junit.rules.ExternalResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.collection.mutable.Buffer;
-
-import static org.apache.camel.component.kafka.embedded.TestUtils.*;
-
-public class EmbeddedKafkaCluster extends ExternalResource {
-
-    private final Logger log = LoggerFactory.getLogger(this.getClass());
-
-    private final List<Integer> ports;
-    private final String zkConnection;
-    private final Properties baseProperties;
-
-    private final String brokerList;
-
-    private final List<KafkaServer> kafkaServers;
-    private final List<File> logDirs;
-
-    public EmbeddedKafkaCluster(String zkConnection) {
-        this(zkConnection, new Properties());
-    }
-
-    public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties) {
-        this(zkConnection, baseProperties, Collections.singletonList(-1));
-    }
-
-    public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties, Integer port) {
-        this(zkConnection, baseProperties, Collections.singletonList(port));
-    }
-
-    public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties, List<Integer> ports) {
-        this.zkConnection = zkConnection;
-        this.ports = resolvePorts(ports);
-        this.baseProperties = baseProperties;
-        this.kafkaServers = new ArrayList<>();
-        this.logDirs = new ArrayList<>();
-
-        this.brokerList = constructBrokerList(this.ports);
-    }
-
-    public ZkUtils getZkUtils() {
-        // gets the first instance
-        for (KafkaServer kafkaServer : kafkaServers) {
-            return kafkaServer.zkUtils();
-        }
-        return null;
-    }
-
-    public void createTopic(String topic, int partitionCount) {
-        log.info("createTopic");
-        AdminUtils.createTopic(getZkUtils(), topic, partitionCount, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
-    }
-
-    public void createTopics(String... topics) {
-        for (String topic : topics) {
-            AdminUtils.createTopic(getZkUtils(), topic, 2, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
-        }
-    }
-
-    private List<Integer> resolvePorts(List<Integer> ports) {
-        List<Integer> resolvedPorts = new ArrayList<Integer>(ports.size());
-        for (Integer port : ports) {
-            resolvedPorts.add(resolvePort(port));
-        }
-        return resolvedPorts;
-    }
-
-    private int resolvePort(int port) {
-        if (port == -1) {
-            return TestUtils.getAvailablePort();
-        }
-        return port;
-    }
-
-    private String constructBrokerList(List<Integer> ports) {
-        StringBuilder sb = new StringBuilder();
-        for (Integer port : ports) {
-            if (sb.length() > 0) {
-                sb.append(",");
-            }
-            sb.append("localhost:").append(port);
-        }
-        return sb.toString();
-    }
-
-    public void before() {
-        log.info("before");
-        for (int i = 0; i < ports.size(); i++) {
-            Integer port = ports.get(i);
-            File logDir = constructTempDir(perTest("kafka-log"));
-
-            Properties properties = new Properties();
-            properties.putAll(baseProperties);
-            properties.setProperty("zookeeper.connect", zkConnection);
-            properties.setProperty("broker.id", String.valueOf(i + 1));
-            properties.setProperty("host.name", "localhost");
-            properties.setProperty("port", Integer.toString(port));
-            properties.setProperty("log.dir", logDir.getAbsolutePath());
-            properties.setProperty("num.partitions", String.valueOf(1));
-            properties.setProperty("auto.create.topics.enable", String.valueOf(Boolean.TRUE));
-            log.info("log directory: " + logDir.getAbsolutePath());
-            properties.setProperty("log.flush.interval.messages", String.valueOf(1));
-
-            KafkaServer broker = startBroker(properties);
-
-            kafkaServers.add(broker);
-            logDirs.add(logDir);
-        }
-    }
-
-
-    private KafkaServer startBroker(Properties props) {
-        List<KafkaMetricsReporter> kmrList = new ArrayList<>();
-        Buffer<KafkaMetricsReporter> metricsList = scala.collection.JavaConversions.asScalaBuffer(kmrList);
-        KafkaServer server = new KafkaServer(new KafkaConfig(props), new SystemTime(), Option.<String>empty(), metricsList);
-        server.startup();
-        return server;
-    }
-
-    public Properties getProps() {
-        Properties props = new Properties();
-        props.putAll(baseProperties);
-        props.put("metadata.broker.list", brokerList);
-        props.put("zookeeper.connect", zkConnection);
-        return props;
-    }
-
-    public String getBrokerList() {
-        return brokerList;
-    }
-
-    public List<Integer> getPorts() {
-        return ports;
-    }
-
-    public String getZkConnection() {
-        return zkConnection;
-    }
-
-    public void after() {
-        for (KafkaServer broker : kafkaServers) {
-            try {
-                broker.shutdown();
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-        for (File logDir : logDirs) {
-            try {
-                TestUtils.deleteFile(logDir);
-            } catch (FileNotFoundException e) {
-                // do nothing
-            }
-        }
-    }
-
-    @Override
-    public String toString() {
-        final StringBuilder sb = new StringBuilder("EmbeddedKafkaCluster{");
-        sb.append("brokerList='").append(brokerList).append('\'');
-        sb.append('}');
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/camel/blob/4b7f0c75/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
index 1ee4bb4..1c6db29 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedZookeeper.java
@@ -21,6 +21,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.camel.test.AvailablePortFinder;
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -37,31 +38,17 @@ public class EmbeddedZookeeper extends ExternalResource {
     private File logDir;
     private ZooKeeperServer zooKeeperServer;
 
+
     public EmbeddedZookeeper() {
-        this(-1);
+        this(AvailablePortFinder.getNextAvailable());
     }
 
     public EmbeddedZookeeper(int port) {
-        this(port, 500);
-    }
-
-    public EmbeddedZookeeper(int port, int tickTime) {
-        this.port = resolvePort(port);
-        this.tickTime = tickTime;
-    }
-
-    private int resolvePort(int port) {
-        if (port == -1) {
-            return TestUtils.getAvailablePort();
-        }
-        return port;
+        this.port = port;
     }
 
     @Override
     public void before() throws IOException {
-        if (this.port == -1) {
-            this.port = TestUtils.getAvailablePort();
-        }
         this.snapshotDir = constructTempDir(perTest("zk-snapshot"));
         this.logDir = constructTempDir(perTest("zk-log"));
 
@@ -96,6 +83,10 @@ public class EmbeddedZookeeper extends ExternalResource {
         return "localhost:" + port;
     }
 
+    public int getPort() {
+        return port;
+    }
+
     @Override
     public String toString() {
         final StringBuilder sb = new StringBuilder("EmbeddedZookeeper{");

http://git-wip-us.apache.org/repos/asf/camel/blob/4b7f0c75/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java
index 936f846..90b89a0 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java
@@ -1,10 +1,45 @@
 package org.apache.camel.processor.idempotent.kafka;
 
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.embedded.EmbeddedKafkaBroker;
+import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
+import org.apache.camel.test.AvailablePortFinder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.Properties;
+
 import static org.junit.Assert.*;
 
 /**
  * @author jkorab
  */
-public class KafkaIdempotentRepositoryTest {
+public class KafkaIdempotentRepositoryTest extends CamelTestSupport {
+
+    @Rule
+    public EmbeddedZookeeper zookeeper = new EmbeddedZookeeper();
+
+    @Rule
+    public EmbeddedKafkaBroker kafkaBroker = new EmbeddedKafkaBroker(0, zookeeper.getConnection());
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository();
+
+                from("direct:in")
+                    .idempotentConsumer(header("id"), kafkaIdempotentRepository)
+                    .to("mock:out");
+            }
+        };
+    }
 
+    @Test
+    public void testRemovesDuplicate() {
+        
+    }
 }
\ No newline at end of file


Mime
View raw message