Updated Branches:
refs/heads/camel-2.12.x 3ad782cfe -> 3cc1f5c65
CAMEL-7107 avoid the NPE in case of connection loss with thanks to Marios
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/3cc1f5c6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/3cc1f5c6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/3cc1f5c6
Branch: refs/heads/camel-2.12.x
Commit: 3cc1f5c6564735d71d5da8e702e13cc84e10f81d
Parents: cef0dd0
Author: Willem Jiang <willem.jiang@gmail.com>
Authored: Fri Jan 10 17:57:07 2014 +0800
Committer: Willem Jiang <willem.jiang@gmail.com>
Committed: Fri Jan 10 18:51:18 2014 +0800
----------------------------------------------------------------------
.../component/sjms/producer/InOnlyProducer.java | 35 +++++++++++---------
1 file changed, 20 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/3cc1f5c6/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index df689b2..84eb1f5 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.sjms.producer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import javax.jms.Connection;
@@ -25,9 +26,9 @@ import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.camel.AsyncCallback;
+import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.component.sjms.BatchMessage;
-import org.apache.camel.component.sjms.SjmsEndpoint;
import org.apache.camel.component.sjms.SjmsProducer;
import org.apache.camel.component.sjms.TransactionCommitStrategy;
import org.apache.camel.component.sjms.jms.JmsMessageHelper;
@@ -40,7 +41,7 @@ import org.apache.camel.component.sjms.tx.SessionTransactionSynchronization;
*/
public class InOnlyProducer extends SjmsProducer {
- public InOnlyProducer(SjmsEndpoint endpoint) {
+ public InOnlyProducer(final Endpoint endpoint) {
super(endpoint);
}
@@ -55,11 +56,9 @@ public class InOnlyProducer extends SjmsProducer {
Connection conn = null;
try {
conn = getConnectionResource().borrowConnection();
-
TransactionCommitStrategy commitStrategy = null;
- Session session = null;
- MessageProducer messageProducer = null;
-
+ Session session;
+
if (isEndpointTransacted()) {
if (getCommitStrategy() != null) {
commitStrategy = getCommitStrategy();
@@ -70,6 +69,8 @@ public class InOnlyProducer extends SjmsProducer {
} else {
session = conn.createSession(false, getAcknowledgeMode());
}
+
+ MessageProducer messageProducer;
if (isTopic()) {
messageProducer = JmsObjectFactory.createMessageProducer(session, getDestinationName(), isTopic(), isPersistent(), getTtl());
} else {
@@ -94,16 +95,16 @@ public class InOnlyProducer extends SjmsProducer {
* @throws Exception
*/
@Override
- public void sendMessage(Exchange exchange, AsyncCallback callback) throws Exception {
- List<Message> messages = new ArrayList<Message>();
+ public void sendMessage(final Exchange exchange, final AsyncCallback callback) throws Exception {
+ Collection<Message> messages = new ArrayList<Message>(1);
MessageProducerResources producer = getProducers().borrowObject();
try {
- if (getProducers() != null) {
+ if (producer != null) {
if (exchange.getIn().getBody() != null) {
if (exchange.getIn().getBody() instanceof List) {
- List<?> payload = (List<?>)exchange.getIn().getBody();
- for (Object object : payload) {
- Message message = null;
+ Iterable<?> payload = (Iterable<?>)exchange.getIn().getBody();
+ for (final Object object : payload) {
+ Message message;
if (BatchMessage.class.isInstance(object)) {
BatchMessage<?> batchMessage = (BatchMessage<?>)object;
message = JmsMessageHelper.createMessage(producer.getSession(), batchMessage.getPayload(), batchMessage.getHeaders(), getSjmsEndpoint()
@@ -124,14 +125,18 @@ public class InOnlyProducer extends SjmsProducer {
if (isEndpointTransacted()) {
exchange.getUnitOfWork().addSynchronization(new SessionTransactionSynchronization(producer.getSession(), producer.getCommitStrategy()));
}
- for (Message message : messages) {
+ for (final Message message : messages) {
producer.getMessageProducer().send(message);
}
+ } else {
+ exchange.setException(new Exception("Unable to send message: connection not available"));
}
} catch (Exception e) {
- exchange.setException(new Exception("Unable to complet sending the message: " + e.getLocalizedMessage()));
+ exchange.setException(new Exception("Unable to complete sending the message: " + e.getLocalizedMessage()));
} finally {
- getProducers().returnObject(producer);
+ if (producer != null) {
+ getProducers().returnObject(producer);
+ }
callback.done(isSynchronous());
}
}
|