camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/4] git commit: Seda consumers should validate that they all have the same multiple consumers option, as they cannot have different values. This is per queue.
Date Thu, 13 Jun 2013 12:04:50 GMT
Updated Branches:
  refs/heads/master 011002fd6 -> 0cad912e0


Seda consumers should validate that they all have the same multiple consumers option, as they
cannot have different values. This is per queue.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ad43a48a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ad43a48a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ad43a48a

Branch: refs/heads/master
Commit: ad43a48a65a42847dc76b6f3a8e78f9387ed6b72
Parents: de9de10
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Wed Jun 12 16:00:29 2013 -0400
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Wed Jun 12 16:00:29 2013 -0400

----------------------------------------------------------------------
 .../camel/component/seda/SedaComponent.java     | 22 +++++-
 .../camel/component/seda/SedaEndpoint.java      | 13 +++-
 ...edaQueueMultipleConsumersDifferenceTest.java | 70 ++++++++++++++++++++
 3 files changed, 102 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ad43a48a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
index 2578da9..77a8177 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaComponent.java
@@ -61,7 +61,15 @@ public class SedaComponent extends UriEndpointComponent {
         return defaultConcurrentConsumers;
     }
 
+    /**
+     * @deprecated use {@link #getOrCreateQueue(String, Integer, Boolean)}
+     */
+    @Deprecated
     public synchronized QueueReference getOrCreateQueue(String uri, Integer size) {
+        return getOrCreateQueue(uri, size, null);
+    }
+
+    public synchronized QueueReference getOrCreateQueue(String uri, Integer size, Boolean
multipleConsumers) {
         String key = getQueueKey(uri);
 
         QueueReference ref = getQueues().get(key);
@@ -97,7 +105,7 @@ public class SedaComponent extends UriEndpointComponent {
         log.debug("Created queue {} with size {}", key, size);
 
         // create and add a new reference queue
-        ref = new QueueReference(queue, size);
+        ref = new QueueReference(queue, size, multipleConsumers);
         ref.addReference();
         getQueues().put(key, ref);
 
@@ -108,6 +116,10 @@ public class SedaComponent extends UriEndpointComponent {
         return queues;
     }
 
+    public QueueReference getQueueReference(String key) {
+        return queues.get(key);
+    }
+
     @Override
     protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object>
parameters) throws Exception {
         int consumers = getAndRemoveParameter(parameters, "concurrentConsumers", Integer.class,
defaultConcurrentConsumers);
@@ -165,10 +177,12 @@ public class SedaComponent extends UriEndpointComponent {
         private final BlockingQueue<Exchange> queue;
         private volatile int count;
         private Integer size;
+        private Boolean multipleConsumers;
 
-        private QueueReference(BlockingQueue<Exchange> queue, Integer size) {
+        private QueueReference(BlockingQueue<Exchange> queue, Integer size, Boolean
multipleConsumers) {
             this.queue = queue;
             this.size = size;
+            this.multipleConsumers = multipleConsumers;
         }
         
         void addReference() {
@@ -195,6 +209,10 @@ public class SedaComponent extends UriEndpointComponent {
             return size;
         }
 
+        public Boolean getMultipleConsumers() {
+            return multipleConsumers;
+        }
+
         /**
          * Gets the queue
          */

http://git-wip-us.apache.org/repos/asf/camel/blob/ad43a48a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
index 56a5d4e..656736c 100644
--- a/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
@@ -106,6 +106,17 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
     }
 
     public Consumer createConsumer(Processor processor) throws Exception {
+        if (getComponent() != null) {
+            // all consumers must match having the same multipleConsumers options
+            String key = getComponent().getQueueKey(getEndpointUri());
+            SedaComponent.QueueReference ref = getComponent().getQueueReference(key);
+            if (ref != null && ref.getMultipleConsumers() != isMultipleConsumers())
{
+                // there is already a multiple consumers, so make sure they matches
+                throw new IllegalArgumentException("Cannot use existing queue " + key + "
as the existing queue multiple consumers "
+                        + ref.getMultipleConsumers() + " does not match given multiple consumers
" + multipleConsumers);
+            }
+        }
+
         Consumer answer = new SedaConsumer(this, processor);
         configureConsumer(answer);
         return answer;
@@ -119,7 +130,7 @@ public class SedaEndpoint extends DefaultEndpoint implements BrowsableEndpoint,
             if (getComponent() != null) {
                 // use null to indicate default size (= use what the existing queue has been
configured with)
                 Integer size = getSize() == Integer.MAX_VALUE ? null : getSize();
-                SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(),
size);
+                SedaComponent.QueueReference ref = getComponent().getOrCreateQueue(getEndpointUri(),
size, isMultipleConsumers());
                 queue = ref.getQueue();
                 String key = getComponent().getQueueKey(getEndpointUri());
                 LOG.info("Endpoint {} is using shared queue: {} with size: {}", new Object[]{this,
key, ref.getSize() !=  null ? ref.getSize() : Integer.MAX_VALUE});

http://git-wip-us.apache.org/repos/asf/camel/blob/ad43a48a/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java
b/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java
new file mode 100644
index 0000000..98856df
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/seda/SameSedaQueueMultipleConsumersDifferenceTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.seda;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.ResolveEndpointFailedException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class SameSedaQueueMultipleConsumersDifferenceTest extends ContextTestSupport {
+
+    public void testSameOptions() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:foo?multipleConsumers=true", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testSameOptionsProducerStillOkay() throws Exception {
+        getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:bar").expectedBodiesReceived("Hello World");
+
+        template.sendBody("seda:foo", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testAddConsumer() throws Exception {
+        try {
+            context.addRoutes(new RouteBuilder() {
+                @Override
+                public void configure() throws Exception {
+                    from("seda:foo").routeId("fail").to("mock:fail");
+                }
+            });
+            fail("Should have thrown exception");
+        } catch (IllegalArgumentException e) {
+            assertEquals("Cannot use existing queue seda://foo as the existing queue multiple
consumers true does not match given multiple consumers false", e.getMessage());
+        }
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:foo?multipleConsumers=true").routeId("foo").to("mock:foo");
+                from("seda:foo?multipleConsumers=true").routeId("bar").to("mock:bar");
+            }
+        };
+    }
+}


Mime
View raw message