camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/5] camel git commit: CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.
Date Mon, 07 Sep 2015 09:38:50 GMT
Repository: camel
Updated Branches:
  refs/heads/master eef0c490d -> 25103bf65


CAMEL-9116: camel-sjms should use same binding to/from JMS as camel-jms does.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5b1d8da9
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5b1d8da9
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5b1d8da9

Branch: refs/heads/master
Commit: 5b1d8da9b9b537615e764ebb4f0f8298e83421a8
Parents: eef0c49
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Sep 7 09:31:30 2015 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Sep 7 09:31:30 2015 +0200

----------------------------------------------------------------------
 .../camel/component/sjms/jms/JmsConstants.java  |   5 +
 .../component/sjms/jms/JmsMessageHelper.java    | 339 ++++++++++++-------
 .../component/sjms/producer/InOnlyProducer.java |   8 +-
 3 files changed, 227 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5b1d8da9/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
index 08e0339..ccf1f96 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsConstants.java
@@ -21,6 +21,11 @@ package org.apache.camel.component.sjms.jms;
  */
 public interface JmsConstants {
 
+    String QUEUE_PREFIX = "queue:";
+    String TOPIC_PREFIX = "topic:";
+    String TEMP_QUEUE_PREFIX = "temp:queue:";
+    String TEMP_TOPIC_PREFIX = "temp:topic:";
+
     /**
      * Set by the publishing Client
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/5b1d8da9/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
index 79787c9..0a93f16 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/jms/JmsMessageHelper.java
@@ -20,10 +20,13 @@ import java.io.File;
 import java.io.InputStream;
 import java.io.Reader;
 import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
@@ -47,16 +50,20 @@ import org.apache.camel.TypeConverter;
 import org.apache.camel.component.sjms.SjmsConstants;
 import org.apache.camel.component.sjms.SjmsEndpoint;
 import org.apache.camel.impl.DefaultMessage;
+import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.camel.util.ObjectHelper.removeStartingCharacters;
+
 /**
  * Utility class for {@link javax.jms.Message}.
  */
 public final class JmsMessageHelper implements JmsConstants {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(JmsMessageHelper.class);
+    private static final Logger LOG = LoggerFactory.getLogger(JmsMessageHelper.class);
 
     private JmsMessageHelper() {
     }
@@ -98,7 +105,7 @@ public final class JmsMessageHelper implements JmsConstants {
                 case Bytes:
                     BytesMessage bytesMessage = (BytesMessage) message;
                     if (bytesMessage.getBodyLength() > Integer.MAX_VALUE) {
-                        LOGGER.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
+                        LOG.warn("Length of BytesMessage is too long: {}", bytesMessage.getBodyLength());
                         return null;
                     }
                     byte[] result = new byte[(int) bytesMessage.getBodyLength()];
@@ -164,16 +171,17 @@ public final class JmsMessageHelper implements JmsConstants {
             bodyHeaders = new HashMap<String, Object>(exchange.getIn().getHeaders());
         }
 
-        answer = createMessage(session, body, bodyHeaders, endpoint);
+        answer = createMessage(exchange, session, body, bodyHeaders, endpoint);
         return answer;
     }
 
-    public static Message createMessage(Session session, Object payload, Map<String, Object>
messageHeaders, SjmsEndpoint endpoint) throws Exception {
-        return createMessage(session, payload, messageHeaders, endpoint.isAllowNullBody(),
endpoint.getJmsKeyFormatStrategy(), endpoint.getCamelContext().getTypeConverter());
+    public static Message createMessage(Exchange exchange, Session session, Object payload,
Map<String, Object> messageHeaders, SjmsEndpoint endpoint) throws Exception {
+        return createMessage(exchange, session, payload, messageHeaders, endpoint.isAllowNullBody(),
+                endpoint.getSjmsHeaderFilterStrategy(), endpoint.getJmsKeyFormatStrategy(),
endpoint.getCamelContext().getTypeConverter());
     }
 
-    private static Message createMessage(Session session, Object payload, Map<String,
Object> messageHeaders, boolean allowNullBody,
-                                         KeyFormatStrategy keyFormatStrategy, TypeConverter
typeConverter) throws Exception {
+    private static Message createMessage(Exchange exchange, Session session, Object payload,
Map<String, Object> messageHeaders, boolean allowNullBody,
+                                         HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy
keyFormatStrategy, TypeConverter typeConverter) throws Exception {
         Message answer = null;
         JmsMessageType messageType = JmsMessageHelper.discoverMessageTypeFromPayload(payload);
 
@@ -224,113 +232,134 @@ public final class JmsMessageHelper implements JmsConstants {
             break;
         }
 
-        if (messageHeaders != null && !messageHeaders.isEmpty()) {
-            answer = JmsMessageHelper.setJmsMessageHeaders(answer, messageHeaders, keyFormatStrategy);
-        }
+        appendJmsProperties(answer, exchange, exchange.getIn(), headerFilterStrategy, keyFormatStrategy);
         return answer;
     }
 
     /**
-     * Adds or updates the {@link Message} headers. Header names and values are
-     * checked for JMS 1.1 compliance.
-     *
-     * @param jmsMessage        the {@link Message} to add or update the headers on
-     * @param messageHeaders    a {@link Map} of String/Object pairs
-     * @param keyFormatStrategy the a {@link KeyFormatStrategy} to used to
-     *                          format keys in a JMS 1.1 compliant manner.
-     * @return {@link Message}
+     * Appends the JMS headers from the Camel {@link Message}
      */
-    private static Message setJmsMessageHeaders(final Message jmsMessage, Map<String,
Object> messageHeaders, KeyFormatStrategy keyFormatStrategy) throws IllegalHeaderException
{
-
-        Map<String, Object> headers = new HashMap<String, Object>(messageHeaders);
-        for (final Map.Entry<String, Object> entry : headers.entrySet()) {
+    private static void appendJmsProperties(Message jmsMessage, Exchange exchange, org.apache.camel.Message
in,
+                                            HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy
keyFormatStrategy) throws JMSException {
+        Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
+        for (Map.Entry<String, Object> entry : entries) {
             String headerName = entry.getKey();
             Object headerValue = entry.getValue();
+            appendJmsProperty(jmsMessage, exchange, in, headerName, headerValue, headerFilterStrategy,
keyFormatStrategy);
+        }
+    }
 
-            if (headerName.equalsIgnoreCase(JMS_CORRELATION_ID)) {
-                if (headerValue == null) {
-                    // Value can be null but we can't cast a null to a String
-                    // so pass null to the setter
-                    setCorrelationId(jmsMessage, null);
-                } else if (headerValue instanceof String) {
-                    setCorrelationId(jmsMessage, (String) headerValue);
-                } else {
-                    throw new IllegalHeaderException("The " + JMS_CORRELATION_ID + " must
either be a String or null.  Found: " + headerValue.getClass().getName());
-                }
-            } else if (headerName.equalsIgnoreCase(JMS_REPLY_TO)) {
+    private static void appendJmsProperty(Message jmsMessage, Exchange exchange, org.apache.camel.Message
in,
+                                  String headerName, Object headerValue,
+                                  HeaderFilterStrategy headerFilterStrategy, KeyFormatStrategy
keyFormatStrategy) throws JMSException {
+        if (isStandardJMSHeader(headerName)) {
+            if (headerName.equals("JMSCorrelationID")) {
+                jmsMessage.setJMSCorrelationID(ExchangeHelper.convertToType(exchange, String.class,
headerValue));
+            } else if (headerName.equals("JMSReplyTo") && headerValue != null) {
                 if (headerValue instanceof String) {
-                    // FIXME Setting the reply to appears broken. walk back
-                    // through it. If the value is a String we must normalize it
-                    // first
-                } else {
-                    // TODO write destination converter
-                    // Destination replyTo =
-                    // ExchangeHelper.convertToType(exchange,
-                    // Destination.class,
-                    // headerValue);
-                    // jmsMessage.setJMSReplyTo(replyTo);
-                }
-            } else if (headerName.equalsIgnoreCase(JMS_TYPE)) {
-                if (headerValue == null) {
-                    // Value can be null but we can't cast a null to a String
-                    // so pass null to the setter
-                    setMessageType(jmsMessage, null);
-                } else if (headerValue instanceof String) {
-                    // Not null but is a String
-                    setMessageType(jmsMessage, (String) headerValue);
-                } else {
-                    throw new IllegalHeaderException("The " + JMS_TYPE + " must either be
a String or null.  Found: " + headerValue.getClass().getName());
-                }
-            } else if (headerName.equalsIgnoreCase(JMS_PRIORITY)) {
-                if (headerValue instanceof Integer) {
-                    try {
-                        jmsMessage.setJMSPriority((Integer) headerValue);
-                    } catch (JMSException e) {
-                        throw new IllegalHeaderException("Failed to set the " + JMS_PRIORITY
+ " header. Cause: " + e.getLocalizedMessage(), e);
-                    }
-                } else {
-                    throw new IllegalHeaderException("The " + JMS_PRIORITY + " must be a
Integer.  Type found: " + headerValue.getClass().getName());
-                }
-            } else if (headerName.equalsIgnoreCase(JMS_DELIVERY_MODE)) {
-                try {
-                    JmsMessageHelper.setJMSDeliveryMode(jmsMessage, headerValue);
-                } catch (JMSException e) {
-                    throw new IllegalHeaderException("Failed to set the " + JMS_DELIVERY_MODE
+ " header. Cause: " + e.getLocalizedMessage(), e);
-                }
-            } else if (headerName.equalsIgnoreCase(JMS_EXPIRATION)) {
-                if (headerValue instanceof Long) {
-                    try {
-                        jmsMessage.setJMSExpiration((Long) headerValue);
-                    } catch (JMSException e) {
-                        throw new IllegalHeaderException("Failed to set the " + JMS_EXPIRATION
+ " header. Cause: " + e.getLocalizedMessage(), e);
-                    }
-                } else {
-                    throw new IllegalHeaderException("The " + JMS_EXPIRATION + " must be
a Long.  Type found: " + headerValue.getClass().getName());
+                    // if the value is a String we must normalize it first, and must include
the prefix
+                    // as ActiveMQ requires that when converting the String to a javax.jms.Destination
type
+                    headerValue = normalizeDestinationName((String) headerValue, true);
                 }
+                Destination replyTo = ExchangeHelper.convertToType(exchange, Destination.class,
headerValue);
+                JmsMessageHelper.setJMSReplyTo(jmsMessage, replyTo);
+            } else if (headerName.equals("JMSType")) {
+                jmsMessage.setJMSType(ExchangeHelper.convertToType(exchange, String.class,
headerValue));
+            } else if (headerName.equals("JMSPriority")) {
+                jmsMessage.setJMSPriority(ExchangeHelper.convertToType(exchange, Integer.class,
headerValue));
+            } else if (headerName.equals("JMSDeliveryMode")) {
+                JmsMessageHelper.setJMSDeliveryMode(exchange, jmsMessage, headerValue);
+            } else if (headerName.equals("JMSExpiration")) {
+                jmsMessage.setJMSExpiration(ExchangeHelper.convertToType(exchange, Long.class,
headerValue));
             } else {
-                LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
-                if (headerName.equalsIgnoreCase(JMS_DESTINATION) || headerName.equalsIgnoreCase(JMS_MESSAGE_ID)
|| headerName.equalsIgnoreCase(JMS_TIMESTAMP)
-                        || headerName.equalsIgnoreCase(JMS_REDELIVERED)) {
-                    // The following properties are set by the
-                    // MessageProducer:
-                    // JMSDestination
-                    // The following are set on the underlying JMS provider:
-                    // JMSMessageID, JMSTimestamp, JMSRedelivered
-                    // log at trace level to not spam log
-                    LOGGER.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
-                } else {
-                    if (!(headerValue instanceof JmsMessageType)) {
-                        String encodedName = keyFormatStrategy.encodeKey(headerName);
-                        try {
-                            JmsMessageHelper.setProperty(jmsMessage, encodedName, headerValue);
-                        } catch (JMSException e) {
-                            throw new IllegalHeaderException("Failed to set the header "
+ encodedName + " header. Cause: " + e.getLocalizedMessage(), e);
-                        }
-                    }
-                }
+                // The following properties are set by the MessageProducer:
+                // JMSDestination
+                // The following are set on the underlying JMS provider:
+                // JMSMessageID, JMSTimestamp, JMSRedelivered
+                // log at trace level to not spam log
+                LOG.trace("Ignoring JMS header: {} with value: {}", headerName, headerValue);
+            }
+        } else if (shouldOutputHeader(in, headerName, headerValue, exchange, headerFilterStrategy))
{
+            // only primitive headers and strings are allowed as properties
+            // see message properties: http://java.sun.com/j2ee/1.4/docs/api/javax/jms/Message.html
+            Object value = getValidJMSHeaderValue(headerName, headerValue);
+            if (value != null) {
+                // must encode to safe JMS header name before setting property on jmsMessage
+                String key = keyFormatStrategy.encodeKey(headerName);
+                // set the property
+                JmsMessageHelper.setProperty(jmsMessage, key, value);
+            } else if (LOG.isDebugEnabled()) {
+                // okay the value is not a primitive or string so we cannot send it over
the wire
+                LOG.debug("Ignoring non primitive header: {} of class: {} with value: {}",
+                        new Object[]{headerName, headerValue.getClass().getName(), headerValue});
+            }
+        }
+    }
+
+    /**
+     * Is the given header a standard JMS header
+     * @param headerName the header name
+     * @return <tt>true</tt> if it's a standard JMS header
+     */
+    protected static boolean isStandardJMSHeader(String headerName) {
+        if (!headerName.startsWith("JMS")) {
+            return false;
+        }
+        if (headerName.startsWith("JMSX")) {
+            return false;
+        }
+        // vendors will use JMS_XXX as their special headers (where XXX is vendor name, such
as JMS_IBM)
+        if (headerName.startsWith("JMS_")) {
+            return false;
+        }
+
+        // the 4th char must be a letter to be a standard JMS header
+        if (headerName.length() > 3) {
+            Character fourth = headerName.charAt(3);
+            if (Character.isLetter(fourth)) {
+                return true;
             }
         }
-        return jmsMessage;
+
+        return false;
+    }
+
+    /**
+     * Strategy to test if the given header is valid according to the JMS spec to be set
as a property
+     * on the JMS message.
+     * <p/>
+     * This default implementation will allow:
+     * <ul>
+     *   <li>any primitives and their counterpart objects (Integer, Double etc.)</li>
+     *   <li>String and any other literals, Character, CharSequence</li>
+     *   <li>Boolean</li>
+     *   <li>Number</li>
+     *   <li>java.util.Date</li>
+     * </ul>
+     *
+     * @param headerName   the header name
+     * @param headerValue  the header value
+     * @return  the value to use, <tt>null</tt> to ignore this header
+     */
+    protected static Object getValidJMSHeaderValue(String headerName, Object headerValue)
{
+        if (headerValue instanceof String) {
+            return headerValue;
+        } else if (headerValue instanceof BigInteger) {
+            return headerValue.toString();
+        } else if (headerValue instanceof BigDecimal) {
+            return headerValue.toString();
+        } else if (headerValue instanceof Number) {
+            return headerValue;
+        } else if (headerValue instanceof Character) {
+            return headerValue;
+        } else if (headerValue instanceof CharSequence) {
+            return headerValue.toString();
+        } else if (headerValue instanceof Boolean) {
+            return headerValue;
+        } else if (headerValue instanceof Date) {
+            return headerValue.toString();
+        }
+        return null;
     }
 
     @SuppressWarnings("unchecked")
@@ -375,6 +404,17 @@ public final class JmsMessageHelper implements JmsConstants {
     }
 
     /**
+     * Strategy to allow filtering of headers which are put on the JMS message
+     * <p/>
+     * <b>Note</b>: Currently only supports sending java identifiers as keys
+     */
+    protected static boolean shouldOutputHeader(org.apache.camel.Message camelMessage, String
headerName,
+                                               Object headerValue, Exchange exchange, HeaderFilterStrategy
headerFilterStrategy) {
+        return headerFilterStrategy == null
+                || !headerFilterStrategy.applyFilterToCamelHeaders(headerName, headerValue,
exchange);
+    }
+
+    /**
      * Gets the JMSType from the message.
      *
      * @param message the message
@@ -393,41 +433,41 @@ public final class JmsMessageHelper implements JmsConstants {
     /**
      * Sets the JMSDeliveryMode on the message.
      *
-     * @param message      the message
-     * @param deliveryMode the delivery mode, either as a String or integer
+     * @param exchange the exchange
+     * @param message  the message
+     * @param deliveryMode  the delivery mode, either as a String or integer
      * @throws javax.jms.JMSException is thrown if error setting the delivery mode
      */
-    public static void setJMSDeliveryMode(Message message, Object deliveryMode) throws JMSException
{
-        Integer mode;
+    public static void setJMSDeliveryMode(Exchange exchange, Message message, Object deliveryMode)
throws JMSException {
+        Integer mode = null;
 
         if (deliveryMode instanceof String) {
             String s = (String) deliveryMode;
-            if (JMS_DELIVERY_MODE_PERSISTENT.equalsIgnoreCase(s)) {
+            if ("PERSISTENT".equalsIgnoreCase(s)) {
                 mode = DeliveryMode.PERSISTENT;
-            } else if (JMS_DELIVERY_MODE_NON_PERSISTENT.equalsIgnoreCase(s)) {
+            } else if ("NON_PERSISTENT".equalsIgnoreCase(s)) {
                 mode = DeliveryMode.NON_PERSISTENT;
             } else {
                 // it may be a number in the String so try that
-                Integer value = null;
-                try {
-                    value = Integer.valueOf(s);
-                } catch (NumberFormatException e) {
-                    // Do nothing. The error handler below is sufficient
-                }
+                Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
                 if (value != null) {
                     mode = value;
                 } else {
                     throw new IllegalArgumentException("Unknown delivery mode with value:
" + deliveryMode);
                 }
             }
-        } else if (deliveryMode instanceof Integer) {
-            // fallback and try to convert to a number
-            mode = (Integer) deliveryMode;
         } else {
-            throw new IllegalArgumentException("Unable to convert the given delivery mode
of type " + deliveryMode.getClass().getName() + " with value: " + deliveryMode);
+            // fallback and try to convert to a number
+            Integer value = ExchangeHelper.convertToType(exchange, Integer.class, deliveryMode);
+            if (value != null) {
+                mode = value;
+            }
         }
 
-        message.setJMSDeliveryMode(mode);
+        if (mode != null) {
+            message.setJMSDeliveryMode(mode);
+            message.setIntProperty(JmsConstants.JMS_DELIVERY_MODE, mode);
+        }
     }
 
     /**
@@ -442,7 +482,7 @@ public final class JmsMessageHelper implements JmsConstants {
         try {
             message.setJMSType(type);
         } catch (JMSException e) {
-            LOGGER.debug("Error setting the message type: {}", type, e);
+            LOG.debug("Error setting the message type: {}", type, e);
         }
     }
 
@@ -458,7 +498,7 @@ public final class JmsMessageHelper implements JmsConstants {
         try {
             message.setJMSCorrelationID(correlationId);
         } catch (JMSException e) {
-            LOGGER.debug("Error setting the correlationId: {}", correlationId, e);
+            LOG.debug("Error setting the correlationId: {}", correlationId, e);
         }
     }
 
@@ -472,7 +512,7 @@ public final class JmsMessageHelper implements JmsConstants {
         try {
             message.setJMSReplyTo(replyTo);
         } catch (Exception e) {
-            LOGGER.debug("Error setting the correlationId: {}", replyTo.toString());
+            LOG.debug("Error setting the correlationId: {}", replyTo.toString());
         }
     }
 
@@ -589,4 +629,61 @@ public final class JmsMessageHelper implements JmsConstants {
         return key == null || key.isEmpty() || key.contains(".") || key.contains("-");
     }
 
+    /**
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>,
which
+     * was intended as <tt>queue://foo</tt>.
+     *
+     * @param destination the destination
+     * @return the normalized destination
+     */
+    public static String normalizeDestinationName(String destination) {
+        // do not include prefix which is the current behavior when using this method.
+        return normalizeDestinationName(destination, false);
+    }
+
+    /**
+     * Normalizes the destination name.
+     * <p/>
+     * This ensures the destination name is correct, and we do not create queues as <tt>queue://queue:foo</tt>,
which
+     * was intended as <tt>queue://foo</tt>.
+     *
+     * @param destination the destination
+     * @param includePrefix whether to include <tt>queue://</tt>, or <tt>topic://</tt>
prefix in the normalized destination name
+     * @return the normalized destination
+     */
+    public static String normalizeDestinationName(String destination, boolean includePrefix)
{
+        if (ObjectHelper.isEmpty(destination)) {
+            return destination;
+        }
+        if (destination.startsWith(QUEUE_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(QUEUE_PREFIX.length()),
'/');
+            if (includePrefix) {
+                s = QUEUE_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TEMP_QUEUE_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TEMP_QUEUE_PREFIX.length()),
'/');
+            if (includePrefix) {
+                s = TEMP_QUEUE_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TOPIC_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TOPIC_PREFIX.length()),
'/');
+            if (includePrefix) {
+                s = TOPIC_PREFIX + "//" + s;
+            }
+            return s;
+        } else if (destination.startsWith(TEMP_TOPIC_PREFIX)) {
+            String s = removeStartingCharacters(destination.substring(TEMP_TOPIC_PREFIX.length()),
'/');
+            if (includePrefix) {
+                s = TEMP_TOPIC_PREFIX + "//" + s;
+            }
+            return s;
+        } else {
+            return destination;
+        }
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/5b1d8da9/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
index 71f5770..3fa23b0 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/producer/InOnlyProducer.java
@@ -93,19 +93,19 @@ public class InOnlyProducer extends SjmsProducer {
                         Message message;
                         if (BatchMessage.class.isInstance(object)) {
                             BatchMessage<?> batchMessage = (BatchMessage<?>)
object;
-                            message = JmsMessageHelper.createMessage(producer.getSession(),
batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint());
+                            message = JmsMessageHelper.createMessage(exchange, producer.getSession(),
batchMessage.getPayload(), batchMessage.getHeaders(), getEndpoint());
                         } else {
-                            message = JmsMessageHelper.createMessage(producer.getSession(),
object, exchange.getIn().getHeaders(), getEndpoint());
+                            message = JmsMessageHelper.createMessage(exchange, producer.getSession(),
object, exchange.getIn().getHeaders(), getEndpoint());
                         }
                         messages.add(message);
                     }
                 } else {
                     Object payload = exchange.getIn().getBody();
-                    Message message = JmsMessageHelper.createMessage(producer.getSession(),
payload, exchange.getIn().getHeaders(), getEndpoint());
+                    Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(),
payload, exchange.getIn().getHeaders(), getEndpoint());
                     messages.add(message);
                 }
             } else {
-                Message message = JmsMessageHelper.createMessage(producer.getSession(), null,
exchange.getIn().getHeaders(), getEndpoint());
+                Message message = JmsMessageHelper.createMessage(exchange, producer.getSession(),
null, exchange.getIn().getHeaders(), getEndpoint());
                 messages.add(message);
             }
 


Mime
View raw message