activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r475701 - in /incubator/activemq/trunk: activemq-core/src/main/java/org/apache/activemq/ activemq-optional/src/main/java/org/apache/activemq/util/xstream/ activemq-optional/src/test/java/org/apache/activemq/util/xstream/
Date Thu, 16 Nov 2006 12:08:23 GMT
Author: jstrachan
Date: Thu Nov 16 04:08:22 2006
New Revision: 475701

URL: http://svn.apache.org/viewvc?view=rev&rev=475701
Log:
Fix for AMQ-1053. Added support for pluggable message transformation together with an implementation
using XStream to marshall ObjectMessage instances as TextMessages. See the XStreamTransformTest
to see how a producer can send ObjectMessages but then consumers can see either ObjectMessages
or TextMessages depending on their requirements.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformer.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformerSupport.java
  (with props)
    incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/util/xstream/
    incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java
  (with props)
    incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/
    incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/SamplePojo.java
  (with props)
    incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/XStreamTransformTest.java
  (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?view=diff&rev=475701&r1=475700&r2=475701
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Thu Nov 16 04:08:22 2006
@@ -117,6 +117,8 @@
     // Configuration options variables
     private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
     private RedeliveryPolicy redeliveryPolicy;
+    private MessageTransformer transformer;
+
     private boolean disableTimeStampsByDefault = false;
     private boolean optimizedMessageDispatch = true;
     private boolean copyMessageOnSend = true;
@@ -873,7 +875,19 @@
     public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
         this.sessionTaskRunner = sessionTaskRunner;
     }
-    
+
+    public MessageTransformer getTransformer() {
+        return transformer;
+    }
+
+    /**
+     * Sets the transformer used to transform messages before they are sent on to the JMS bus
+     * or when they are received from the bus but before they are delivered to the JMS client
+     */
+    public void setTransformer(MessageTransformer transformer) {
+        this.transformer = transformer;
+    }
+
     /**
      * @return the statsEnabled
      */
@@ -1450,7 +1464,7 @@
     }
     
     /**
-     * @param command - the command to consume
+     * @param o - the command to consume
      */
     public void onCommand(final Object o) {
     	final Command command = (Command) o;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=475701&r1=475700&r2=475701
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java Thu Nov 16 04:08:22 2006
@@ -72,6 +72,7 @@
     // optimization flags
     private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
     private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+    private MessageTransformer transformer;
 
     private boolean disableTimeStampsByDefault = false;
     private boolean optimizedMessageDispatch = true;
@@ -256,6 +257,7 @@
             connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
             connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
             connection.setRedeliveryPolicy(getRedeliveryPolicy());
+            connection.setTransformer(getTransformer());
 
             transport.start();
 
@@ -444,6 +446,18 @@
      */
     public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
         this.redeliveryPolicy = redeliveryPolicy;
+    }
+
+    public MessageTransformer getTransformer() {
+        return transformer;
+    }
+
+    /**
+     * Sets the transformer used to transform messages before they are sent on to the JMS bus
+     * or when they are received from the bus but before they are delivered to the JMS client
+     */
+    public void setTransformer(MessageTransformer transformer) {
+        this.transformer = transformer;
     }
 
     public void buildFromProperties(Properties properties) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=475701&r1=475700&r2=475701
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Nov 16 04:08:22 2006
@@ -119,7 +119,8 @@
     private boolean optimizeAcknowledge;
     private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
     private ExecutorService executorService = null;
-   
+    private MessageTransformer transformer;
+
     /**
      * Create a MessageConsumer
      * 
@@ -162,6 +163,7 @@
         this.session = session;
         this.selector = selector;
         this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
+        setTransformer(session.getTransformer());
 
         this.info = new ConsumerInfo(consumerId);
         this.info.setSubscriptionName(name);
@@ -223,6 +225,18 @@
         this.redeliveryPolicy = redeliveryPolicy;
     }
 
+    public MessageTransformer getTransformer() {
+        return transformer;
+    }
+
+    /**
+     * Sets the transformer used to transform messages before they are sent on to the JMS bus
+     */
+    public void setTransformer(MessageTransformer transformer) {
+        this.transformer = transformer;
+    }
+
+
     /**
      * @return Returns the value.
      */
@@ -435,8 +449,14 @@
      * @param md
      * @return
      */
