activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r592531 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp: FrameTranslator.java LegacyFrameTranslator.java ProtocolConverter.java
Date Tue, 06 Nov 2007 19:44:59 GMT
Author: chirino
Date: Tue Nov  6 11:44:58 2007
New Revision: 592531

URL: http://svn.apache.org/viewvc?rev=592531&view=rev
Log:
Added better temp destination support.  We now properly create temp destinations that the client can subscribe to.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?rev=592531&r1=592530&r2=592531&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Tue Nov  6 11:44:58 2007
@@ -33,13 +33,13 @@
  * from one to the other
  */
 public interface FrameTranslator {
-    ActiveMQMessage convertFrame(StompFrame frame) throws JMSException, ProtocolException;
+    ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame frame) throws JMSException, ProtocolException;
 
-    StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException;
+    StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException;
 
-    String convertDestination(Destination d);
+    String convertDestination(ProtocolConverter converter, Destination d);
 
-    ActiveMQDestination convertDestination(String name) throws ProtocolException;
+    ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException;
 
     /**
      * Helper class which holds commonly needed functions used when implementing
@@ -50,9 +50,9 @@
         private Helper() {
         }
 
-        public static void copyStandardHeadersFromMessageToFrame(ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException {
+        public static void copyStandardHeadersFromMessageToFrame(ProtocolConverter converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException {
             final Map<String, String> headers = command.getHeaders();
-            headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(message.getDestination()));
+            headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(converter, message.getDestination()));
             headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID());
 
             if (message.getJMSCorrelationID() != null) {
@@ -66,7 +66,7 @@
             headers.put(Stomp.Headers.Message.PRORITY, "" + message.getJMSPriority());
 
             if (message.getJMSReplyTo() != null) {
-                headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(message.getJMSReplyTo()));
+                headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(converter, message.getJMSReplyTo()));
             }
             headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getJMSTimestamp());
 
@@ -83,10 +83,10 @@
             }
         }
 
-        public static void copyStandardHeadersFromFrameToMessage(StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException {
+        public static void copyStandardHeadersFromFrameToMessage(ProtocolConverter converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException {
             final Map<String, String> headers = new HashMap<String, String>(command.getHeaders());
             final String destination = headers.remove(Stomp.Headers.Send.DESTINATION);
-            msg.setDestination(ft.convertDestination(destination));
+            msg.setDestination(ft.convertDestination(converter, destination));
 
             // the standard JMS headers
             msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID));
@@ -108,7 +108,7 @@
 
             o = headers.remove(Stomp.Headers.Send.REPLY_TO);
             if (o != null) {
-                msg.setJMSReplyTo(ft.convertDestination((String)o));
+                msg.setJMSReplyTo(ft.convertDestination(converter, (String)o));
             }
 
             o = headers.remove(Stomp.Headers.Send.PERSISTENT);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?rev=592531&r1=592530&r2=592531&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Tue Nov  6 11:44:58 2007
@@ -32,7 +32,9 @@
  * Implements ActiveMQ 4.0 translations
  */
 public class LegacyFrameTranslator implements FrameTranslator {
-    public ActiveMQMessage convertFrame(StompFrame command) throws JMSException, ProtocolException {
+	
+	
+    public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
         final Map headers = command.getHeaders();
         final ActiveMQMessage msg;
         if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) {
@@ -49,17 +51,17 @@
             }
             msg = text;
         }
-        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(command, msg, this);
+        FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
         return msg;
     }
 
-    public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
+    public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
         StompFrame command = new StompFrame();
         command.setAction(Stomp.Responses.MESSAGE);
         Map<String, String> headers = new HashMap<String, String>(25);
         command.setHeaders(headers);
 
-        FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(message, command, this);
+        FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this);
 
         if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
 
@@ -79,23 +81,28 @@
         return command;
     }
 
