From: davsclaus@apache.org
To: commits@camel.apache.org
Reply-To: dev@camel.apache.org
Date: Mon, 24 Apr 2017 11:34:39 -0000
Subject: [1/3] camel git commit: CAMEL-1193: Make kafka easier to use when routing between topics to avoid the header topic causing Camel to send the message to itself instead of the endpoint topic name as users would expect.
archived-at: Mon, 24 Apr 2017 11:34:42 -0000

Repository: camel
Updated Branches:
  refs/heads/master bab5b27bc -> 6243402b2


CAMEL-1193: Make kafka easier to use when routing between topics to avoid the header topic causing Camel to send the message to itself instead of the endpoint topic name as users would expect.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1d164d54
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1d164d54
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1d164d54

Branch: refs/heads/master
Commit: 1d164d54675069fb672be606e99c2c7944cd8f23
Parents: bab5b27
Author: Claus Ibsen
Authored: Mon Apr 24 13:03:56 2017 +0200
Committer: Claus Ibsen
Committed: Mon Apr 24 13:14:41 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/kafka-component.adoc          |  3 +-
 .../camel/component/kafka/KafkaEndpoint.java    | 18 +++++++-
 .../camel/component/kafka/KafkaProducer.java    | 24 ++++++++++-
 .../component/kafka/KafkaProducerTest.java      | 44 ++++++++++++++++++--
 4 files changed, 83 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/main/docs/kafka-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka-component.adoc b/components/camel-kafka/src/main/docs/kafka-component.adoc
index 4604e9c..dceca6f 100644
--- a/components/camel-kafka/src/main/docs/kafka-component.adoc
+++ b/components/camel-kafka/src/main/docs/kafka-component.adoc
@@ -66,7 +66,7 @@ with the following path and query parameters:
 | **topic** | *Required* Name of the topic to use. On the consumer you can use comma to separate multiple topics. A producer can only send a message to a single topic. | | String
 |=======================================================================
 
-#### Query Parameters (82 parameters):
+#### Query Parameters (83 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -100,6 +100,7 @@ with the following path and query parameters:
 | **exchangePattern** (consumer) | Sets the exchange pattern when the consumer creates an exchange. | | ExchangePattern
 | **bridgeEndpoint** (producer) | If the option is true then KafkaProducer will ignore the KafkaConstants.TOPIC header setting of the inbound message. | false | boolean
 | **bufferMemorySize** (producer) | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.This setting should correspond roughly to the total memory the producer will use but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests. | 33554432 | Integer
+| **circularKeyDetection** (producer) | If the option is true then KafkaProducer will detect if the message is attempted to be sent back to the same topic it may come from if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the same as the original kafka consumer topic then the header setting is ignored and the topic of the producer endpoint is used. In other words this avoids sending the same message back to where it came from. This option is not in use if the option bridgeEndpoint is set to true. | true | boolean
 | **compressionCodec** (producer) | This parameter allows you to specify the compression codec for all data generated by this producer. Valid values are none gzip and snappy. | none | String
 | **connectionMaxIdleMs** (producer) | Close idle connections after the number of milliseconds specified by this config. | 540000 | Integer
 | **key** (producer) | The record key (or null if no key is specified). If this option has been configured then it take precedence over header link KafkaConstantsKEY | | String

http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 46bf844..b4c86ef 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -20,7 +20,6 @@ import java.lang.reflect.Field;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
-import org.apache.camel.Component;
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -53,6 +52,8 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     private KafkaConfiguration configuration = new KafkaConfiguration();
     @UriParam(label = "producer")
     private boolean bridgeEndpoint;
+    @UriParam(label = "producer", defaultValue = "true")
+    private boolean circularTopicDetection = true;
 
     public KafkaEndpoint() {
     }
@@ -196,4 +197,19 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public void setBridgeEndpoint(boolean bridgeEndpoint) {
         this.bridgeEndpoint = bridgeEndpoint;
     }
+
+    public boolean isCircularTopicDetection() {
+        return circularTopicDetection;
+    }
+
+    /**
+     * If the option is true, then KafkaProducer will detect if the message is attempted to be sent back to the same topic
+     * it may come from, if the message was original from a kafka consumer. If the KafkaConstants.TOPIC header is the
+     * same as the original kafka consumer topic, then the header setting is ignored, and the topic of the producer
+     * endpoint is used. In other words this avoids sending the same message back to where it came from.
+     * This option is not in use if the option bridgeEndpoint is set to true.
+     */
+    public void setCircularTopicDetection(boolean circularTopicDetection) {
+        this.circularTopicDetection = circularTopicDetection;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 65fd9d2..01d29b5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.kafka.clients.producer.Callback;
@@ -120,9 +121,30 @@ public class KafkaProducer extends DefaultAsyncProducer {
     @SuppressWarnings("unchecked")
     protected Iterator createRecorder(Exchange exchange) throws CamelException {
         String topic = endpoint.getConfiguration().getTopic();
+
         if (!endpoint.isBridgeEndpoint()) {
-            topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
+            String headerTopic = exchange.getIn().getHeader(KafkaConstants.TOPIC, String.class);
+            boolean allowHeader = true;
+
+            // when we do not bridge then detect if we try to send back to ourselves
+            // which we most likely do not want to do
+            if (headerTopic != null && endpoint.isCircularTopicDetection()) {
+                Endpoint from = exchange.getFromEndpoint();
+                if (from instanceof KafkaEndpoint) {
+                    String fromTopic = ((KafkaEndpoint) from).getConfiguration().getTopic();
+                    allowHeader = !headerTopic.equals(fromTopic);
+                    if (!allowHeader) {
+                        log.debug("Circular topic detected from message header."
+                            + " Cannot send to same topic as the message comes from: {}"
+                            + ". Will use endpoint configured topic: {}", from, topic);
+                    }
+                }
+            }
+            if (allowHeader && headerTopic != null) {
+                topic = headerTopic;
+            }
         }
+
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/1d164d54/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index d30e737..9143017 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -47,6 +47,7 @@ public class KafkaProducerTest {
 
     private KafkaProducer producer;
     private KafkaEndpoint endpoint;
+    private KafkaEndpoint fromEndpoint;
 
     private TypeConverter converter = Mockito.mock(TypeConverter.class);
     private CamelContext context = Mockito.mock(CamelContext.class);
@@ -63,6 +64,8 @@ public class KafkaProducerTest {
         endpoint = kafka.createEndpoint("kafka:sometopic", "sometopic", new HashMap());
         producer = new KafkaProducer(endpoint);
 
+        fromEndpoint = kafka.createEndpoint("kafka:fromtopic", "fromtopic", new HashMap());
+
         RecordMetadata rm = new RecordMetadata(null, 1, 1);
         Future future = Mockito.mock(Future.class);
         Mockito.when(future.get()).thenReturn(rm);
@@ -204,7 +207,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void processSendsMesssageWithPartitionKeyHeader() throws Exception {
+    public void processSendsMessageWithPartitionKeyHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
@@ -218,7 +221,7 @@ public class KafkaProducerTest {
     }
 
     @Test
-    public void processSendsMesssageWithMessageKeyHeader() throws Exception {
+    public void processSendsMessageWithMessageKeyHeader() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
@@ -246,8 +249,43 @@ public class KafkaProducerTest {
         assertRecordMetadataExists();
     }
 
+    @Test
+    public void processSendMessageWithCircularDetected() throws Exception {
+        endpoint.getConfiguration().setTopic("sometopic");
+        endpoint.setCircularTopicDetection(true); // enable by default
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
+        Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
+        // this is the from topic that are from the fromEndpoint
+        in.setHeader(KafkaConstants.TOPIC, "fromtopic");
+        in.setHeader(KafkaConstants.KEY, "somekey");
+
+        producer.process(exchange);
+
+        verifySendMessage("sometopic", "somekey");
+        assertRecordMetadataExists();
+    }
+
+    @Test
+    public void processSendMessageWithNoCircularDetected() throws Exception {
+        endpoint.getConfiguration().setTopic("sometopic");
+        endpoint.setCircularTopicDetection(false); // enable by default
+        Mockito.when(exchange.getIn()).thenReturn(in);
+        Mockito.when(exchange.getOut()).thenReturn(out);
+        Mockito.when(exchange.getFromEndpoint()).thenReturn(fromEndpoint);
+        // this is the from topic that are from the fromEndpoint
+        in.setHeader(KafkaConstants.TOPIC, "fromtopic");
+        in.setHeader(KafkaConstants.KEY, "somekey");
+
+        producer.process(exchange);
+
+        // will end up sending back to itself at fromtopic
+        verifySendMessage("fromtopic", "somekey");
+        assertRecordMetadataExists();
+    }
+
     @Test // Message and Topic Name alone
-    public void processSendsMesssageWithMessageTopicName() throws Exception {
+    public void processSendsMessageWithMessageTopicName() throws Exception {
         endpoint.getConfiguration().setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         Mockito.when(exchange.getOut()).thenReturn(out);
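
Note on the topic selection above: the rules this commit adds to KafkaProducer.createRecorder can be restated as a small standalone sketch with no Camel dependency. The class name TopicResolutionSketch and the method resolveTopic are hypothetical and are not part of camel-kafka. A null fromTopic stands in for "the exchange did not come from a Kafka endpoint". The real producer additionally throws CamelExchangeException when no topic can be determined, which this sketch leaves out.

```java
// Hypothetical illustration only: not part of camel-kafka.
final class TopicResolutionSketch {

    /**
     * Picks the destination topic along the lines of the diff above.
     *
     * @param endpointTopic          topic configured on the producer endpoint
     * @param headerTopic            value of the KafkaConstants.TOPIC header, or null
     * @param fromTopic              topic of the consuming Kafka endpoint, or null when
     *                               the exchange did not originate from Kafka
     * @param bridgeEndpoint         when true, the header is always ignored
     * @param circularTopicDetection when true, a header equal to fromTopic is ignored
     */
    static String resolveTopic(String endpointTopic, String headerTopic, String fromTopic,
                               boolean bridgeEndpoint, boolean circularTopicDetection) {
        if (bridgeEndpoint || headerTopic == null) {
            // Bridging, or no header present: use the endpoint topic.
            return endpointTopic;
        }
        if (circularTopicDetection && headerTopic.equals(fromTopic)) {
            // The header points back at the topic the message came from.
            return endpointTopic;
        }
        return headerTopic;
    }

    public static void main(String[] args) {
        // Header equals the consumer topic, detection on: endpoint topic is used.
        System.out.println(resolveTopic("sometopic", "fromtopic", "fromtopic", false, true));
        // Same input with detection off: the header is honoured.
        System.out.println(resolveTopic("sometopic", "fromtopic", "fromtopic", false, false));
    }
}
```

The two calls in main correspond to the two new test cases, processSendMessageWithCircularDetected and processSendMessageWithNoCircularDetected.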