camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [5/6] camel git commit: added documentation and test case
Date Tue, 28 Mar 2017 08:24:21 GMT
added documentation and test case


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/94496488
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/94496488
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/94496488

Branch: refs/heads/master
Commit: 944964888ed512501ed7495f51dc7468a3059c46
Parents: bd6b87c
Author: Bryan Love <bryan.love@iovation.com>
Authored: Thu Mar 23 14:04:36 2017 -0700
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue Mar 28 10:03:54 2017 +0200

----------------------------------------------------------------------
 .../src/main/docs/sjms-batch-component.adoc     |  3 +-
 .../component/sjms/batch/SjmsBatchConsumer.java |  6 +--
 .../component/sjms/batch/SjmsBatchEndpoint.java | 12 ++++-
 .../sjms/batch/SjmsBatchConsumerTest.java       | 49 +++++++++++++++++++-
 .../component/sjms/support/MockConnection.java  | 43 +++++++++++++++++
 .../sjms/support/MockConnectionFactory.java     | 42 +++++++++++++++++
 .../sjms/support/MockMessageConsumer.java       | 29 ++++++++++++
 .../component/sjms/support/MockSession.java     | 45 ++++++++++++++++++
 8 files changed, 222 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/docs/sjms-batch-component.adoc b/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
index cf8f2b2..3ed1d86 100644
--- a/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
+++ b/components/camel-sjms/src/main/docs/sjms-batch-component.adoc
@@ -148,7 +148,7 @@ with the following path and query parameters:
 | **destinationName** | *Required* The destination name. Only queues are supported names
may be prefixed by 'queue:'. |  | String
 |=======================================================================
 
-#### Query Parameters (22 parameters):
+#### Query Parameters (23 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |=======================================================================
@@ -171,6 +171,7 @@ with the following path and query parameters:
 | **asyncStartListener** (advanced) | Whether to startup the consumer message listener asynchronously
when starting a route. For example if a JmsConsumer cannot get a connection to a remote JMS
broker then it may block while retrying and/or failover. This will cause Camel to block while
starting routes. By setting this option to true you will let routes startup while the JmsConsumer
connects to the JMS broker using a dedicated thread in asynchronous mode. If this option is
used then beware that if the connection could not be established then an exception is logged
at WARN level and the consumer will not be able to receive messages; You can then restart
the route to retry. | false | boolean
 | **headerFilterStrategy** (advanced) | To use a custom HeaderFilterStrategy to filter header
to and from Camel message. |  | HeaderFilterStrategy
 | **jmsKeyFormatStrategy** (advanced) | Pluggable strategy for encoding and decoding JMS
keys so they can be compliant with the JMS specification. Camel provides two implementations
out of the box: default and passthrough. The default strategy will safely marshal dots and
hyphens (. and -). The passthrough strategy leaves the key as is. Can be used for JMS brokers
which do not care whether JMS header keys contain illegal characters. You can provide your
own implementation of the org.apache.camel.component.jms.JmsKeyFormatStrategy and refer to
it using the notation. |  | JmsKeyFormatStrategy
+| **keepAliveDelay** (advanced) | The delay in millis between attempts to re-establish a
valid session. If this is a positive value the SjmsBatchConsumer will attempt to create a
new session if it sees an IllegalStateException during message consumption. This delay value
allows you to pause between attempts to prevent spamming the logs. If this is a negative value
(default is -1) then the SjmsBatchConsumer will behave as it always has before - that is it
will bail out and the route will shut down if it sees an IllegalStateException. | -1 | int
 | **messageCreatedStrategy** (advanced) | To use the given MessageCreatedStrategy which are
invoked when Camel creates new instances of javax.jms.Message objects when Camel is sending
a JMS message. |  | MessageCreatedStrategy
 | **recoveryInterval** (advanced) | Specifies the interval between recovery attempts i.e.
when a connection is being refreshed in milliseconds. The default is 5000 ms that is 5 seconds.
| 5000 | int
 | **synchronous** (advanced) | Sets whether synchronous processing should be strictly used
or Camel is allowed to use asynchronous processing (if supported). | false | boolean

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index c386c66..a32cc3d 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -322,12 +322,12 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                         }
                     } catch (javax.jms.IllegalStateException ex) {
                         // from consumeBatchesOnLoop
-                        // if keepAliveDelay was not specified just rethrow to break the
loop. This preserves original default behavior
-                        if(keepAliveDelay == -1) throw ex;
+                        // if keepAliveDelay was not specified (defaults to -1) just rethrow
to break the loop. This preserves original default behavior
+                        if(keepAliveDelay < 0) throw ex;
                         // this will log the exception and the parent loop will create a
new session
                         getExceptionHandler().handleException("Exception caught consuming
from " + destinationName, ex);
                         //sleep to avoid log spamming
-                        Thread.sleep(keepAliveDelay);
+                        if(keepAliveDelay > 0) Thread.sleep(keepAliveDelay);
                     } finally {
                         closeJmsSession(session);
                     }

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 2e8affb..395c23f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -399,8 +399,18 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
         return recoveryInterval;
     }
 