-    public String convertDestination(Destination d) {
+    public String convertDestination(ProtocolConverter converter, Destination d) {
         if (d == null) {
             return null;
         }
         ActiveMQDestination activeMQDestination = (ActiveMQDestination)d;
         String physicalName = activeMQDestination.getPhysicalName();
 
+        String rc = converter.getCreatedTempDestinationName(activeMQDestination);
+        if( rc!=null ) {
+        	return rc;
+        }
+        
         StringBuffer buffer = new StringBuffer();
         if (activeMQDestination.isQueue()) {
             if (activeMQDestination.isTemporary()) {
-                buffer.append("/temp-queue/");
+                buffer.append("/remote-temp-queue/");
             } else {
                 buffer.append("/queue/");
             }
         } else {
             if (activeMQDestination.isTemporary()) {
-                buffer.append("/temp-topic/");
+                buffer.append("/remote-temp-topic/");
             } else {
                 buffer.append("/topic/");
             }
@@ -104,7 +111,7 @@
         return buffer.toString();
     }
 
-    public ActiveMQDestination convertDestination(String name) throws ProtocolException {
+    public ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException {
         if (name == null) {
             return null;
         } else if (name.startsWith("/queue/")) {
@@ -113,12 +120,16 @@
         } else if (name.startsWith("/topic/")) {
             String tName = name.substring("/topic/".length(), name.length());
             return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
-        } else if (name.startsWith("/temp-queue/")) {
-            String tName = name.substring("/temp-queue/".length(), name.length());
+        } else if (name.startsWith("/remote-temp-queue/")) {
+            String tName = name.substring("/remote-temp-queue/".length(), name.length());
             return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
-        } else if (name.startsWith("/temp-topic/")) {
-            String tName = name.substring("/temp-topic/".length(), name.length());
+        } else if (name.startsWith("/remote-temp-topic/")) {
+            String tName = name.substring("/remote-temp-topic/".length(), name.length());
             return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
+        } else if (name.startsWith("/temp-queue/")) {
+            return converter.createTempQueue(name);
+        } else if (name.startsWith("/temp-topic/")) {
+            return converter.createTempTopic(name);
         } else {
             throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
                                         + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=592531&r1=592530&r2=592531&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Tue Nov  6 11:44:58 2007
@@ -25,15 +25,19 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import javax.jms.Destination;
 import javax.jms.JMSException;
 
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
 import org.apache.activemq.command.LocalTransactionId;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
@@ -65,9 +69,12 @@
     private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
     private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator();
+    private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator();
 
     private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
     private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
+    private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
+    private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
     private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
     private final StompTransportFilter transportFilter;
 
@@ -325,7 +332,7 @@
         String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
         String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
 
-        ActiveMQDestination actualDest = frameTranslator.convertDestination(destination);
+        ActiveMQDestination actualDest = frameTranslator.convertDestination(this, destination);
         ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
         ConsumerInfo consumerInfo = new ConsumerInfo(id);
         consumerInfo.setPrefetchSize(1000);
@@ -336,7 +343,7 @@
 
         IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
 
-        consumerInfo.setDestination(frameTranslator.convertDestination(destination));
+        consumerInfo.setDestination(frameTranslator.convertDestination(this, destination));
 
         StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo);
         stompSubscription.setDestination(actualDest);
@@ -360,7 +367,7 @@
         ActiveMQDestination destination = null;
         Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
         if (o != null) {
-            destination = frameTranslator.convertDestination((String)o);
+            destination = frameTranslator.convertDestination(this, (String)o);
         }
 
         String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
@@ -489,15 +496,40 @@
     }
 
     public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException {
-        ActiveMQMessage msg = frameTranslator.convertFrame(command);
+        ActiveMQMessage msg = frameTranslator.convertFrame(this, command);
         return msg;
     }
 
     public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException {
-        return frameTranslator.convertMessage(message);
+        return frameTranslator.convertMessage(this, message);
     }
 
     public StompTransportFilter getTransportFilter() {
         return transportFilter;
     }
+
+	public ActiveMQDestination createTempQueue(String name) {
+        ActiveMQDestination rc = tempDestinations.get(name);
+        if( rc == null ) {
+            rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId());
+            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+            tempDestinations.put(name, rc);
+        }        
+        return rc;
+	}
+
+	public ActiveMQDestination createTempTopic(String name) {
+        ActiveMQDestination rc = tempDestinations.get(name);
+        if( rc == null ) {
+            rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId());
+            sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null);
+            tempDestinations.put(name, rc);
+            tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name);
+        }        
+        return rc;
+	}
+
+	public String getCreatedTempDestinationName(ActiveMQDestination destination) {
+		return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
+	}
 }



Mime
View raw message