camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [2/2] git commit: Fixe bunch of eclipse warning of camel-kafka component
Date Tue, 01 Apr 2014 08:45:02 GMT
Fixe bunch of eclipse warning of camel-kafka component


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/21e3688a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/21e3688a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/21e3688a

Branch: refs/heads/camel-2.13.x
Commit: 21e3688a0c421c36dcec01ad1fe985aa39e32d0b
Parents: 07a9a10
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Tue Apr 1 15:53:55 2014 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Tue Apr 1 16:44:11 2014 +0800

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java    | 11 +++-----
 .../camel/component/kafka/KafkaProducerIT.java  | 15 +++++++----
 .../component/kafka/KafkaProducerTest.java      | 28 +++++++++++---------
 3 files changed, 29 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/21e3688a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 990e942..3087a14 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -27,20 +27,15 @@ import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
-
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  *
  */
 public class KafkaConsumer extends DefaultConsumer {
 
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
-
     protected ExecutorService executor;
     private final KafkaEndpoint endpoint;
     private final Processor processor;
@@ -82,7 +77,7 @@ public class KafkaConsumer extends DefaultConsumer {
         List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(endpoint.getTopic());
 
         executor = endpoint.createExecutor();
-        for (final KafkaStream stream : streams) {
+        for (final KafkaStream<byte[], byte[]> stream : streams) {
             executor.submit(new ConsumerTask(stream));
         }
     }
@@ -107,9 +102,9 @@ public class KafkaConsumer extends DefaultConsumer {
 
     class ConsumerTask implements Runnable {
 
-        private KafkaStream stream;
+        private KafkaStream<byte[], byte[]> stream;
 
-        public ConsumerTask(KafkaStream stream) {
+        public ConsumerTask(KafkaStream<byte[], byte[]> stream) {
             this.stream = stream;
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/21e3688a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
index 36bb6c4..f77d91a 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerIT.java
@@ -41,15 +41,19 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 /**
  * The Producer IT tests require a Kafka broker running on 9092 and a zookeeper instance
running on 2181.
  * The broker must have a topic called test created.
  */
 public class KafkaProducerIT extends CamelTestSupport {
-
+    
     public static final String TOPIC = "test";
     public static final String TOPIC_IN_HEADER = "testHeader";
 
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerIT.class);
+
     @EndpointInject(uri = "kafka:localhost:9092?topic=" + TOPIC 
         + "&partitioner=org.apache.camel.component.kafka.SimplePartitioner&serializerClass=kafka.serializer.StringEncoder&requestRequiredAcks=1")
     private Endpoint to;
@@ -111,10 +115,10 @@ public class KafkaProducerIT extends CamelTestSupport {
         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConsumer.createMessageStreams(topicCountMap);
 
         ExecutorService executor = Executors.newFixedThreadPool(10);
-        for (final KafkaStream stream : consumerMap.get(TOPIC)) {
+        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC)) {
             executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
         }
-        for (final KafkaStream stream : consumerMap.get(TOPIC_IN_HEADER)) {
+        for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(TOPIC_IN_HEADER))
{
             executor.submit(new KakfaTopicConsumer(stream, messagesLatch));
         }
     }
@@ -131,10 +135,10 @@ public class KafkaProducerIT extends CamelTestSupport {
     }
 
     private static class KakfaTopicConsumer implements Runnable {
-        private final KafkaStream stream;
+        private final KafkaStream<byte[], byte[]> stream;
         private final CountDownLatch latch;
 
-        public KakfaTopicConsumer(KafkaStream stream, CountDownLatch latch) {
+        public KakfaTopicConsumer(KafkaStream<byte[], byte[]> stream, CountDownLatch
latch) {
             this.stream = stream;
             this.latch = latch;
         }
@@ -144,6 +148,7 @@ public class KafkaProducerIT extends CamelTestSupport {
             ConsumerIterator<byte[], byte[]> it = stream.iterator();
             while (it.hasNext()) {
                 String msg = new String(it.next().message());
+                LOG.info("Get the message" + msg);
                 latch.countDown();
             }
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/21e3688a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index acdfc60..3c71417 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -42,6 +42,7 @@ public class KafkaProducerTest {
     private Exchange exchange = Mockito.mock(Exchange.class);
     private Message in = new DefaultMessage();
 
+    @SuppressWarnings({"unchecked"})
     public KafkaProducerTest() throws IllegalAccessException, InstantiationException, ClassNotFoundException,
             URISyntaxException {
         endpoint = new KafkaEndpoint("kafka:broker1:1234,broker2:4567?topic=sometopic",
@@ -59,6 +60,7 @@ public class KafkaProducerTest {
     }
 
     @Test
+    @SuppressWarnings({"unchecked"})
     public void processSendsMesssage() throws Exception {
 
         endpoint.setTopic("sometopic");
@@ -80,10 +82,7 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
-        Mockito.verify(producer.producer).send(captor.capture());
-        assertEquals("4", captor.getValue().key());
-        assertEquals("anotherTopic", captor.getValue().topic());
+        verifySendMessage("4", "anotherTopic");
     }
 
     @Test
@@ -96,10 +95,8 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
 
-        ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
-        Mockito.verify(producer.producer).send(captor.capture());
-        assertEquals("4", captor.getValue().key());
-        assertEquals("anotherTopic", captor.getValue().topic());
+        verifySendMessage("4", "anotherTopic");
+      
     }
 
     @Test(expected = CamelException.class)
@@ -116,19 +113,26 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getIn()).thenReturn(in);
         producer.process(exchange);
     }
-
+    
     @Test
+    
     public void processSendsMesssageWithPartitionKeyHeader() throws Exception {
 
-        endpoint.setTopic("sometopic");
+        endpoint.setTopic("someTopic");
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
         producer.process(exchange);
 
+        verifySendMessage("4", "someTopic");
+        
+    }
+    
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    protected void verifySendMessage(String key, String topic) {
         ArgumentCaptor<KeyedMessage> captor = ArgumentCaptor.forClass(KeyedMessage.class);
         Mockito.verify(producer.producer).send(captor.capture());
-        assertEquals("4", captor.getValue().key());
-        assertEquals("sometopic", captor.getValue().topic());
+        assertEquals(key, captor.getValue().key());
+        assertEquals(topic, captor.getValue().topic());
     }
 }


Mime
View raw message