camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sully6...@apache.org
Subject [1/3] git commit: CAMEL-6362: Consumers should always use dedicated Sessions
Date Wed, 30 Oct 2013 13:21:58 GMT
Updated Branches:
  refs/heads/master 0ab6bba5b -> dbe794c37


CAMEL-6362: Consumers should always use dedicated Sessions

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/520e55fa
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/520e55fa
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/520e55fa

Branch: refs/heads/master
Commit: 520e55fa81f7aa9faa65960ad36be0af8f3d03d0
Parents: 29e4411
Author: Scott England-Sullivan <sully6768@apache.org>
Authored: Tue Oct 29 11:17:35 2013 -0500
Committer: Scott England-Sullivan <sully6768@apache.org>
Committed: Tue Oct 29 11:17:35 2013 -0500

----------------------------------------------------------------------
 .../camel/component/sjms/SjmsConsumer.java      |  75 +++++------
 .../camel/component/sjms/SjmsEndpoint.java      |  42 +-----
 .../camel/component/sjms/SjmsProducer.java      |   5 +-
 .../component/sjms/producer/InOnlyProducer.java | 102 +++++++++------
 .../component/sjms/producer/InOutProducer.java  | 127 ++++++++++++-------
 .../InOnlyTopicDurableConsumerTest.java         |   4 +-
 6 files changed, 180 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
index 806537d..5b01c3a 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsConsumer.java
@@ -23,6 +23,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
+import org.apache.camel.CamelException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
@@ -32,7 +33,6 @@ import org.apache.camel.component.sjms.consumer.InOutMessageHandler;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
 import org.apache.camel.component.sjms.jms.JmsObjectFactory;
 import org.apache.camel.component.sjms.jms.ObjectPool;
-import org.apache.camel.component.sjms.jms.SessionPool;
 import org.apache.camel.component.sjms.taskmanager.TimedTaskManager;
 import org.apache.camel.component.sjms.tx.BatchTransactionCommitStrategy;
 import org.apache.camel.component.sjms.tx.DefaultTransactionCommitStrategy;
