camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/5] camel git commit: CAMEL-8711: Expose JMS session to Camel Message so end users can access it and use it for client ack mode etc.
Date Wed, 06 May 2015 10:37:42 GMT
Repository: camel
Updated Branches:
  refs/heads/master e816247d6 -> e319c9e08


CAMEL-8711: Expose JMS session to Camel Message so end users can access it and use it for
client ack mode etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e00e0d65
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e00e0d65
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e00e0d65

Branch: refs/heads/master
Commit: e00e0d6599b01733c270f3053e23118d35ea0881
Parents: e816247
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Wed May 6 11:20:43 2015 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Wed May 6 11:20:43 2015 +0200

----------------------------------------------------------------------
 .../jms/DefaultQueueBrowseStrategy.java         |  4 +--
 .../component/jms/EndpointMessageListener.java  | 12 +++++---
 .../apache/camel/component/jms/JmsEndpoint.java |  5 +--
 .../apache/camel/component/jms/JmsMessage.java  | 32 ++++++++++++++------
 .../camel/component/jms/JmsPollingConsumer.java |  3 +-
 .../component/jms/reply/QueueReplyManager.java  |  4 +--
 .../camel/component/jms/reply/ReplyHandler.java |  6 ++--
 .../camel/component/jms/reply/ReplyHolder.java  | 17 +++++++++--
 .../camel/component/jms/reply/ReplyManager.java |  4 +--
 .../jms/reply/ReplyManagerSupport.java          | 12 +++++---
 .../jms/reply/TemporaryQueueReplyHandler.java   |  5 +--
 .../jms/reply/TemporaryQueueReplyManager.java   |  4 +--
 12 files changed, 72 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java
