Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 62089 invoked from network); 6 Nov 2007 19:45:25 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 6 Nov 2007 19:45:25 -0000 Received: (qmail 83647 invoked by uid 500); 6 Nov 2007 19:45:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 83620 invoked by uid 500); 6 Nov 2007 19:45:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 83610 invoked by uid 99); 6 Nov 2007 19:45:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Nov 2007 11:45:12 -0800 X-ASF-Spam-Status: No, hits=-100.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 06 Nov 2007 19:45:53 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 917F11A9832; Tue, 6 Nov 2007 11:45:00 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r592531 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp: FrameTranslator.java LegacyFrameTranslator.java ProtocolConverter.java Date: Tue, 06 Nov 2007 19:44:59 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20071106194500.917F11A9832@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Tue Nov 6 11:44:58 2007 New Revision: 592531 URL: http://svn.apache.org/viewvc?rev=592531&view=rev Log: Added better temp destination support. We now properly create temp destinations that the client can subscribe to Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?rev=592531&r1=592530&r2=592531&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Tue Nov 6 11:44:58 2007 @@ -33,13 +33,13 @@ * from one to the other */ public interface FrameTranslator { - ActiveMQMessage convertFrame(StompFrame frame) throws JMSException, ProtocolException; + ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame frame) throws JMSException, ProtocolException; - StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException; + StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException; - String convertDestination(Destination d); + String convertDestination(ProtocolConverter converter, Destination d); - ActiveMQDestination convertDestination(String name) throws ProtocolException; + ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException; /** * Helper class which holds commonly needed functions used when implementing @@ -50,9 +50,9 @@ private Helper() { } - public static void copyStandardHeadersFromMessageToFrame(ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException { + public static void copyStandardHeadersFromMessageToFrame(ProtocolConverter converter, ActiveMQMessage message, StompFrame command, FrameTranslator ft) throws IOException { final Map headers = command.getHeaders(); - headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(message.getDestination())); + headers.put(Stomp.Headers.Message.DESTINATION, ft.convertDestination(converter, message.getDestination())); headers.put(Stomp.Headers.Message.MESSAGE_ID, message.getJMSMessageID()); if (message.getJMSCorrelationID() != null) { @@ -66,7 +66,7 @@ headers.put(Stomp.Headers.Message.PRORITY, "" + message.getJMSPriority()); if (message.getJMSReplyTo() != null) { - headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(message.getJMSReplyTo())); + headers.put(Stomp.Headers.Message.REPLY_TO, ft.convertDestination(converter, message.getJMSReplyTo())); } headers.put(Stomp.Headers.Message.TIMESTAMP, "" + message.getJMSTimestamp()); @@ -83,10 +83,10 @@ } } - public static void copyStandardHeadersFromFrameToMessage(StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException { + public static void copyStandardHeadersFromFrameToMessage(ProtocolConverter converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException { final Map headers = new HashMap(command.getHeaders()); final String destination = headers.remove(Stomp.Headers.Send.DESTINATION); - msg.setDestination(ft.convertDestination(destination)); + msg.setDestination(ft.convertDestination(converter, destination)); // the standard JMS headers msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID)); @@ -108,7 +108,7 @@ o = headers.remove(Stomp.Headers.Send.REPLY_TO); if (o != null) { - msg.setJMSReplyTo(ft.convertDestination((String)o)); + msg.setJMSReplyTo(ft.convertDestination(converter, (String)o)); } o = headers.remove(Stomp.Headers.Send.PERSISTENT); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?rev=592531&r1=592530&r2=592531&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Tue Nov 6 11:44:58 2007 @@ -32,7 +32,9 @@ * Implements ActiveMQ 4.0 translations */ public class LegacyFrameTranslator implements FrameTranslator { - public ActiveMQMessage convertFrame(StompFrame command) throws JMSException, ProtocolException { + + + public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { final Map headers = command.getHeaders(); final ActiveMQMessage msg; if (headers.containsKey(Stomp.Headers.CONTENT_LENGTH)) { @@ -49,17 +51,17 @@ } msg = text; } - FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(command, msg, this); + FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage(converter, command, msg, this); return msg; } - public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { + public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { StompFrame command = new StompFrame(); command.setAction(Stomp.Responses.MESSAGE); Map headers = new HashMap(25); command.setHeaders(headers); - FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(message, command, this); + FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame(converter, message, command, this); if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { @@ -79,23 +81,28 @@ return command; } - public String convertDestination(Destination d) { + public String convertDestination(ProtocolConverter converter, Destination d) { if (d == null) { return null; } ActiveMQDestination activeMQDestination = (ActiveMQDestination)d; String physicalName = activeMQDestination.getPhysicalName(); + String rc = converter.getCreatedTempDestinationName(activeMQDestination); + if( rc!=null ) { + return rc; + } + StringBuffer buffer = new StringBuffer(); if (activeMQDestination.isQueue()) { if (activeMQDestination.isTemporary()) { - buffer.append("/temp-queue/"); + buffer.append("/remote-temp-queue/"); } else { buffer.append("/queue/"); } } else { if (activeMQDestination.isTemporary()) { - buffer.append("/temp-topic/"); + buffer.append("/remote-temp-topic/"); } else { buffer.append("/topic/"); } @@ -104,7 +111,7 @@ return buffer.toString(); } - public ActiveMQDestination convertDestination(String name) throws ProtocolException { + public ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException { if (name == null) { return null; } else if (name.startsWith("/queue/")) { @@ -113,12 +120,16 @@ } else if (name.startsWith("/topic/")) { String tName = name.substring("/topic/".length(), name.length()); return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE); - } else if (name.startsWith("/temp-queue/")) { - String tName = name.substring("/temp-queue/".length(), name.length()); + } else if (name.startsWith("/remote-temp-queue/")) { + String tName = name.substring("/remote-temp-queue/".length(), name.length()); return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE); - } else if (name.startsWith("/temp-topic/")) { - String tName = name.substring("/temp-topic/".length(), name.length()); + } else if (name.startsWith("/remote-temp-topic/")) { + String tName = name.substring("/remote-temp-topic/".length(), name.length()); return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE); + } else if (name.startsWith("/temp-queue/")) { + return converter.createTempQueue(name); + } else if (name.startsWith("/temp-topic/")) { + return converter.createTempTopic(name); } else { throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations " + "must begine with one of: /queue/ /topic/ /temp-queue/ /temp-topic/"); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=592531&r1=592530&r2=592531&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Tue Nov 6 11:44:58 2007 @@ -25,15 +25,19 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Destination; import javax.jms.JMSException; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTempTopic; import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; @@ -65,9 +69,12 @@ private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator transactionIdGenerator = new LongSequenceGenerator(); + private final LongSequenceGenerator tempDestinationGenerator = new LongSequenceGenerator(); private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); + private final ConcurrentHashMap tempDestinations = new ConcurrentHashMap(); + private final ConcurrentHashMap tempDestinationAmqToStompMap = new ConcurrentHashMap(); private final Map transactions = new ConcurrentHashMap(); private final StompTransportFilter transportFilter; @@ -325,7 +332,7 @@ String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); - ActiveMQDestination actualDest = frameTranslator.convertDestination(destination); + ActiveMQDestination actualDest = frameTranslator.convertDestination(this, destination); ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerInfo consumerInfo = new ConsumerInfo(id); consumerInfo.setPrefetchSize(1000); @@ -336,7 +343,7 @@ IntrospectionSupport.setProperties(consumerInfo, headers, "activemq."); - consumerInfo.setDestination(frameTranslator.convertDestination(destination)); + consumerInfo.setDestination(frameTranslator.convertDestination(this, destination)); StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo); stompSubscription.setDestination(actualDest); @@ -360,7 +367,7 @@ ActiveMQDestination destination = null; Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION); if (o != null) { - destination = frameTranslator.convertDestination((String)o); + destination = frameTranslator.convertDestination(this, (String)o); } String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); @@ -489,15 +496,40 @@ } public ActiveMQMessage convertMessage(StompFrame command) throws IOException, JMSException { - ActiveMQMessage msg = frameTranslator.convertFrame(command); + ActiveMQMessage msg = frameTranslator.convertFrame(this, command); return msg; } public StompFrame convertMessage(ActiveMQMessage message) throws IOException, JMSException { - return frameTranslator.convertMessage(message); + return frameTranslator.convertMessage(this, message); } public StompTransportFilter getTransportFilter() { return transportFilter; } + + public ActiveMQDestination createTempQueue(String name) { + ActiveMQDestination rc = tempDestinations.get(name); + if( rc == null ) { + rc = new ActiveMQTempQueue(connectionId, tempDestinationGenerator.getNextSequenceId()); + sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); + tempDestinations.put(name, rc); + } + return rc; + } + + public ActiveMQDestination createTempTopic(String name) { + ActiveMQDestination rc = tempDestinations.get(name); + if( rc == null ) { + rc = new ActiveMQTempTopic(connectionId, tempDestinationGenerator.getNextSequenceId()); + sendToActiveMQ(new DestinationInfo(connectionId, DestinationInfo.ADD_OPERATION_TYPE, rc), null); + tempDestinations.put(name, rc); + tempDestinationAmqToStompMap.put(rc.getQualifiedName(), name); + } + return rc; + } + + public String getCreatedTempDestinationName(ActiveMQDestination destination) { + return tempDestinationAmqToStompMap.get(destination.getQualifiedName()); + } }