camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [3/5] camel git commit: CAMEL-9957: Fixed as it was not correct.
Date Fri, 13 May 2016 09:32:01 GMT
CAMEL-9957: Fixed as it was not correct.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/81e1f8a0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/81e1f8a0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/81e1f8a0

Branch: refs/heads/camel-2.17.x
Commit: 81e1f8a0c8a118262223c6c2c85465735fc07a17
Parents: 772b2c3
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Fri May 13 10:51:57 2016 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri May 13 11:27:39 2016 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java    | 27 ++++++++++++--------
 .../component/kafka/KafkaProducerTest.java      |  6 +----
 2 files changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/81e1f8a0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 0c4013f..6c432d6 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -73,6 +73,7 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
         }
     }
 
+    @SuppressWarnings("unchecked")
     protected ProducerRecord createRecorder(Exchange exchange) throws CamelException {
         String topic = endpoint.getTopic();
         if (!endpoint.isBridgeEndpoint()) {
@@ -103,19 +104,25 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
 
     @Override
     @SuppressWarnings("unchecked")
-    public void process(Exchange exchange) throws CamelException {
-
+    public void process(Exchange exchange) throws Exception {
         ProducerRecord record = createRecorder(exchange);
         // Just send out the record in the sync way
-        try {
-            kafkaProducer.send(record).get();
-        } catch (Exception e) {
-            throw new CamelException(e);
-        }
+        kafkaProducer.send(record).get();
     }
 
     @Override
     public boolean process(Exchange exchange, AsyncCallback callback) {
+        // force processing synchronously using different api
+        if (endpoint.isSynchronous()) {
+            try {
+                process(exchange);
+            } catch (Throwable e) {
+                exchange.setException(e);
+            }
+            callback.done(true);
+            return true;
+        }
+
         try {
             ProducerRecord record = createRecorder(exchange);
             kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
@@ -129,10 +136,10 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
         }
     }
 
-    class KafkaProducerCallBack implements Callback {
+    private final class KafkaProducerCallBack implements Callback {
 
-        private Exchange exchange;
-        private AsyncCallback callback;
+        private final Exchange exchange;
+        private final AsyncCallback callback;
 
         KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
             this.exchange = exchange;

http://git-wip-us.apache.org/repos/asf/camel/blob/81e1f8a0/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 68798f3..40f2113 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -79,7 +79,7 @@ public class KafkaProducerTest {
         Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
     }
 
-    @Test(expected=CamelException.class)
+    @Test(expected = Exception.class)
     @SuppressWarnings({"unchecked"})
     public void processSendsMessageWithException() throws Exception {
         endpoint.setTopic("sometopic");
@@ -90,7 +90,6 @@ public class KafkaProducerTest {
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
         producer.process(exchange);
-
     }
 
     @Test
@@ -103,10 +102,8 @@ public class KafkaProducerTest {
         producer.process(exchange, callback);
 
         Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
-
     }
 
-
     @Test
     public void processAsyncSendsMessageWithException() throws Exception {
 
@@ -126,7 +123,6 @@ public class KafkaProducerTest {
         Mockito.verify(callback).done(Matchers.eq(true));
     }
 
-
     @Test
     public void processSendsMessageWithTopicHeaderAndNoTopicInEndPoint() throws Exception {
         endpoint.setTopic(null);


Mime
View raw message