camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] camel git commit: CAMEL-9399 Implementation and happy path integration test
Date Mon, 21 Dec 2015 14:38:22 GMT
Repository: camel
Updated Branches:
  refs/heads/master b7f4e7ac2 -> bcd00fa15


CAMEL-9399 Implementation and happy path integration test


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e3751db8
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e3751db8
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e3751db8

Branch: refs/heads/master
Commit: e3751db814bc418fe265159992d7c5df678ebbd2
Parents: b7f4e7a
Author: Miloš Milivojević <mmilivojevic@deployinc.com>
Authored: Mon Dec 21 10:38:29 2015 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Dec 21 15:34:24 2015 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQEndpoint.java    | 51 ++++++++++-
 .../rabbitmq/RabbitMQProducerIntTest.java       | 96 ++++++++++++++++----
 2 files changed, 125 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e3751db8/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index bcf0e7f..f7a02f4 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.rabbitmq;
 
+import javax.net.ssl.TrustManager;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -34,8 +35,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 
-import javax.net.ssl.TrustManager;
-
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Address;
 import com.rabbitmq.client.Channel;
@@ -43,7 +42,6 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.LongString;
-
 import org.apache.camel.Consumer;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -159,6 +157,10 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     private long requestTimeoutCheckerInterval = 1000;
     @UriParam
     private boolean transferException;
+    @UriParam(label = "producer")
+    private boolean publisherAcknowledgements;
+    @UriParam(label = "producer")
+    private long publisherAcknowledgementsTimeout;
     // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
     private boolean useMessageIDAsCorrelationID = true;
     // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
@@ -166,7 +168,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq
     private String replyTo;
 
-    private RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter();
+    private final RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter();
     
 
     public RabbitMQEndpoint() {
@@ -305,7 +307,16 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
         Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, isImmediate(), Boolean.class);
 
         LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId());
+
+        if (isPublisherAcknowledgements()) {
+            channel.confirmSelect();
+        }
+        
         channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body);
+
+        if (isPublisherAcknowledgements()) {
+            waitForConfirmationFor(channel, camelExchange);
+        }
     }
 
     /**
@@ -318,6 +329,16 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
             exchangeName = getExchangeName();
         }
         return exchangeName;
+    }      
+    
+    private void waitForConfirmationFor(final Channel channel, final Exchange camelExchange) throws IOException {
+        try {
+            LOG.debug("Waiting for publisher acknowledgements for {}ms", getPublisherAcknowledgementsTimeout());
+            channel.waitForConfirmsOrDie(getPublisherAcknowledgementsTimeout());
+        } catch (InterruptedException | TimeoutException e) {
+            LOG.warn("Acknowledgement error for {}", camelExchange);
+            throw new RuntimeCamelException(e);
+        }
     }
 
     @Override
@@ -978,6 +999,28 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     }
 
     /**
+     * When true, the message will be published with <a href="https://www.rabbitmq.com/confirms.html">publisher acknowledgements</a> turned on
+     */
+    public boolean isPublisherAcknowledgements() {
+        return publisherAcknowledgements;
+    }
+
+    public void setPublisherAcknowledgements(final boolean publisherAcknowledgements) {
+        this.publisherAcknowledgements = publisherAcknowledgements;
+    }
+
+    /**
+     * The amount of time in milliseconds to wait for a basic.ack response from RabbitMQ server
+     */
+    public long getPublisherAcknowledgementsTimeout() {
+        return publisherAcknowledgementsTimeout;
+    }
+
+    public void setPublisherAcknowledgementsTimeout(final long publisherAcknowledgementsTimeout) {
+        this.publisherAcknowledgementsTimeout = publisherAcknowledgementsTimeout;
+    }
+
+    /**
      * Get replyToType for inOut exchange
      */
     public String getReplyToType() {

http://git-wip-us.apache.org/repos/asf/camel/blob/e3751db8/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
index 377f8a9..2684c20 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
@@ -27,67 +27,127 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.DefaultConsumer;
 import com.rabbitmq.client.Envelope;
-
-import org.apache.camel.Endpoint;
-import org.apache.camel.EndpointInject;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 
 public class RabbitMQProducerIntTest extends CamelTestSupport {
     private static final String EXCHANGE = "ex1";
+    private static final String ROUTE = "route1";
+    private static final String BASIC_URI_FORMAT = "rabbitmq:localhost:5672/%s?routingKey=%s&username=cameltest&password=cameltest&skipQueueDeclare=true";
+    private static final String BASIC_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, ROUTE);
+    private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI + "&mandatory=true&publisherAcknowledgements=true";
+    private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI = String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + "&publisherAcknowledgements=true";
 
     @Produce(uri = "direct:start")
     protected ProducerTemplate template;
 
+    @Produce(uri = "direct:start-with-confirms")
+    protected ProducerTemplate templateWithConfirms;
     
-    @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?routingKey=route1&username=cameltest&password=cameltest")
-    private Endpoint to;
+    @Produce(uri = "direct:start-with-confirms-bad-route")
+    protected ProducerTemplate templateWithConfirmsAndBadRoute;
 
+    private Connection connection;
+    private Channel channel;
     
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
+        context().setTracing(true);
         return new RouteBuilder() {
 
             @Override
             public void configure() throws Exception {
-                from("direct:start").to(to);
+                from("direct:start").to(BASIC_URI);
+                from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI);
+                from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI);
             }
         };
     }
 