@@ -66,12 +66,7 @@ public class SjmsConsumer extends DefaultConsumer {
          */
         @Override
         protected MessageConsumerResources createObject() throws Exception {
-            MessageConsumerResources model = null;
-            if (isTransacted() || getEndpoint().getExchangePattern().equals(ExchangePattern.InOut)) {
-                model = createConsumerWithDedicatedSession();
-            } else {
-                model = createConsumerListener();
-            }
+            MessageConsumerResources model = createConsumer();
             return model;
         }
 
@@ -168,39 +163,41 @@ public class SjmsConsumer extends DefaultConsumer {
      * Creates a {@link MessageConsumerResources} with a dedicated
      * {@link Session} required for transacted and InOut consumers.
      */
-    private MessageConsumerResources createConsumerWithDedicatedSession() throws Exception {
-        Connection conn = getConnectionResource().borrowConnection();
-        Session session = null;
-        if (isTransacted()) {
-            session = conn.createSession(true, Session.SESSION_TRANSACTED);
-        } else {
-            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    @SuppressWarnings("unused")
+    private MessageConsumerResources createConsumer() throws Exception {
+        MessageConsumerResources answer = null;
+        Connection conn = null;
+        try {
+            conn = getConnectionResource().borrowConnection();
+            
+            Session session = null;
+            MessageConsumer messageConsumer = null;
+            if (isTransacted()) {
+                session = conn.createSession(true, Session.SESSION_TRANSACTED);
+            } else {
+                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            }
+            messageConsumer = JmsObjectFactory.createMessageConsumer(session, getDestinationName(), getMessageSelector(), isTopic(), getDurableSubscriptionId());
+            MessageListener handler = createMessageHandler(session);
+            messageConsumer.setMessageListener(handler);
+            
+            if (session == null) {
+                throw new CamelException("Message Consumer Creation Exception: Session is NULL");
+            }
+            if (messageConsumer == null) {
+                throw new CamelException("Message Consumer Creation Exception: MessageConsumer is NULL");
+            }
+            answer = new MessageConsumerResources(session, messageConsumer);
+        } catch (Exception e) {
+            log.error("Unable to create the MessageConsumer: " + e.getLocalizedMessage());
+        } finally {
+            if (conn != null) {
+                getConnectionResource().returnConnection(conn);
+            }
         }
-        MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, getDestinationName(), getMessageSelector(), isTopic(), getDurableSubscriptionId());
-        MessageListener handler = createMessageHandler(session);
-        messageConsumer.setMessageListener(handler);
-        getConnectionResource().returnConnection(conn);
-        return new MessageConsumerResources(session, messageConsumer);
+        return answer;
     }
 
-    /**
-     * Creates a {@link MessageConsumerResources} with a shared {@link Session}
-     * for non-transacted InOnly consumers.
-     */
-    private MessageConsumerResources createConsumerListener() throws Exception {
-        Session queueSession = getSessionPool().borrowObject();
-        MessageConsumer messageConsumer = null;
-        if (isTopic()) {
-            messageConsumer = JmsObjectFactory.createTopicConsumer(queueSession, getDestinationName(), getMessageSelector());
-        } else {
-            messageConsumer = JmsObjectFactory.createQueueConsumer(queueSession, getDestinationName(), getMessageSelector());
-        }
-        getSessionPool().returnObject(queueSession);
-        // Don't pass in the session. Only needed if we are transacted
-        MessageListener handler = createMessageHandler(null);
-        messageConsumer.setMessageListener(handler);
-        return new MessageConsumerResources(messageConsumer);
-    }
 
     /**
      * Helper factory method used to create a MessageListener based on the MEP
@@ -253,10 +250,6 @@ public class SjmsConsumer extends DefaultConsumer {
         return getEndpoint().getConnectionResource();
     }
 
-    protected SessionPool getSessionPool() {
-        return getEndpoint().getSessions();
-    }
-
     public int getAcknowledgementMode() {
         return getEndpoint().getAcknowledgementMode().intValue();
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index edaa70c..63118ec 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -25,7 +25,6 @@ import org.apache.camel.Producer;
 import org.apache.camel.component.sjms.jms.ConnectionResource;
 import org.apache.camel.component.sjms.jms.KeyFormatStrategy;
 import org.apache.camel.component.sjms.jms.SessionAcknowledgementType;
-import org.apache.camel.component.sjms.jms.SessionPool;
 import org.apache.camel.component.sjms.producer.InOnlyProducer;
 import org.apache.camel.component.sjms.producer.InOutProducer;
 import org.apache.camel.impl.DefaultEndpoint;
@@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory;
 public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSupport {
     protected final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private SessionPool sessions;
+    
     @UriParam
     private boolean synchronous = true;
     @UriParam
@@ -95,28 +94,10 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-
-        //
-        // TODO since we only need a session pool for one use case, find a
-        // better way
-        //
-        // We only create a session pool when we are not transacted.
-        // Transacted listeners or producers need to be paired with the
-        // Session that created them.
-        if (!isTransacted() && getExchangePattern().equals(ExchangePattern.InOnly)) {
-            sessions = new SessionPool(getSessionCount(), getConnectionResource());
-
-            // TODO fix the string hack
-            sessions.setAcknowledgeMode(SessionAcknowledgementType.valueOf(getAcknowledgementMode() + ""));
-            getSessions().fillPool();
-        }
     }
 
     @Override
     protected void doStop() throws Exception {
-        if (getSessions() != null) {
-            getSessions().drainPool();
-        }
         super.doStop();
     }
 
@@ -169,25 +150,6 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
     }
 
     /**
-     * Returns a SessionPool if available.
-     * 
-     * @return the sessions
-     */
-    public SessionPool getSessions() {
-        return sessions;
-    }
-
-    /**
-     * SessionPool used by endpoints that do not require a dedicated session per
-     * consumer or producer.
-     * 
-     * @param sessions default null
-     */
-    public void setSessions(SessionPool sessions) {
-        this.sessions = sessions;
-    }
-
-    /**
      * Use to determine whether or not to process exchanges synchronously.
      * 
      * @return true if endoint is synchronous, otherwise false
@@ -239,6 +201,7 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
      * 
      * @return the sessionCount
      */
+    @Deprecated
     public int getSessionCount() {
         return sessionCount;
     }
@@ -250,6 +213,7 @@ public class SjmsEndpoint extends DefaultEndpoint implements MultipleConsumersSu
      * 
      * @param sessionCount the number of Session instances, default is 1
      */
+    @Deprecated
     public void setSessionCount(int sessionCount) {
         this.sessionCount = sessionCount;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
index d4e1a1d..c5d9c1f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsProducer.java
@@ -157,7 +157,7 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
         if (log.isDebugEnabled()) {
             log.debug("Processing Exchange.id:{}", exchange.getExchangeId());
         }
-        
+
         try {
             if (!isSynchronous()) {
                 if (log.isDebugEnabled()) {
@@ -189,10 +189,9 @@ public abstract class SjmsProducer extends DefaultAsyncProducer {
             exchange.setException(e);
         }
         log.debug("Processing Exchange.id:{}", exchange.getExchangeId() + " - SUCCESS");
-        
+
         return isSynchronous();
     }
-    
 
     protected SjmsEndpoint getSjmsEndpoint() {
         return (SjmsEndpoint)this.getEndpoint();

http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index ca49765..df689b2 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -51,32 +51,44 @@ public class InOnlyProducer extends SjmsProducer {
      */
     @Override
     public MessageProducerResources doCreateProducerModel() throws Exception {
-        Connection conn = getConnectionResource().borrowConnection();
-        TransactionCommitStrategy commitStrategy = null;
-        Session session = null;
-        if (isEndpointTransacted()) {
-            if (getCommitStrategy() != null) {
-                commitStrategy = getCommitStrategy();
+        MessageProducerResources answer = null;
+        Connection conn = null;
+        try {
+            conn = getConnectionResource().borrowConnection();
+            
+            TransactionCommitStrategy commitStrategy = null;
+            Session session = null;
+            MessageProducer messageProducer = null;
+            
+            if (isEndpointTransacted()) {
+                if (getCommitStrategy() != null) {
+                    commitStrategy = getCommitStrategy();
+                } else {
+                    commitStrategy = new DefaultTransactionCommitStrategy();
+                }
+                session = conn.createSession(true, getAcknowledgeMode());
             } else {
-                commitStrategy = new DefaultTransactionCommitStrategy();
+                session = conn.createSession(false, getAcknowledgeMode());
+            }
+            if (isTopic()) {
+                messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
+            } else {
+                messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
+            }
+            answer = new MessageProducerResources(session, messageProducer, commitStrategy);
+        } catch (Exception e) {
+            log.error("Unable to create the MessageProducer: " + e.getLocalizedMessage());
+        } finally {
+            if (conn != null) {
+                getConnectionResource().returnConnection(conn);
             }
-            session = conn.createSession(true, getAcknowledgeMode());
-        } else {
-            session = conn.createSession(false, getAcknowledgeMode());
-        }
-        MessageProducer messageProducer = null;
-        if (isTopic()) {
-            messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
-        } else {
-            messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
         }
-        getConnectionResource().returnConnection(conn);
-        return new MessageProducerResources(session, messageProducer, commitStrategy);
+        return answer;
     }
 
     /*
-     * @see org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
-     *
+     * @see
+     * org.apache.camel.component.sjms.SjmsProducer#sendMessage(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)
      * @param exchange
      * @param callback
      * @throws Exception
@@ -85,34 +97,40 @@ public class InOnlyProducer extends SjmsProducer {
     public void sendMessage(Exchange exchange, AsyncCallback callback) throws Exception {
         List<Message> messages = new ArrayList<Message>();
         MessageProducerResources producer = getProducers().borrowObject();
-        if (getProducers() != null) {
-            if (exchange.getIn().getBody() != null) {
-                if (exchange.getIn().getBody() instanceof List) {
-                    List<?> payload = (List<?>)exchange.getIn().getBody();
-                    for (Object object : payload) {
-                        Message message = null;
-                        if (BatchMessage.class.isInstance(object)) {
-                            BatchMessage<?> batchMessage = (BatchMessage<?>)object;
-                            message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
-                                .getJmsKeyFormatStrategy());
-                        } else {
-                            message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+        try {
+            if (getProducers() != null) {
+                if (exchange.getIn().getBody() != null) {
+                    if (exchange.getIn().getBody() instanceof List) {
+                        List<?> payload = (List<?>)exchange.getIn().getBody();
+                        for (Object object : payload) {
+                            Message message = null;
+                            if (BatchMessage.class.isInstance(object)) {
+                                BatchMessage<?> batchMessage = (BatchMessage<?>)object;
+                                message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
+                                    .getJmsKeyFormatStrategy());
+                            } else {
+                                message = JmsMessageHelper.createMessage(producer.getSession(), object, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
+                            }
+                            messages.add(message);
                         }
+                    } else {
+                        Object payload = exchange.getIn().getBody();
+                        Message message = JmsMessageHelper
+                            .createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
                         messages.add(message);
                     }
-                } else {
-                    Object payload = exchange.getIn().getBody();
-                    Message message = JmsMessageHelper.createMessage(producer.getSession(), payload, exchange.getIn().getHeaders(), getSjmsEndpoint().getJmsKeyFormatStrategy());
-                    messages.add(message);
                 }
-            }
 
-            if (isEndpointTransacted()) {
-                exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
-            }
-            for (Message message : messages) {
-                producer.getMessageProducer().send(message);
+                if (isEndpointTransacted()) {
+                    exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
+                }
+                for (Message message : messages) {
+                    producer.getMessageProducer().send(message);
+                }
             }
+        } catch (Exception e) {
+            exchange.setException(new Exception("Unable to complet sending the message: " + e.getLocalizedMessage()));
+        } finally {
             getProducers().returnObject(producer);
             callback.done(isSynchronous());
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
index c98e136..0936ecf 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOutProducer.java
@@ -79,40 +79,49 @@ public class InOutProducer extends SjmsProducer {
 
         @Override
         protected MessageConsumerResource createObject() throws Exception {
-            Connection conn = getConnectionResource().borrowConnection();
+            MessageConsumerResource answer = null;
+            Connection conn = null;
             Session session = null;
-            if (isEndpointTransacted()) {
-                session = conn.createSession(true, Session.SESSION_TRANSACTED);
-            } else {
-                session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            }
-            getConnectionResource().returnConnection(conn);
-            Destination replyToDestination = null;
-            if (ObjectHelper.isEmpty(getNamedReplyTo())) {
-                replyToDestination = JmsObjectFactory.createTemporaryDestination(session, isTopic());
-            } else {
-                replyToDestination = JmsObjectFactory.createDestination(session, getNamedReplyTo(), isTopic());
-            }
-            MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, replyToDestination, null, isTopic(), null, true);
-            messageConsumer.setMessageListener(new MessageListener() {
-
-                @Override
-                public void onMessage(Message message) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Message Received in the Consumer Pool");
-                        logger.debug("  Message : {}", message);
-                    }
-                    try {
-                        Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID());
-                        exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);
-                    } catch (Exception e) {
-                        ObjectHelper.wrapRuntimeCamelException(e);
-                    }
+            try {
+                conn = getConnectionResource().borrowConnection();
+                if (isEndpointTransacted()) {
+                    session = conn.createSession(true, Session.SESSION_TRANSACTED);
+                } else {
+                    session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                }
 
+                Destination replyToDestination = null;
+                if (ObjectHelper.isEmpty(getNamedReplyTo())) {
+                    replyToDestination = JmsObjectFactory.createTemporaryDestination(session, isTopic());
+                } else {
+                    replyToDestination = JmsObjectFactory.createDestination(session, getNamedReplyTo(), isTopic());
                 }
-            });
-            MessageConsumerResource mcm = new MessageConsumerResource(session, messageConsumer, replyToDestination);
-            return mcm;
+                MessageConsumer messageConsumer = JmsObjectFactory.createMessageConsumer(session, replyToDestination, null, isTopic(), null, true);
+                messageConsumer.setMessageListener(new MessageListener() {
+
+                    @Override
+                    public void onMessage(Message message) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Message Received in the Consumer Pool");
+                            logger.debug("  Message : {}", message);
+                        }
+                        try {
+                            Exchanger<Object> exchanger = exchangerMap.get(message.getJMSCorrelationID());
+                            exchanger.exchange(message, getResponseTimeOut(), TimeUnit.MILLISECONDS);
+                        } catch (Exception e) {
+                            ObjectHelper.wrapRuntimeCamelException(e);
+                        }
+
+                    }
+                });
+                answer = new MessageConsumerResource(session, messageConsumer, replyToDestination);
+            } catch (Exception e) {
+                log.error("Unable to create the MessageConsumerResource: " + e.getLocalizedMessage());
+                throw new CamelException(e);
+            } finally {
+                getConnectionResource().returnConnection(conn);
+            }
+            return answer;
         }
 
         @Override
@@ -227,21 +236,42 @@ public class InOutProducer extends SjmsProducer {
 
     @Override
     public MessageProducerResources doCreateProducerModel() throws Exception {
-        Connection conn = getConnectionResource().borrowConnection();
-        Session session = null;
-        if (isEndpointTransacted()) {
-            session = conn.createSession(true, getAcknowledgeMode());
-        } else {
-            session = conn.createSession(false, getAcknowledgeMode());
-        }
-        MessageProducer messageProducer = null;
-        if (isTopic()) {
-            messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
-        } else {
-            messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
+        MessageProducerResources answer = null;
+        Connection conn = null;
+        try {
+            MessageProducer messageProducer = null;
+            Session session = null;
+            
+            conn = getConnectionResource().borrowConnection();
+            if (isEndpointTransacted()) {
+                session = conn.createSession(true, getAcknowledgeMode());
+            } else {
+                session = conn.createSession(false, getAcknowledgeMode());
+            }
+            if (isTopic()) {
+                messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
+            } else {
+                messageProducer = JmsObjectFactory.createQueueProducer(session, getDestinationName());
+            }
+
+            if (session == null) {
+                throw new CamelException("Message Consumer Creation Exception: Session is NULL");
+            }
+            if (messageProducer == null) {
+                throw new CamelException("Message Consumer Creation Exception: MessageProducer is NULL");
+            }
+            
+            answer = new MessageProducerResources(session, messageProducer);
+
+        } catch (Exception e) {
+            log.error("Unable to create the MessageProducer: " + e.getLocalizedMessage());
+        } finally {
+            if (conn != null) {
+                getConnectionResource().returnConnection(conn);
+            }
         }
-        getConnectionResource().returnConnection(conn);
-        return new MessageProducerResources(session, messageProducer);
+        
+        return answer;
     }
 
     /**
@@ -269,9 +299,9 @@ public class InOutProducer extends SjmsProducer {
                 if (isEndpointTransacted()) {
                     exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), getCommitStrategy()));
                 }
-                
+
                 Message request = SjmsExchangeMessageHelper.createMessage(exchange, producer.getSession(), getSjmsEndpoint().getJmsKeyFormatStrategy());
-                
+
                 // TODO just set the correlation id don't get it from the
                 // message
                 String correlationId = null;
@@ -295,7 +325,8 @@ public class InOutProducer extends SjmsProducer {
                 consumers.returnObject(consumer);
                 producer.getMessageProducer().send(request);
 
-                // Return the producer to the pool so another waiting producer can move forward
+                // Return the producer to the pool so another waiting producer
+                // can move forward
                 // without waiting on us to complete the exchange
                 try {
                     getProducers().returnObject(producer);

http://git-wip-us.apache.org/repos/asf/camel/blob/520e55fa/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java
index 9fa0a80..2420476 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyTopicDurableConsumerTest.java
@@ -78,10 +78,10 @@ public class InOnlyTopicDurableConsumerTest extends CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("sjms:topic:foo?durableSubscriptionId=bar")
+                from("sjms:topic:foo?durableSubscriptionId=bar1")
                     .to("mock:result");
 
-                from("sjms:topic:foo?durableSubscriptionId=bar")
+                from("sjms:topic:foo?durableSubscriptionId=bar2")
                     .to("mock:result2");
             }
         };


Mime
View raw message