camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ningji...@apache.org
Subject [1/2] git commit: CAMEL-7556 Multiple concurrent consumer threads
Date Mon, 30 Jun 2014 15:15:05 GMT
Repository: camel
Updated Branches:
  refs/heads/master 6faf7f403 -> 28a8d00d3


CAMEL-7556 Multiple concurrent consumer threads


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3c4f8331
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3c4f8331
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3c4f8331

Branch: refs/heads/master
Commit: 3c4f8331bff02b5098187475e5de3f419d7dd718
Parents: 6faf7f4
Author: Gerald Quintana <gerald.quintana@zenika.com>
Authored: Sun Jun 29 18:12:42 2014 +0200
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Mon Jun 30 21:39:05 2014 +0800

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQConsumer.java    | 106 +++++++++++++------
 .../component/rabbitmq/RabbitMQEndpoint.java    |  15 ++-
 .../component/rabbitmq/RabbitMQLoadIntTest.java | 101 ++++++++++++++++++
 3 files changed, 186 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/3c4f8331/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 0f1d85f..91da43b 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -17,6 +17,8 @@
 package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -25,6 +27,7 @@ import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.Envelope;
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
@@ -32,17 +35,18 @@ import org.apache.camel.impl.DefaultConsumer;
 public class RabbitMQConsumer extends DefaultConsumer {
     ExecutorService executor;
     Connection conn;
-    Channel channel;
-
     private int closeTimeout = 30 * 1000;
-    
     private final RabbitMQEndpoint endpoint;
     /**
      * Task in charge of starting consumer
      */
     private StartConsumerCallable startConsumerCallable;
+	/**
+	 * Running consumers
+	 */
+	private final List<RabbitConsumer> consumers=new ArrayList<RabbitConsumer>();
 
-    public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
+	public RabbitMQConsumer(RabbitMQEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
         this.endpoint = endpoint;
     }
@@ -54,39 +58,58 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     /**
-     * Open connection and channel
+     * Open connection
      */
-    private void openConnectionAndChannel() throws IOException {
+    private void openConnection() throws IOException {
         log.trace("Creating connection...");
         this.conn = getEndpoint().connect(executor);
         log.debug("Created connection: {}", conn);
-
-        log.trace("Creating channel...");
-        this.channel = conn.createChannel();
-        log.debug("Created channel: {}", channel);
-        // setup the basicQos
-        if (endpoint.isPrefetchEnabled()) {
-            channel.basicQos(endpoint.getPrefetchSize(), 
-                             endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
-        }
-        getEndpoint().declareExchangeAndQueue(channel);
     }
 
-    /**
-     * If needed, create Exchange and Queue, then add message listener
+	/**
+	 * Open channel
+	 */
+	private Channel openChannel() throws IOException {
+		log.trace("Creating channel...");
+		Channel channel = conn.createChannel();
+		log.debug("Created channel: {}", channel);
+		// setup the basicQos
+		if (endpoint.isPrefetchEnabled()) {
+			channel.basicQos(endpoint.getPrefetchSize(),
+					endpoint.getPrefetchCount(), endpoint.isPrefetchGlobal());
+		}
+		return channel;
+	}
+	/**
+	 * Declare exchange and queue on a first channel, then start one consumer per channel for each concurrent consumer
+	 */
+	private void startConsumers() throws IOException {
+		// First channel used to declare Exchange and Queue
+		Channel channel=openChannel();
+		endpoint.declareExchangeAndQueue(channel);
+		startConsumer(channel);
+		// Other channels
+		for(int i=1; i<endpoint.getConcurrentConsumers();i++) {
+			channel=openChannel();
+			startConsumer(channel);
+		}
+	}
+	/**
+     * Add a consumer thread for given channel
      */
-    private void addConsumer() throws IOException {
-        channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(),
-                new RabbitConsumer(this, channel));
-    }
+    private void startConsumer(Channel channel) throws IOException {
+		RabbitConsumer consumer = new RabbitConsumer(this, channel);
+		consumer.start();
+		this.consumers.add(consumer);
+	}
 
     @Override
     protected void doStart() throws Exception {
         executor = endpoint.createExecutor();
         log.debug("Using executor {}", executor);
         try {
-            openConnectionAndChannel();
-            addConsumer();
+            openConnection();
+            startConsumers();
         } catch (Exception e) {
             // Open connection, and start message listener in background
             Integer networkRecoveryInterval = getEndpoint().getNetworkRecoveryInterval();
@@ -97,17 +120,16 @@ public class RabbitMQConsumer extends DefaultConsumer {
     }
 
     /**
-     * If needed, close Connection and Channel
+     * If needed, close Connection and Channels
      */
     private void closeConnectionAndChannel() throws IOException {
         if (startConsumerCallable != null) {
             startConsumerCallable.stop();
         }
-        if (channel != null) {
-            log.debug("Closing channel: {}", channel);
-            channel.close();
-            channel = null;
-        }
+		for(RabbitConsumer consumer: this.consumers) {
+			consumer.stop();
+		}
+		this.consumers.clear();
         if (conn != null) {
             log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
             conn.close(closeTimeout);
@@ -133,7 +155,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
 
         private final RabbitMQConsumer consumer;
         private final Channel channel;
-
+		private String tag;
         /**
          * Constructs a new instance and records its association to the
          * passed-in channel.
@@ -211,7 +233,23 @@ public class RabbitMQConsumer extends DefaultConsumer {
             }
         }
 
-    }
+		/**
+		 * Bind consumer to channel
+		 */
+		public void start() throws IOException {
+			tag=channel.basicConsume(endpoint.getQueue(), endpoint.isAutoAck(), this);
+		}
+
+		/**
+		 * Unbind consumer from channel
+		 */
+		public void stop() throws IOException{
+			if (tag!=null) {
+				channel.basicCancel(tag);
+			}
+			channel.close();
+		}
+	}
 
     /**
      * Task in charge of opening connection and adding listener when consumer is started
@@ -233,7 +271,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
             // Reconnection loop
             while (running.get() && connectionFailed) {
                 try {
-                    openConnectionAndChannel();
+                    openConnection();
                     connectionFailed = false;
                 } catch (Exception e) {
                     log.debug("Connection failed, will retry in " + connectionRetryInterval + "ms", e);
@@ -241,7 +279,7 @@ public class RabbitMQConsumer extends DefaultConsumer {
                 }
             }
             if (!connectionFailed) {
-                addConsumer();
+                startConsumers();
             }
             stop();
             return null;

http://git-wip-us.apache.org/repos/asf/camel/blob/3c4f8331/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index e475819..4ee6a7c 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -81,8 +81,11 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     private int prefetchCount;
     //Default value in RabbitMQ is false.
     private boolean prefetchGlobal;
-
-    public RabbitMQEndpoint() {
+	/**
+	 * Number of concurrent consumer threads
+	 */
+	private int concurrentConsumers = 1;
+	public RabbitMQEndpoint() {
     }
 
     public RabbitMQEndpoint(String endpointUri, RabbitMQComponent component) throws URISyntaxException {
@@ -460,4 +463,12 @@ public class RabbitMQEndpoint extends DefaultEndpoint {
     public boolean isPrefetchGlobal() {
         return prefetchGlobal;
     }
+
+	public int getConcurrentConsumers() {
+		return concurrentConsumers;
+	}
+
+	public void setConcurrentConsumers(int concurrentConsumers) {
+		this.concurrentConsumers = concurrentConsumers;
+	}
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/3c4f8331/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
new file mode 100644
index 0000000..adf1367
--- /dev/null
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQLoadIntTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import com.rabbitmq.client.AlreadyClosedException;
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test to check that RabbitMQ Endpoint is able to handle heavy load using multiple producers and
+ * consumers
+ */
+public class RabbitMQLoadIntTest extends CamelTestSupport {
+	private static final int PRODUCER_COUNT=10;
+	private static final int CONSUMER_COUNT=10;
+	private static final int MESSAGE_COUNT=100;
+	public static final String ROUTING_KEY = "rk4";
+	@Produce(uri = "direct:rabbitMQ")
+    protected ProducerTemplate directProducer;
+
+    @EndpointInject(uri = "rabbitmq:localhost:5672/ex4?username=cameltest&password=cameltest"
+                          + "&queue=q4&routingKey="+ROUTING_KEY
+			+"&threadPoolSize="+(CONSUMER_COUNT+5)
+			+"&concurrentConsumers="+CONSUMER_COUNT)
+    private Endpoint rabbitMQEndpoint;
+
+    @EndpointInject(uri = "mock:producing")
+    private MockEndpoint producingMockEndpoint;
+
+    @EndpointInject(uri = "mock:consuming")
+    private MockEndpoint consumingMockEndpoint;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:rabbitMQ")
+                        .id("producingRoute")
+                        .log("Sending message")
+                        .inOnly(rabbitMQEndpoint)
+                        .to(producingMockEndpoint);
+                from(rabbitMQEndpoint)
+                        .id("consumingRoute")
+                        .log("Receiving message")
+                        .to(consumingMockEndpoint);
+            }
+        };
+    }
+
+    @Test
+    public void testSendEndReceive() throws Exception {
+		// Start producers
+		ExecutorService executorService= Executors.newFixedThreadPool(PRODUCER_COUNT);
+		List<Future> futures=new ArrayList<Future>(PRODUCER_COUNT);
+		for(int i = 0 ; i < PRODUCER_COUNT; i++) {
+			futures.add(executorService.submit(new Runnable() {
+				@Override
+				public void run() {
+					for (int i = 0; i < MESSAGE_COUNT; i++) {
+						directProducer.sendBodyAndHeader("Message #" + i, RabbitMQConstants.ROUTING_KEY, ROUTING_KEY);
+					}
+				}
+			}));
+		}
+		// Wait for producers to end
+		for(Future future:futures) {
+			future.get(5, TimeUnit.SECONDS);
+		}
+		// Check message count
+        producingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
+        consumingMockEndpoint.expectedMessageCount(PRODUCER_COUNT * MESSAGE_COUNT);
+        assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
+    }
+}


Mime
View raw message