Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 60FF317925 for ; Sat, 27 Sep 2014 11:23:32 +0000 (UTC) Received: (qmail 13473 invoked by uid 500); 27 Sep 2014 11:23:31 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 13413 invoked by uid 500); 27 Sep 2014 11:23:31 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 13262 invoked by uid 99); 27 Sep 2014 11:23:31 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 27 Sep 2014 11:23:31 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D25D89B7156; Sat, 27 Sep 2014 11:23:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Sat, 27 Sep 2014 11:23:30 -0000 Message-Id: <78356d0cc80342c2b1275db84edd348d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/7] git commit: CAMEL-7864: Updates to allow setting Kafka's zookeeper.connect directly. Repository: camel Updated Branches: refs/heads/camel-2.13.x 3a5497afc -> b840c4dd2 refs/heads/camel-2.14.x 49dccc13d -> 728a4c5e3 refs/heads/master 3eb922edd -> ecaafc411 CAMEL-7864: Updates to allow setting Kafka's zookeeper.connect directly. - Added new property to KafkaConfiguration for zookeeperConnect and configured it to override the zookeeperHost and zookeeperPort properties. - Created getZookeeperConnect method on KafkaConfiguration to return the zookeeperConnect property if set or the combination of host ":" port if zookeeperConnect is not set. 
- Added zookeeperConnect get and set methods on KafkaEndpoint to delegate to KafkaConfiguration. - Updated KafkaConsumer to use the getZookeeperConnect method on the KafkaEndpoint. - Added tests for the changes. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/64696f05 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/64696f05 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/64696f05 Branch: refs/heads/master Commit: 64696f05293e55ec42f88ef2fed4e28cc9d52a5a Parents: b1e84ee Author: john.shields Authored: Thu Sep 25 20:57:23 2014 -0500 Committer: john.shields Committed: Thu Sep 25 21:07:44 2014 -0500 ---------------------------------------------------------------------- .../component/kafka/KafkaConfiguration.java | 25 +++++++++++++++++-- .../camel/component/kafka/KafkaConsumer.java | 9 +++---- .../camel/component/kafka/KafkaEndpoint.java | 9 +++++++ .../component/kafka/KafkaComponentTest.java | 26 ++++++++++++++++++++ .../component/kafka/KafkaConsumerTest.java | 14 ++++++++--- 5 files changed, 71 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/64696f05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java index 88d5017..881ef3c 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java @@ -21,6 +21,7 @@ import java.util.Properties; import kafka.producer.DefaultPartitioner; public class KafkaConfiguration { + private String 
zookeeperConnect; private String zookeeperHost; private int zookeeperPort = 2181; private String topic; @@ -127,13 +128,31 @@ public class KafkaConfiguration { props.put(key, value.toString()); } } + + public String getZookeeperConnect() { + if (this.zookeeperConnect != null) { + return zookeeperConnect; + } else { + return getZookeeperHost() + ":" + getZookeeperPort(); + } + } + + public void setZookeeperConnect(String zookeeperConnect) { + this.zookeeperConnect = zookeeperConnect; + + // connect overrides host and port + this.zookeeperHost = null; + this.zookeeperPort = -1; + } public String getZookeeperHost() { return zookeeperHost; } public void setZookeeperHost(String zookeeperHost) { - this.zookeeperHost = zookeeperHost; + if (this.zookeeperConnect == null) { + this.zookeeperHost = zookeeperHost; + } } public int getZookeeperPort() { @@ -141,7 +160,9 @@ public class KafkaConfiguration { } public void setZookeeperPort(int zookeeperPort) { - this.zookeeperPort = zookeeperPort; + if (this.zookeeperConnect == null) { + this.zookeeperPort = zookeeperPort; + } } public String getGroupId() { http://git-wip-us.apache.org/repos/asf/camel/blob/64696f05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java index bfc73b2..3ff40eb 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java @@ -46,11 +46,8 @@ public class KafkaConsumer extends DefaultConsumer { super(endpoint, processor); this.endpoint = endpoint; this.processor = processor; - if (endpoint.getZookeeperHost() == null) { - throw new IllegalArgumentException("zookeeper host must be 
specified"); - } - if (endpoint.getZookeeperPort() == 0) { - throw new IllegalArgumentException("zookeeper port must be specified"); + if (endpoint.getZookeeperConnect() == null) { + throw new IllegalArgumentException("zookeeper host or zookeeper connect must be specified"); } if (endpoint.getGroupId() == null) { throw new IllegalArgumentException("groupId must not be null"); @@ -59,7 +56,7 @@ public class KafkaConsumer extends DefaultConsumer { Properties getProps() { Properties props = endpoint.getConfiguration().createConsumerProperties(); - props.put("zookeeper.connect", endpoint.getZookeeperHost() + ":" + endpoint.getZookeeperPort()); + props.put("zookeeper.connect", endpoint.getZookeeperConnect()); props.put("group.id", endpoint.getGroupId()); return props; } http://git-wip-us.apache.org/repos/asf/camel/blob/64696f05/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java index 002d15e..deed68a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java @@ -102,6 +102,14 @@ public class KafkaEndpoint extends DefaultEndpoint { // Delegated properties from the configuration //------------------------------------------------------------------------- + public String getZookeeperConnect() { + return configuration.getZookeeperConnect(); + } + + public void setZookeeperConnect(String zookeeperConnect) { + configuration.setZookeeperConnect(zookeeperConnect); + } + public String getZookeeperHost() { return configuration.getZookeeperHost(); } @@ -417,4 +425,5 @@ public class KafkaEndpoint extends DefaultEndpoint { public int 
getRequestTimeoutMs() { return configuration.getRequestTimeoutMs(); } + } http://git-wip-us.apache.org/repos/asf/camel/blob/64696f05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java index c11edaf..15cef7a 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaComponentTest.java @@ -24,6 +24,7 @@ import org.junit.Test; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class KafkaComponentTest { @@ -43,6 +44,7 @@ public class KafkaComponentTest { String remaining = "broker1:12345,broker2:12566"; KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); + assertEquals("somehost:2987", endpoint.getZookeeperConnect()); assertEquals("somehost", endpoint.getZookeeperHost()); assertEquals(2987, endpoint.getZookeeperPort()); assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); @@ -50,4 +52,28 @@ public class KafkaComponentTest { assertEquals(3, endpoint.getConsumerStreams()); assertEquals("com.class.Party", endpoint.getPartitioner()); } + + @Test + public void testZookeeperConnectPropertyOverride() throws Exception { + Map params = new HashMap(); + params.put("zookeeperConnect", "thehost:2181/chroot"); + params.put("zookeeperHost", "somehost"); + params.put("zookeeperPort", 2987); + params.put("portNumber", 14123); + params.put("consumerStreams", "3"); + params.put("topic", "mytopic"); + params.put("partitioner", "com.class.Party"); + + String uri = "kafka:broker1:12345,broker2:12566"; + String remaining = 
"broker1:12345,broker2:12566"; + + KafkaEndpoint endpoint = new KafkaComponent(context).createEndpoint(uri, remaining, params); + assertEquals("thehost:2181/chroot", endpoint.getZookeeperConnect()); + assertNull(endpoint.getZookeeperHost()); + assertEquals(-1, endpoint.getZookeeperPort()); + assertEquals("broker1:12345,broker2:12566", endpoint.getBrokers()); + assertEquals("mytopic", endpoint.getTopic()); + assertEquals(3, endpoint.getConsumerStreams()); + assertEquals("com.class.Party", endpoint.getPartitioner()); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/64696f05/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java index 740f116..b51c09e 100644 --- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java +++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaConsumerTest.java @@ -28,15 +28,21 @@ public class KafkaConsumerTest { private Processor processor = mock(Processor.class); @Test(expected = IllegalArgumentException.class) - public void consumerRequiresZookeeperHost() throws Exception { - Mockito.when(endpoint.getZookeeperPort()).thenReturn(2181); + public void consumerRequiresZookeeperConnect() throws Exception { + Mockito.when(endpoint.getGroupId()).thenReturn("groupOne"); new KafkaConsumer(endpoint, processor); } @Test(expected = IllegalArgumentException.class) - public void consumerRequiresZookeeperPort() throws Exception { - Mockito.when(endpoint.getZookeeperHost()).thenReturn("localhost"); + public void consumerRequiresGroupId() throws Exception { + Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot"); new KafkaConsumer(endpoint, processor); 
} + @Test + public void consumerOnlyRequiresZookeeperConnectAndGroupId() throws Exception { + Mockito.when(endpoint.getGroupId()).thenReturn("groupOne"); + Mockito.when(endpoint.getZookeeperConnect()).thenReturn("localhost:2181/chroot"); + new KafkaConsumer(endpoint, processor); + } }