index 3258d61..45f5847 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/DefaultQueueBrowseStrategy.java
@@ -51,7 +51,7 @@ public class DefaultQueueBrowseStrategy implements QueueBrowseStrategy {
                     Enumeration<?> iter = browser.getEnumeration();
                     for (int i = 0; i < size && iter.hasMoreElements(); i++) {
                         Message message = (Message) iter.nextElement();
-                        Exchange exchange = endpoint.createExchange(message);
+                        Exchange exchange = endpoint.createExchange(message, session);
                         answer.add(exchange);
                     }
                     return answer;
@@ -71,7 +71,7 @@ public class DefaultQueueBrowseStrategy implements QueueBrowseStrategy {
                     Enumeration<?> iter = browser.getEnumeration();
                     for (int i = 0; i < size && iter.hasMoreElements(); i++) {
                         Message message = (Message) iter.nextElement();
-                        Exchange exchange = endpoint.createExchange(message);
+                        Exchange exchange = endpoint.createExchange(message, session);
                         answer.add(exchange);
                     }
                     return answer;

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
index f2afc60..3980128 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.jms.core.JmsOperations;
 import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.SessionAwareMessageListener;
 
 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
 
@@ -46,7 +47,7 @@ import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
  *
  * @version 
  */
-public class EndpointMessageListener implements MessageListener {
+public class EndpointMessageListener implements SessionAwareMessageListener {
     private static final Logger LOG = LoggerFactory.getLogger(EndpointMessageListener.class);
     private final JmsEndpoint endpoint;
     private final AsyncProcessor processor;
@@ -62,7 +63,8 @@ public class EndpointMessageListener implements MessageListener {
         this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
 
-    public void onMessage(final Message message) {
+    @Override
+    public void onMessage(Message message, Session session) throws JMSException {
         LOG.trace("onMessage START");
 
         LOG.debug("{} consumer received JMS message: {}", endpoint, message);
@@ -75,7 +77,7 @@ public class EndpointMessageListener implements MessageListener {
             // and disableReplyTo hasn't been explicit enabled
             sendReply = replyDestination != null && !disableReplyTo;
 
-            final Exchange exchange = createExchange(message, replyDestination);
+            final Exchange exchange = createExchange(message, session, replyDestination);
             if (eagerLoadingOfProperties) {
                 exchange.getIn().getHeaders();
             }
@@ -233,11 +235,11 @@ public class EndpointMessageListener implements MessageListener {
         }
     }
 
-    public Exchange createExchange(Message message, Object replyDestination) {
+    public Exchange createExchange(Message message, Session session, Object replyDestination)
{
         Exchange exchange = endpoint.createExchange();
         JmsBinding binding = getBinding();
         exchange.setProperty(Exchange.BINDING, binding);
-        exchange.setIn(new JmsMessage(message, binding));
+        exchange.setIn(new JmsMessage(message, session, binding));
 
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index ff9bc32..6389226 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -25,6 +25,7 @@ import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Queue;
+import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
@@ -292,9 +293,9 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return exchange;
     }
 
-    public Exchange createExchange(Message message) {
+    public Exchange createExchange(Message message, Session session) {
         Exchange exchange = createExchange(getExchangePattern());
-        exchange.setIn(new JmsMessage(message, getBinding()));
+        exchange.setIn(new JmsMessage(message, session, getBinding()));
         return exchange;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
index 45274cf..1dfbd48 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessage.java
@@ -22,6 +22,7 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.Queue;
+import javax.jms.Session;
 import javax.jms.Topic;
 
 import org.apache.camel.RuntimeExchangeException;
@@ -39,10 +40,12 @@ import org.slf4j.LoggerFactory;
 public class JmsMessage extends DefaultMessage {
     private static final Logger LOG = LoggerFactory.getLogger(JmsMessage.class);
     private Message jmsMessage;
+    private Session jmsSession;
     private JmsBinding binding;
 
-    public JmsMessage(Message jmsMessage, JmsBinding binding) {
+    public JmsMessage(Message jmsMessage, Session jmsSession, JmsBinding binding) {
         setJmsMessage(jmsMessage);
+        setJmsSession(jmsSession);
         setBinding(binding);
     }
 
@@ -98,13 +101,6 @@ public class JmsMessage extends DefaultMessage {
         }
     }
 
-    /**
-     * Returns the underlying JMS message
-     */
-    public Message getJmsMessage() {
-        return jmsMessage;
-    }
-
     public JmsBinding getBinding() {
         if (binding == null) {
             binding = ExchangeHelper.getBinding(getExchange(), JmsBinding.class);
@@ -116,6 +112,13 @@ public class JmsMessage extends DefaultMessage {
         this.binding = binding;
     }
 
+    /**
+     * Returns the underlying JMS message
+     */
+    public Message getJmsMessage() {
+        return jmsMessage;
+    }
+
     public void setJmsMessage(Message jmsMessage) {
         if (jmsMessage != null) {
             try {
@@ -127,6 +130,17 @@ public class JmsMessage extends DefaultMessage {
         this.jmsMessage = jmsMessage;
     }
 
+    /**
+     * Returns the underlying JMS session
+     */
+    public Session getJmsSession() {
+        return jmsSession;
+    }
+
+    public void setJmsSession(Session jmsSession) {
+        this.jmsSession = jmsSession;
+    }
+
     @Override
     public void setBody(Object body) {
         super.setBody(body);
@@ -186,7 +200,7 @@ public class JmsMessage extends DefaultMessage {
 
     @Override
     public JmsMessage newInstance() {
-        return new JmsMessage(null, binding);
+        return new JmsMessage(null, null, binding);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPollingConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPollingConsumer.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPollingConsumer.java
index 2df30cf..cbb74b0 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPollingConsumer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsPollingConsumer.java
@@ -56,6 +56,7 @@ public class JmsPollingConsumer extends PollingConsumerSupport implements
Servic
     }
 
     public Exchange receive(long timeout) {
+        // TODO: use api so we can get hold of session
         setReceiveTimeout(timeout);
         Message message;
         // using the selector
@@ -65,7 +66,7 @@ public class JmsPollingConsumer extends PollingConsumerSupport implements
Servic
             message = template.receive();
         }
         if (message != null) {
-            return getEndpoint().createExchange(message);
+            return getEndpoint().createExchange(message, null);
         }
         return null;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
index 07ddfad..1b72680 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java
@@ -65,7 +65,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
         correlation.put(newCorrelationId, handler, requestTimeout);
     }
 
-    protected void handleReplyMessage(String correlationID, Message message) {
+    protected void handleReplyMessage(String correlationID, Message message, Session session)
{
         ReplyHandler handler = correlation.get(correlationID);
         if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
             handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
@@ -73,7 +73,7 @@ public class QueueReplyManager extends ReplyManagerSupport {
 
         if (handler != null) {
             correlation.remove(correlationID);
-            handler.onReply(correlationID, message);
+            handler.onReply(correlationID, message, session);
         } else {
             // we could not correlate the received reply message to a matching request and
therefore
             // we cannot continue routing the unknown message

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
index 5579745..e05b5c4 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHandler.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms.reply;
 
 import javax.jms.Message;
+import javax.jms.Session;
 
 /**
  * Handles a reply.
@@ -29,9 +30,10 @@ public interface ReplyHandler {
      * The reply message was received
      *
      * @param correlationId  the correlation id
-     * @param reply  the reply message
+     * @param reply  the JMS reply message
+     * @param session the JMS session
      */
-    void onReply(String correlationId, Message reply);
+    void onReply(String correlationId, Message reply, Session session);
 
     /**
      * The reply message was not received and a timeout triggered

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
index a967a06..84945ad 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms.reply;
 
 import javax.jms.Message;
+import javax.jms.Session;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -32,6 +33,7 @@ public class ReplyHolder {
     private final Exchange exchange;
     private final AsyncCallback callback;
     private final Message message;
+    private final Session session;
     private final String originalCorrelationId;
     private final String correlationId;
     private long timeout;
@@ -40,12 +42,13 @@ public class ReplyHolder {
      * Constructor to use when a reply message was received
      */
     public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
-                       String correlationId, Message message) {
+                       String correlationId, Message message, Session session) {
         this.exchange = exchange;
         this.callback = callback;
         this.originalCorrelationId = originalCorrelationId;
         this.correlationId = correlationId;
         this.message = message;
+        this.session = session;
     }
 
     /**
@@ -53,7 +56,7 @@ public class ReplyHolder {
      */
     public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
                        String correlationId, long timeout) {
-        this(exchange, callback, originalCorrelationId, correlationId, null);
+        this(exchange, callback, originalCorrelationId, correlationId, null, null);
         this.timeout = timeout;
     }
 
@@ -95,6 +98,16 @@ public class ReplyHolder {
     }
 
     /**
+     * Gets the JMS session from the received message
+     *
+     * @return  the JMS session, or <tt>null</tt> if timeout occurred and no
message has been received
+     * @see #isTimeout()
+     */
+    public Session getSession() {
+        return session;
+    }
+
+    /**
      * Whether timeout triggered or not.
      * <p/>
      * A timeout is triggered if <tt>requestTimeout</tt> option has been configured,
and a reply message has <b>not</b> been

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
index 9eb0085..e3b65aa 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManager.java
@@ -20,11 +20,11 @@ import java.util.concurrent.ScheduledExecutorService;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageListener;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.component.jms.JmsEndpoint;
+import org.springframework.jms.listener.SessionAwareMessageListener;
 
 /**
  * The {@link ReplyManager} is responsible for handling <a href="http://camel.apache.org/request-reply.html">request-reply</a>
@@ -32,7 +32,7 @@ import org.apache.camel.component.jms.JmsEndpoint;
  *
  * @version 
  */
-public interface ReplyManager extends MessageListener {
+public interface ReplyManager extends SessionAwareMessageListener {
 
     /**
      * Sets the belonging {@link org.apache.camel.component.jms.JmsEndpoint}.

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 83a7729..a2f70c3 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.Session;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -114,7 +115,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements
Repl
     protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange
exchange, AsyncCallback callback,
                                 String originalCorrelationId, String correlationId, long
requestTimeout);
 
-    public void onMessage(Message message) {
+    public void onMessage(Message message, Session session) throws JMSException {
         String correlationID = null;
         try {
             correlationID = message.getJMSCorrelationID();
@@ -129,14 +130,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements
Repl
         log.debug("Received reply message with correlationID [{}] -> {}", correlationID,
message);
 
         // handle the reply message
-        handleReplyMessage(correlationID, message);
+        handleReplyMessage(correlationID, message, session);
     }
 
     public void processReply(ReplyHolder holder) {
         if (holder != null && isRunAllowed()) {
             try {
                 Exchange exchange = holder.getExchange();
-                Message message = holder.getMessage();
 
                 boolean timeout = holder.isTimeout();
                 if (timeout) {
@@ -151,7 +151,9 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements
Repl
                     String msg = "reply message with correlationID: " + holder.getCorrelationId()
+ " not received on destination: " + replyTo;
                     exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(),
msg));
                 } else {
-                    JmsMessage response = new JmsMessage(message, endpoint.getBinding());
+                    Message message = holder.getMessage();
+                    Session session = holder.getSession();
+                    JmsMessage response = new JmsMessage(message, session, endpoint.getBinding());
                     // the JmsBinding is designed to be "pull-based": it will populate the
Camel message on demand
                     // therefore, we link Exchange and OUT message before continuing, so
that the JmsBinding has full access 
                     // to everything it may need, and can populate headers, properties, etc.
accordingly (solves CAMEL-6218).
@@ -181,7 +183,7 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements
Repl
         }
     }
 
-    protected abstract void handleReplyMessage(String correlationID, Message message);
+    protected abstract void handleReplyMessage(String correlationID, Message message, Session
session);
 
     protected abstract AbstractMessageListenerContainer createListenerContainer() throws
Exception;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
index a2b28a2..f752a05 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.jms.reply;
 
 import javax.jms.Message;
+import javax.jms.Session;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
@@ -47,9 +48,9 @@ public class TemporaryQueueReplyHandler implements ReplyHandler {
         this.timeout = timeout;
     }
 
-    public void onReply(String correlationId, Message reply) {
+    public void onReply(String correlationId, Message reply, Session session) {
         // create holder object with the the reply
-        ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId,
reply);
+        ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId,
reply, session);
         // process the reply
         replyManager.processReply(holder);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/e00e0d65/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index 0e3d98b..6d1f51d 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -72,7 +72,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
     }
 
     @Override
-    protected void handleReplyMessage(String correlationID, Message message) {
+    protected void handleReplyMessage(String correlationID, Message message, Session session)
{
         ReplyHandler handler = correlation.get(correlationID);
         if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) {
             handler = waitForProvisionCorrelationToBeUpdated(correlationID, message);
@@ -80,7 +80,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
 
         if (handler != null) {
             correlation.remove(correlationID);
-            handler.onReply(correlationID, message);
+            handler.onReply(correlationID, message, session);
         } else {
             // we could not correlate the received reply message to a matching request and
therefore
             // we cannot continue routing the unknown message


Mime
View raw message