+    /**
+     * The delay in millis between attempts to re-establish a valid session.
+     * If this is a positive value the SjmsBatchConsumer will attempt to create a new session
if it sees an IllegalStateException
+     * during message consumption. This delay value allows you to pause between attempts
to prevent spamming the logs.
+     * If this is a negative value (default is -1) then the SjmsBatchConsumer will behave
as it always has before - that is
+     * it will bail out and the route will shut down if it sees an IllegalStateException.
+     */
+    public void setKeepAliveDelay(int keepAliveDelay) {
+         this.keepAliveDelay = keepAliveDelay;
+    }
     public int getKeepAliveDelay() {
-        return recoveryInterval;
+        return keepAliveDelay;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
index e378457..72610de 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumerTest.java
@@ -21,7 +21,6 @@ import java.util.Date;
 import java.util.List;
 import javax.jms.ConnectionFactory;
 
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
@@ -29,6 +28,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.SjmsComponent;
+import org.apache.camel.component.sjms.support.MockConnectionFactory;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.SimpleRegistry;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -48,7 +48,7 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
     public CamelContext createCamelContext() throws Exception {
         SimpleRegistry registry = new SimpleRegistry();
         registry.put("testStrategy", new ListAggregationStrategy());
-        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTcpConnectorUri());
+        ConnectionFactory connectionFactory = new MockConnectionFactory(broker.getTcpConnectorUri());
 
         SjmsComponent sjmsComponent = new SjmsComponent();
         sjmsComponent.setConnectionFactory(connectionFactory);
@@ -338,6 +338,51 @@ public class SjmsBatchConsumerTest extends CamelTestSupport {
 
     }
 
+    @Test
+    public void testConsumptionBadSession() throws Exception {
+
+        final int messageCount = 5;
+        final int consumerCount = 1;
+        SjmsBatchComponent sb = (SjmsBatchComponent)context.getComponent("sjms-batch");
+        MockConnectionFactory cf = (MockConnectionFactory)sb.getConnectionFactory();
+        cf.returnBadSessionNTimes(2);
+
+        final String queueName = getQueueName();
+        context.addRoutes(new TransactedSendHarness(queueName));
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+
+                int completionTimeout = 1000;
+                int completionSize = 200;
+
+                // keepAliveDelay=300 is the key... it's a 300 millis delay between attempts
to create a new session.
+                fromF("sjms-batch:%s?completionTimeout=%s&completionSize=%s&consumerCount=%s&aggregationStrategy=#testStrategy&keepAliveDelay=300",
+                        queueName, completionTimeout, completionSize, consumerCount)
+                        .routeId("batchConsumer").startupOrder(10).autoStartup(false)
+                        .split(body())
+                        .to("mock:split");
+            }
+        });
+        context.start();
+
+        MockEndpoint mockBefore = getMockEndpoint("mock:before");
+        mockBefore.setExpectedMessageCount(messageCount);
+
+        MockEndpoint mockSplit = getMockEndpoint("mock:split");
+        mockSplit.setExpectedMessageCount(messageCount);
+
+        LOG.info("Sending messages");
+        template.sendBody("direct:in", generateStrings(messageCount));
+        LOG.info("Send complete");
+
+        StopWatch stopWatch = new StopWatch();
+        context.startRoute("batchConsumer");
+
+        assertMockEndpointsSatisfied();
+        long time = stopWatch.stop();
+
+    }
+
     private void assertFirstMessageBodyOfLength(MockEndpoint mockEndpoint, int expectedLength)
{
         Exchange exchange = mockEndpoint.getExchanges().get(0);
         assertEquals(expectedLength, exchange.getIn().getBody(List.class).size());

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
new file mode 100644
index 0000000..00f06be
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnection.java
@@ -0,0 +1,43 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.util.IdGenerator;
+
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+/**
+ * Created by bryan.love on 3/22/17.
+ */
+public class MockConnection extends ActiveMQConnection {
+    private int returnBadSessionNTimes = 0;
+
+    protected MockConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator
connectionIdGenerator, JMSStatsImpl factoryStats, int returnBadSessionNTimes) throws Exception
{
+        super(transport,  clientIdGenerator,  connectionIdGenerator,  factoryStats);
+        this.returnBadSessionNTimes = returnBadSessionNTimes;
+    }
+
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
{
+        this.checkClosedOrFailed();
+        this.ensureConnectionInfoSent();
+        if(!transacted) {
+            if(acknowledgeMode == 0) {
+                throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used
for an non-transacted Session");
+            }
+
+            if(acknowledgeMode < 0 || acknowledgeMode > 4) {
+                throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ".
Valid values are Session.AUTO_ACKNOWLEDGE (1), Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE
(3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED
(0)");
+            }
+        }
+
+        boolean useBadSession = false;
+        if(returnBadSessionNTimes > 0){
+            useBadSession = true;
+            returnBadSessionNTimes = returnBadSessionNTimes - 1;
+        }
+        return new MockSession(this, this.getNextSessionId(), transacted?0:acknowledgeMode,
this.isDispatchAsync(), this.isAlwaysSessionAsync(), useBadSession);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
new file mode 100644
index 0000000..75cbe0f
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockConnectionFactory.java
@@ -0,0 +1,42 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.management.JMSStatsImpl;
+import org.apache.activemq.transport.Transport;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Created by bryan.love on 3/22/17.
+ */
+public class MockConnectionFactory extends ActiveMQConnectionFactory {
+    private int returnBadSessionNTimes = 0;
+
+    public Connection createConnection() throws JMSException {
+        return this.createActiveMQConnection();
+    }
+    public MockConnectionFactory(String brokerURL) {
+        super(createURI(brokerURL));
+    }
+    private static URI createURI(String brokerURL) {
+        try {
+            return new URI(brokerURL);
+        } catch (URISyntaxException var2) {
+            throw (IllegalArgumentException)(new IllegalArgumentException("Invalid broker
URI: " + brokerURL)).initCause(var2);
+        }
+    }
+
+    protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl
stats) throws Exception {
+        MockConnection connection = new MockConnection(transport, this.getClientIdGenerator(),
this.getConnectionIdGenerator(), stats, returnBadSessionNTimes);
+        return connection;
+    }
+
+    public void returnBadSessionNTimes(int returnBadSessionNTimes) {
+        this.returnBadSessionNTimes = returnBadSessionNTimes;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
new file mode 100644
index 0000000..624c152
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockMessageConsumer.java
@@ -0,0 +1,29 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.MessageDispatch;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+/**
+ * Created by bryan.love on 3/22/17.
+ */
+public class MockMessageConsumer extends ActiveMQMessageConsumer{
+    private boolean isBadSession;
+
+    public MockMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination
dest, String name, String selector, int prefetch, int maximumPendingMessageCount, boolean
noLocal, boolean browser, boolean dispatchAsync, MessageListener messageListener, boolean
isBadSession) throws JMSException {
+        super(session, consumerId, dest, name, selector, prefetch, maximumPendingMessageCount,
noLocal, browser, dispatchAsync, messageListener);
+        this.isBadSession = isBadSession;
+    }
+
+    public Message receive(long timeout) throws JMSException {
+        if(isBadSession) throw new IllegalStateException("asdf");
+        return super.receive(timeout);
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/94496488/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
new file mode 100644
index 0000000..4290e34
--- /dev/null
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/support/MockSession.java
@@ -0,0 +1,45 @@
+package org.apache.camel.component.sjms.support;
+
+import org.apache.activemq.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.SessionId;
+
+import javax.jms.*;
+
+/**
+ * Created by bryan.love on 3/22/17.
+ */
+public class MockSession extends ActiveMQSession {
+    private boolean isBadSession = false;
+
+    protected MockSession(ActiveMQConnection connection, SessionId sessionId, int acknowledgeMode,
boolean asyncDispatch, boolean sessionAsyncDispatch, boolean isBadSession) throws JMSException
{
+        super(connection,  sessionId,  acknowledgeMode,  asyncDispatch,  sessionAsyncDispatch);
+        this.isBadSession = isBadSession;
+    }
+    public Queue createQueue(String queueName) throws JMSException {
+        this.checkClosed();
+        return (Queue)(queueName.startsWith("ID:")?new ActiveMQTempQueue(queueName):new ActiveMQQueue(queueName));
+    }
+
+    public MessageConsumer createConsumer(Destination destination, String messageSelector,
boolean noLocal, MessageListener messageListener) throws JMSException {
+        this.checkClosed();
+        if(destination instanceof CustomDestination) {
+            CustomDestination prefetchPolicy1 = (CustomDestination)destination;
+            return prefetchPolicy1.createConsumer(this, messageSelector, noLocal);
+        } else {
+            ActiveMQPrefetchPolicy prefetchPolicy = this.connection.getPrefetchPolicy();
+            boolean prefetch = false;
+            int prefetch1;
+            if(destination instanceof Topic) {
+                prefetch1 = prefetchPolicy.getTopicPrefetch();
+            } else {
+                prefetch1 = prefetchPolicy.getQueuePrefetch();
+            }
+
+            ActiveMQDestination activemqDestination = ActiveMQMessageTransformation.transformDestination(destination);
+            return new MockMessageConsumer(this, this.getNextConsumerId(), activemqDestination,
(String)null, messageSelector, prefetch1, prefetchPolicy.getMaximumPendingMessageLimit(),
noLocal, false, this.isAsyncDispatch(), messageListener, isBadSession);
+        }
+    }
+}


Mime
View raw message