+    @Before
+    public void setUpRabbitMQ() throws Exception {
+        connection = createTestConnection();
+        channel = connection.createChannel();
+        channel.queueDeclare("sammyq", false, false, true, null);
+        channel.queueBind("sammyq", EXCHANGE, ROUTE);
+    }
+
+    @After
+    public void tearDownRabbitMQ() throws Exception {
+        channel.abort();
+        connection.abort();
+    }
+
     @Test
     public void producedMessageIsReceived() throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received));
+
+        template.sendBodyAndHeader("new message", RabbitMQConstants.EXCHANGE_NAME, "ex1");
+
+        assertThatBodiesReceivedIn(received, "new message");
+    }
 
+    private void assertThatBodiesReceivedIn(final List<String> received, final String... expected) throws InterruptedException {
+        Thread.sleep(500);
+        
+        assertListSize(received, expected.length);
+        for (String body : expected) {
+            assertEquals(body, received.get(0));
+        }
+    }
+
+    @Test
+    public void producedMessageIsReceivedWhenPublisherAcknowledgementsAreEnabled() throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received));
+
+        templateWithConfirms.sendBodyAndHeader("publisher ack message", RabbitMQConstants.EXCHANGE_NAME, "ex1");
+
+        assertThatBodiesReceivedIn(received, "publisher ack message");
+    }
+
+    @Test
+    public void producedMessageIsReceivedWhenPublisherAcknowledgementsAreEnabledAndBadRoutingKeyIsUsed() throws InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        channel.basicConsume("sammyq", true, new ArrayPopulatingConsumer(received));
+        
+        templateWithConfirmsAndBadRoute.sendBody("publisher ack message");
+        
+        assertThatBodiesReceivedIn(received);
+    }
+
+    private Connection createTestConnection() throws IOException, TimeoutException {
         ConnectionFactory factory = new ConnectionFactory();
         factory.setHost("localhost");
         factory.setPort(5672);
         factory.setUsername("cameltest");
         factory.setPassword("cameltest");
         factory.setVirtualHost("/");
-        Connection conn = factory.newConnection();
+        return factory.newConnection();
+    }
 
-        final List<Envelope> received = new ArrayList<Envelope>();
+    private class ArrayPopulatingConsumer extends DefaultConsumer {
+        private final List<String> received;
+
+        public ArrayPopulatingConsumer(final List<String> received) {
+            super(RabbitMQProducerIntTest.this.channel);
+            this.received = received;
+        }
 
-        Channel channel = conn.createChannel();
-        channel.queueDeclare("sammyq", false, false, true, null);
-        channel.queueBind("sammyq", EXCHANGE, "route1");
-        channel.basicConsume("sammyq", true, new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(String consumerTag,
                                        Envelope envelope,
                                        AMQP.BasicProperties properties,
                                        byte[] body) throws IOException {
-                received.add(envelope);
+            received.add(new String(body));
             }
-        });
-
-        template.sendBodyAndHeader("new message", RabbitMQConstants.EXCHANGE_NAME, "ex1");
-        Thread.sleep(500);
-        assertEquals(1, received.size());
     }
 }
 


Mime
View raw message