-    private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) {
+    private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
         ActiveMQMessage m = (ActiveMQMessage) md.getMessage().copy();
+        if (transformer != null) {
+            Message transformedMessage = transformer.consumerTransform(session, this, m);
+            if (transformedMessage != null) {
+                m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
+            }
+        }
         if (session.isClientAcknowledge()) {
             m.setAcknowledgeCallback(new Callback() {
                 public void execute() throws Exception {
@@ -538,7 +558,7 @@
     public void close() throws JMSException {
         if (!unconsumedMessages.isClosed()) {
             dispose();
-            this.session.syncSendPacket(info.createRemoveCommand());
+            this.session.asyncSendPacket(info.createRemoveCommand());
         }
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java?view=diff&rev=475701&r1=475700&r2=475701
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageProducer.java Thu Nov 16 04:08:22 2006
@@ -82,6 +82,7 @@
     private int defaultPriority;
     private long defaultTimeToLive;
     private long startTime;
+    private MessageTransformer transformer;
 
     protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination)
             throws JMSException {
@@ -98,6 +99,7 @@
         this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
         this.session.addProducer(this);        
         this.session.asyncSendPacket(info);
+        setTransformer(session.getTransformer());
     }
 
     public StatsImpl getStats() {
@@ -461,9 +463,27 @@
         if (dest == null) {
             throw new JMSException("No destination specified");
         }
-        
+
+        if (transformer != null) {
+            Message transformedMessage = transformer.producerTransform(session, this, message);
+            if (transformedMessage != null) {
+                message = transformedMessage;
+            }
+        }
         this.session.send(this, dest, message, deliveryMode, priority, timeToLive);
         stats.onMessage();            
+    }
+
+
+    public MessageTransformer getTransformer() {
+        return transformer;
+    }
+
+    /**
+     * Sets the transformer used to transform messages before they are sent on to the JMS bus
+     */
+    public void setTransformer(MessageTransformer transformer) {
+        this.transformer = transformer;
     }
 
     /**

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java?view=diff&rev=475701&r1=475700&r2=475701
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageTransformation.java Thu Nov 16 04:08:22 2006
@@ -168,26 +168,37 @@
                 activeMessage.setConnection(connection);
             }
 
-            activeMessage.setJMSMessageID(message.getJMSMessageID());
-            activeMessage.setJMSCorrelationID(message.getJMSCorrelationID());
-            activeMessage.setJMSReplyTo(transformDestination(message.getJMSReplyTo()));
-            activeMessage.setJMSDestination(transformDestination(message.getJMSDestination()));
-            activeMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
-            activeMessage.setJMSRedelivered(message.getJMSRedelivered());
-            activeMessage.setJMSType(message.getJMSType());
-            activeMessage.setJMSExpiration(message.getJMSExpiration());
-            activeMessage.setJMSPriority(message.getJMSPriority());
-            activeMessage.setJMSTimestamp(message.getJMSTimestamp());
+            copyProperties(message, activeMessage);
 
-            Enumeration propertyNames = message.getPropertyNames();
+            return activeMessage;
+        }
+    }
 
-            while (propertyNames.hasMoreElements()) {
-                String name = propertyNames.nextElement().toString();
-                Object obj = message.getObjectProperty(name);
-                activeMessage.setObjectProperty(name, obj);
-            }
+    /**
+     * Copies the standard JMS and user defined properties from the givem message to the specified message
+     *
+     * @param fromMessage the message to take the properties from
+     * @param toMesage the message to add the properties to
+     * @throws JMSException
+     */
+    public static void copyProperties(Message fromMessage, Message toMesage) throws JMSException {
+        toMesage.setJMSMessageID(fromMessage.getJMSMessageID());
+        toMesage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
+        toMesage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
+        toMesage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
+        toMesage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
+        toMesage.setJMSRedelivered(fromMessage.getJMSRedelivered());
+        toMesage.setJMSType(fromMessage.getJMSType());
+        toMesage.setJMSExpiration(fromMessage.getJMSExpiration());
+        toMesage.setJMSPriority(fromMessage.getJMSPriority());
+        toMesage.setJMSTimestamp(fromMessage.getJMSTimestamp());
 
-            return activeMessage;
+        Enumeration propertyNames = fromMessage.getPropertyNames();
+
+        while (propertyNames.hasMoreElements()) {
+            String name = propertyNames.nextElement().toString();
+            Object obj = fromMessage.getObjectProperty(name);
+            toMesage.setObjectProperty(name, obj);
         }
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?view=diff&rev=475701&r1=475700&r2=475701
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java Thu Nov 16 04:08:22 2006
@@ -183,7 +183,8 @@
     private JMSSessionStatsImpl stats;
     private TransactionContext transactionContext;
     private DeliveryListener deliveryListener;
-    
+    private MessageTransformer transformer;
+
     protected final ActiveMQConnection connection;
     protected final SessionInfo info;
     protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
@@ -224,6 +225,7 @@
         connection.addSession(this);
         stats = new JMSSessionStatsImpl(producers, consumers);
         this.connection.asyncSendPacket(info);
+        setTransformer(connection.getTransformer());
         
         if( connection.isStarted() )
             start();
@@ -1702,7 +1704,19 @@
         this.sessionAsyncDispatch=sessionAsyncDispatch;
     }
 
-	public List getUnconsumedMessages() {
+    public MessageTransformer getTransformer() {
+        return transformer;
+    }
+
+    /**
+     * Sets the transformer used to transform messages before they are sent on to the JMS bus
+     * or when they are received from the bus but before they are delivered to the JMS client
+     */
+    public void setTransformer(MessageTransformer transformer) {
+        this.transformer = transformer;
+    }
+
+    public List getUnconsumedMessages() {
 		return executor.getUnconsumedMessages();
 	}
     

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformer.java?view=auto&rev=475701
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformer.java Thu Nov 16 04:08:22 2006
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import org.apache.activemq.command.ActiveMQMessage;
+
+import javax.jms.Message;
+import javax.jms.Session;
+import javax.jms.MessageProducer;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+
+/**
+ * A plugin strategy for transforming a message before it is sent by the JMS client or before it is
+ * dispatched to the JMS consumer
+ *
+ * @version $Revision$
+ */
+public interface MessageTransformer {
+
+    /**
+     * Transforms the given message inside the producer before it is sent to the JMS bus.
+     */
+    public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException;
+
+    /**
+     * Transforms the given message inside the consumer before being dispatched to the client code
+     */
+    public Message consumerTransform(Session session, MessageConsumer consumer, Message message)throws JMSException;
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformerSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformerSupport.java?view=auto&rev=475701
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformerSupport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformerSupport.java Thu Nov 16 04:08:22 2006
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+/**
+ * A useful base class for message transformers.
+ *
+ * @version $Revision$
+ */
+public abstract class MessageTransformerSupport implements MessageTransformer {
+
+    /**
+     * Copies the standard JMS and user defined properties from the givem message to the specified message
+     *
+     * @param fromMessage the message to take the properties from
+     * @param toMesage the message to add the properties to
+     * @throws JMSException
+     */
+    protected void copyProperties(Message fromMessage, Message toMesage) throws JMSException {
+        ActiveMQMessageTransformation.copyProperties(fromMessage, toMesage);
+    }
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformerSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformerSupport.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/MessageTransformerSupport.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java?view=auto&rev=475701
==============================================================================
--- incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java (added)
+++ incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java Thu Nov 16 04:08:22 2006
@@ -0,0 +1,112 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util.xstream;
+
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.HierarchicalStreamReader;
+import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
+import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
+import com.thoughtworks.xstream.io.xml.XppReader;
+import org.apache.activemq.MessageTransformerSupport;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.io.StringWriter;
+
+/**
+ * Transforms object messages to text messages using {@link XStream}
+ *
+ * @version $Revision$
+ */
+public class XStreamMessageTransformer extends MessageTransformerSupport {
+    private XStream xStream;
+
+
+    public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException {
+        if (message instanceof ObjectMessage) {
+            TextMessage answer = session.createTextMessage(marshall(session, producer, (ObjectMessage) message));
+            copyProperties(message, answer);
+            return answer;
+        }
+        return message;
+    }
+
+
+    public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException {
+        if (message instanceof TextMessage) {
+            TextMessage textMessage = (TextMessage) message;
+            Object object = unmarshall(session, consumer, textMessage);
+            if (object instanceof Serializable) {
+                ObjectMessage answer = session.createObjectMessage((Serializable) object);
+                copyProperties(message, answer);
+                return answer;
+            }
+            else {
+                throw new JMSException("Object is not serializable: " + object);
+            }
+        }
+        return message;
+    }
+
+
+    // Properties
+    // -------------------------------------------------------------------------
+    public XStream getXStream() {
+        if (xStream == null) {
+            xStream = createXStream();
+        }
+        return xStream;
+    }
+
+    public void setXStream(XStream xStream) {
+        this.xStream = xStream;
+    }
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    protected XStream createXStream() {
+        return new XStream();
+    }
+
+    /**
+     * Marshalls the Object in the {@link ObjectMessage} to a string using XML encoding
+     */
+    protected String marshall(Session session, MessageProducer producer, ObjectMessage objectMessage) throws JMSException {
+        Serializable object = objectMessage.getObject();
+        StringWriter buffer = new StringWriter();
+        HierarchicalStreamWriter out = new PrettyPrintWriter(buffer);
+        getXStream().marshal(object, out);
+        return buffer.toString();
+    }
+
+    /**
+     * Unmarshalls the Object using XML encoding of the String
+     */
+    protected Object unmarshall(Session session, MessageConsumer consumer, TextMessage textMessage) throws JMSException {
+        HierarchicalStreamReader in = new XppReader(new StringReader(textMessage.getText()));
+        return getXStream().unmarshal(in);
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-optional/src/main/java/org/apache/activemq/util/xstream/XStreamMessageTransformer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/SamplePojo.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/SamplePojo.java?view=auto&rev=475701
==============================================================================
--- incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/SamplePojo.java (added)
+++ incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/SamplePojo.java Thu Nov 16 04:08:22 2006
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util.xstream;
+
+import java.io.Serializable;
+
+/**
+ * @version $Revision$
+ */
+public class SamplePojo implements Serializable {
+    private String name;
+    private String city;
+
+    public SamplePojo() {
+    }
+
+    public SamplePojo(String name, String city) {
+        this.name = name;
+        this.city = city;
+    }
+
+
+    public String getCity() {
+        return city;
+    }
+
+    public void setCity(String city) {
+        this.city = city;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+}

Propchange: incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/SamplePojo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/SamplePojo.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/SamplePojo.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/XStreamTransformTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/XStreamTransformTest.java?view=auto&rev=475701
==============================================================================
--- incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/XStreamTransformTest.java (added)
+++ incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/XStreamTransformTest.java Thu Nov 16 04:08:22 2006
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util.xstream;
+
+import junit.framework.TestCase;
+
+import javax.jms.*;
+
+import org.apache.activemq.*;
+
+/** Sends one ObjectMessage to a topic and asserts that one consumer receives it as an ObjectMessage while a consumer whose transformer was cleared receives a TextMessage.
+ * @version $Revision$
+ */
+public class XStreamTransformTest extends TestCase {
+    protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+    protected Connection connection;
+    protected long timeout = 5000;
+
+    public void testSendObjectMessageReceiveAsTextMessageAndObjectMessage() throws Exception {
+        // lets create the consumers (both subscribe before the send so the topic delivers to each)
+        Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = objectSession.createTopic(getClass().getName());
+        MessageConsumer objectConsumer = objectSession.createConsumer(destination);
+
+        Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer textConsumer = textSession.createConsumer(destination);
+        // lets clear the transformer on this consumer so we see the message as it really is
+        ((ActiveMQMessageConsumer) textConsumer).setTransformer(null);
+
+        // NOTE(review): objectConsumer keeps whatever transformer it was created with - presumably the factory's; confirm
+        // send a message
+        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(destination);
+
+        ObjectMessage request = producerSession.createObjectMessage(new SamplePojo("James", "London"));
+        producer.send(request);
+
+        // receive(timeout) is given 5000 (the timeout field) by each consumer below
+        // lets consume it as an object message
+        Message message = objectConsumer.receive(timeout);
+        assertNotNull("Should have received a message!", message);
+        assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
+        ObjectMessage objectMessage = (ObjectMessage) message;
+        Object object = objectMessage.getObject();
+        assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
+        SamplePojo body = (SamplePojo) object;
+        assertEquals("name", "James", body.getName());
+        assertEquals("city", "London", body.getCity());
+
+        // only type and non-emptiness of the text are asserted; the XML itself is just printed
+        // lets consume it as a text message
+        message = textConsumer.receive(timeout);
+        assertNotNull("Should have received a message!", message);
+        assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
+        TextMessage textMessage = (TextMessage) message;
+        String text = textMessage.getText();
+        assertTrue("Text should be non-empty!", text != null && text.length() > 0);
+        System.out.println("Received XML...");
+        System.out.println(text);
+    }
+
+    /** Installs an XStreamMessageTransformer on the factory, then creates and starts the connection used by the test. */
+    protected void setUp() throws Exception {
+        connectionFactory.setTransformer(new XStreamMessageTransformer());
+        connection = connectionFactory.createConnection();
+        connection.start();
+    }
+
+    /** Closes the connection if setUp managed to create one. */
+    protected void tearDown() throws Exception {
+        if (connection != null) {
+            connection.close();
+        }
+    }
+}

Propchange: incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/XStreamTransformTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/XStreamTransformTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-optional/src/test/java/org/apache/activemq/util/xstream/XStreamTransformTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message