activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r358349 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/activemq/transport/stomp/ test/java/org/activemq/transport/stomp/ test/resources/
Date Wed, 21 Dec 2005 19:23:25 GMT
Author: chirino
Date: Wed Dec 21 11:23:20 2005
New Revision: 358349

URL: http://svn.apache.org/viewcvs?rev=358349&view=rev
Log:
Got the stomp SUBSCRIBE test case working.

Removed:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/AckListener.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Ack.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscribe.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Unsubscribe.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java
    incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Ack.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Ack.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Ack.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Ack.java Wed Dec 21 11:23:20 2005
@@ -3,17 +3,14 @@
  */
 package org.activemq.transport.stomp;
 
-import org.activemq.command.ActiveMQDestination;
-import org.activemq.command.ActiveMQMessage;
-import org.activemq.command.MessageAck;
-import org.activemq.command.TransactionId;
-
 import java.io.DataInput;
 import java.io.IOException;
 import java.net.ProtocolException;
-import java.util.List;
 import java.util.Properties;
 
+import org.activemq.command.MessageAck;
+import org.activemq.command.TransactionId;
+
 class Ack implements StompCommand {
     private final StompWireFormat format;
     private static final HeaderParser parser = new HeaderParser();
@@ -28,41 +25,22 @@
         if (message_id == null)
             throw new ProtocolException("ACK received without a message-id to acknowledge!");
 
-        List listeners = format.getAckListeners();
-        for (int i = 0; i < listeners.size(); i++) {
-            AckListener listener = (AckListener) listeners.get(i);
-            if (listener.handle(message_id)) {
-                listeners.remove(i);
-                ActiveMQMessage msg = listener.getMessage();
-                MessageAck ack = new MessageAck();
-                ack.setDestination((ActiveMQDestination) msg.getJMSDestination());
-                ack.setConsumerId(listener.getConsumerId());
-                ack.setMessageID(msg.getMessageId());
-                ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-
-                /*
-                 * ack.setMessageRead(true);
-                 * ack.setProducerKey(msg.getProducerKey());
-                 * ack.setSequenceNumber(msg.getSequenceNumber());
-                 * ack.setPersistent(msg.getJMSDeliveryMode() ==
-                 * DeliveryMode.PERSISTENT);
-                 * ack.setSessionId(format.getSessionId());
-                 */
-
-                if (headers.containsKey(Stomp.Headers.TRANSACTION)) {
-                    TransactionId tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
-                    if (tx_id == null)
-                        throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an invalid transaction id");
-                    ack.setTransactionId(tx_id);
-                }
-
-                while ((in.readByte()) != 0) {
-                }
-                return new CommandEnvelope(ack, headers);
-            }
+        Subscription sub = (Subscription) format.getDispachedMap().get(message_id);
+        if( sub ==null ) 
+            throw new ProtocolException("Unexpected ACK received for message-id [" + message_id + "]");
+            
+        MessageAck ack = sub.createMessageAck(message_id);
+        
+        if (headers.containsKey(Stomp.Headers.TRANSACTION)) {
+            TransactionId tx_id = format.getTransactionId(headers.getProperty(Stomp.Headers.TRANSACTION));
+            if (tx_id == null)
+                throw new ProtocolException(headers.getProperty(Stomp.Headers.TRANSACTION) + " is an invalid transaction id");
+            ack.setTransactionId(tx_id);
         }
+
         while ((in.readByte()) != 0) {
         }
-        throw new ProtocolException("Unexepected ACK received for message-id [" + message_id + "]");
+        
+        return new CommandEnvelope(ack, headers);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Stomp.java Wed Dec 21 11:23:20 2005
@@ -75,6 +75,7 @@
 
         public interface Unsubscribe {
             String DESTINATION = "destination";
+            String ID = "id";
         }
 
         public interface Connect {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/StompWireFormat.java Wed Dec 21 11:23:20 2005
@@ -11,6 +11,7 @@
 import java.net.ProtocolException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.jms.JMSException;
 
@@ -19,9 +20,7 @@
 import org.activeio.adapter.PacketInputStream;
 import org.activeio.command.WireFormat;
 import org.activeio.packet.ByteArrayPacket;
-import org.activemq.command.ActiveMQBytesMessage;
 import org.activemq.command.ActiveMQDestination;
-import org.activemq.command.ActiveMQTextMessage;
 import org.activemq.command.Command;
 import org.activemq.command.CommandTypes;
 import org.activemq.command.ConnectionId;
@@ -29,11 +28,14 @@
 import org.activemq.command.ConsumerId;
 import org.activemq.command.FlushCommand;
 import org.activemq.command.LocalTransactionId;
+import org.activemq.command.Message;
+import org.activemq.command.MessageDispatch;
 import org.activemq.command.MessageId;
 import org.activemq.command.ProducerId;
 import org.activemq.command.Response;
 import org.activemq.command.SessionId;
 import org.activemq.command.TransactionId;
+import org.activemq.filter.DestinationMap;
 import org.activemq.util.IOExceptionSupport;
 import org.activemq.util.IdGenerator;
 import org.activemq.util.LongSequenceGenerator;
@@ -53,15 +55,17 @@
     private static int transactionIdCounter;
 
     private int version = 1;
-    private CommandParser commandParser = new CommandParser(this);
-    private HeaderParser headerParser = new HeaderParser();
+    private final CommandParser commandParser = new CommandParser(this);
+    private final HeaderParser headerParser = new HeaderParser();
 
-    private BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
-    private BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
-    private List receiptListeners = new CopyOnWriteArrayList();
-    private Map subscriptions = new ConcurrentHashMap();
-    private List ackListeners = new CopyOnWriteArrayList();
+    private final BlockingQueue pendingReadCommands = new LinkedBlockingQueue();
+    private final BlockingQueue pendingWriteFrames = new LinkedBlockingQueue();
+    private final List receiptListeners = new CopyOnWriteArrayList();
+    private final Map subscriptionsByConsumerId = new ConcurrentHashMap();
+    private final Map subscriptionsByName = new ConcurrentHashMap();
+    private final DestinationMap subscriptionsByDestination = new DestinationMap();
     private final Map transactions = new ConcurrentHashMap();
+    private final Map dispachedMap = new ConcurrentHashMap();
     private short lastCommandId;
 
     private final ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
@@ -119,18 +123,11 @@
                 }
             }
         }
-
-        if (packet.getDataStructureType() == CommandTypes.ACTIVEMQ_TEXT_MESSAGE) {
-            assert (packet instanceof ActiveMQTextMessage);
-            ActiveMQTextMessage msg = (ActiveMQTextMessage) packet;
-            Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
-            sub.receive(msg, out);
-        }
-        else if (packet.getDataStructureType() == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
-            assert (packet instanceof ActiveMQBytesMessage);
-            ActiveMQBytesMessage msg = (ActiveMQBytesMessage) packet;
-            Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
-            sub.receive(msg, out);
+        if( packet.isMessageDispatch() ) {
+            MessageDispatch md = (MessageDispatch)packet;
+            Message message = md.getMessage();
+            Subscription sub = (Subscription) subscriptionsByConsumerId.get(md.getConsumerId());
+            sub.receive(md, out);
         }
         return null;
     }
@@ -184,17 +181,35 @@
     public ProducerId getProducerId() {
         return producerId;
     }
+
+    
+    public Subscription getSubcription(ConsumerId consumerId) {
+        return (Subscription) subscriptionsByConsumerId.get(consumerId);
+    }
+    public Set getSubcriptions(ActiveMQDestination destination) {
+        return subscriptionsByDestination.get(destination);
+    }
+    public Subscription getSubcription(String name) {
+        return (Subscription) subscriptionsByName.get(name);
+    }
     
     public void addSubscription(Subscription s) {
-        if (subscriptions.containsKey(s.getDestination())) {
-            Subscription old = (Subscription) subscriptions.get(s.getDestination());
-            Command p = old.close();
-            enqueueCommand(p);
-            subscriptions.put(s.getDestination(), s);
-        }
-        else {
-            subscriptions.put(s.getDestination(), s);
-        }
+        if (s.getSubscriptionId()!=null && subscriptionsByName.containsKey(s.getSubscriptionId())) {
+            Subscription old = (Subscription) subscriptionsByName.get(s.getSubscriptionId());
+            removeSubscription(old);
+            enqueueCommand(old.close());
+        }
+        if( s.getSubscriptionId()!=null )
+            subscriptionsByName.put(s.getSubscriptionId(), s);
+        subscriptionsByConsumerId.put(s.getConsumerInfo().getConsumerId(), s);
+        subscriptionsByDestination.put(s.getConsumerInfo().getDestination(), s);
+    }
+    
+    public void removeSubscription(Subscription s) {
+        if( s.getSubscriptionId()!=null )
+            subscriptionsByName.remove(s.getSubscriptionId());
+        subscriptionsByConsumerId.remove(s.getConsumerInfo().getConsumerId());
+        subscriptionsByDestination.remove(s.getConsumerInfo().getDestination(), s);
     }
 
     public void enqueueCommand(final Command ack) {
@@ -205,18 +220,6 @@
         });
     }
 
-    public Subscription getSubscriptionFor(ActiveMQDestination destination) {
-        return (Subscription) subscriptions.get(destination);
-    }
-
-    public void addAckListener(AckListener listener) {
-        this.ackListeners.add(listener);
-    }
-
-    public List getAckListeners() {
-        return ackListeners;
-    }
-
     public TransactionId getTransactionId(String key) {
         return (TransactionId) transactions.get(key);
     }
@@ -291,6 +294,10 @@
         } catch (JMSException e) {
             throw IOExceptionSupport.create(e);
         }
+    }
+
+    public Map getDispachedMap() {
+        return dispachedMap;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscribe.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscribe.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscribe.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscribe.java Wed Dec 21 11:23:20 2005
@@ -3,14 +3,14 @@
  */
 package org.activemq.transport.stomp;
 
-import org.activemq.command.ActiveMQDestination;
-import org.activemq.command.ConsumerId;
-import org.activemq.command.ConsumerInfo;
-
 import java.io.DataInput;
 import java.io.IOException;
 import java.util.Properties;
 
+import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.ConsumerInfo;
+import org.activemq.util.IntrospectionSupport;
+
 class Subscribe implements StompCommand {
     private HeaderParser headerParser = new HeaderParser();
     private StompWireFormat format;
@@ -20,19 +20,24 @@
     }
 
     public CommandEnvelope build(String commandLine, DataInput in) throws IOException {
-        ConsumerInfo ci = new ConsumerInfo();
         Properties headers = headerParser.parse(in);
+        
+        String subscriptionId = headers.getProperty(Stomp.Headers.Subscribe.ID);
         String destination = headers.getProperty(Stomp.Headers.Subscribe.DESTINATION);
+        
         ActiveMQDestination actual_dest = DestinationNamer.convert(destination);
+        ConsumerInfo ci = new ConsumerInfo(format.createConsumerId());
+        ci.setPrefetchSize(1000);
+        ci.setDispatchAsync(true);
+
+        IntrospectionSupport.setProperties(ci, headers, "activemq:");
+        
         ci.setDestination(DestinationNamer.convert(destination));
-        ConsumerId consumerId = format.createConsumerId();
-        ci.setConsumerId(consumerId);
-        ci.setResponseRequired(true);
-        // ci.setSessionId(format.getSessionId());
+        
         while (in.readByte() != 0) {
         }
-        String subscriptionId = headers.getProperty(Stomp.Headers.Subscribe.ID, Subscription.NO_ID);
-        Subscription s = new Subscription(format, consumerId, subscriptionId);
+        
+        Subscription s = new Subscription(format, subscriptionId, ci);
         s.setDestination(actual_dest);
         String ack_mode_key = headers.getProperty(Stomp.Headers.Subscribe.ACK_MODE);
         if (ack_mode_key != null && ack_mode_key.equals(Stomp.Headers.Subscribe.AckModeValues.CLIENT)) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscription.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Subscription.java Wed Dec 21 11:23:20 2005
@@ -3,80 +3,79 @@
  */
 package org.activemq.transport.stomp;
 
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import javax.jms.JMSException;
+
 import org.activemq.command.ActiveMQBytesMessage;
 import org.activemq.command.ActiveMQDestination;
+import org.activemq.command.ActiveMQMessage;
 import org.activemq.command.ActiveMQTextMessage;
-import org.activemq.command.ConsumerId;
+import org.activemq.command.ConsumerInfo;
 import org.activemq.command.MessageAck;
+import org.activemq.command.MessageDispatch;
 import org.activemq.command.RemoveInfo;
 
-import javax.jms.JMSException;
-
-import java.io.DataOutput;
-import java.io.IOException;
-
 public class Subscription {
+    
     private ActiveMQDestination destination;
     private int ackMode = 1;
     private StompWireFormat format;
-    private final ConsumerId consumerId;
+
     private final String subscriptionId;
     public static final String NO_ID = "~~ NO SUCH THING ~~%%@#!Q";
-
-    public Subscription(StompWireFormat format, ConsumerId consumerId, String subscriptionId) {
+    private final ConsumerInfo consumerInfo;
+    private final LinkedList dispatchedMessages = new LinkedList();
+    
+    public Subscription(StompWireFormat format, String subscriptionId, ConsumerInfo consumerInfo) {
         this.format = format;
-        this.consumerId = consumerId;
         this.subscriptionId = subscriptionId;
+        this.consumerInfo = consumerInfo;
     }
 
     void setDestination(ActiveMQDestination actual_dest) {
         this.destination = actual_dest;
     }
 
-    void receive(ActiveMQTextMessage msg, DataOutput out) throws IOException, JMSException {
+    void receive(MessageDispatch md, DataOutput out) throws IOException, JMSException {
+
+        ActiveMQMessage m = (ActiveMQMessage) md.getMessage();
+
         if (ackMode == CLIENT_ACK) {
-            AckListener listener = new AckListener(msg, consumerId, subscriptionId);
-            format.addAckListener(listener);
+            Subscription sub = format.getSubcription(md.getConsumerId());
+            sub.addMessageDispatch(md);
+            format.getDispachedMap().put(m.getJMSMessageID(), sub);
         }
         else if (ackMode == AUTO_ACK) {
-            MessageAck ack = new MessageAck();
-            // if (format.isInTransaction())
-            // ack.setTransactionIDString(format.getTransactionId());
-            ack.setDestination(msg.getDestination());
-            ack.setConsumerId(consumerId);
-            ack.setMessageID(msg.getMessageId());
-            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+            MessageAck ack = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1);
             format.enqueueCommand(ack);
         }
-        FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE).addHeaders(msg).setBody(msg.getText().getBytes());
-        if (!subscriptionId.equals(NO_ID)) {
+        
+        
+        FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE);
+        builder.addHeaders(m);
+        
+        if( m.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE ) {
+            builder.setBody(((ActiveMQTextMessage)m).getText().getBytes("UTF-8"));
+        } else if( m.getDataStructureType() == ActiveMQBytesMessage.DATA_STRUCTURE_TYPE ) {
+            ActiveMQBytesMessage msg = (ActiveMQBytesMessage)m;
+            byte data[] = new byte[(int) msg.getBodyLength()];
+            msg.readBytes(data);
+            builder.setBody(data);
+        }
+        
+        if (subscriptionId!=null) {
             builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
         }
+        
         out.write(builder.toFrame());
     }
 
-    void receive(ActiveMQBytesMessage msg, DataOutput out) throws IOException, JMSException {
-        // @todo refactor this and the other receive form to remoce duplication
-        // -bmc
-        if (ackMode == CLIENT_ACK) {
-            AckListener listener = new AckListener(msg, consumerId, subscriptionId);
-            format.addAckListener(listener);
-        }
-        else if (ackMode == AUTO_ACK) {
-            MessageAck ack = new MessageAck();
-            // if (format.isInTransaction())
-            // ack.setTransactionIDString(format.getTransactionId());
-            ack.setDestination(msg.getDestination());
-            ack.setConsumerId(consumerId);
-            ack.setMessageID(msg.getMessageId());
-            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-            format.enqueueCommand(ack);
-        }
-        FrameBuilder builder = new FrameBuilder(Stomp.Responses.MESSAGE).addHeaders(msg).setBody(msg.getContent().getData());
-        if (!subscriptionId.equals(NO_ID)) {
-            builder.addHeader(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
-        }
-        out.write(builder.toFrame());
+    private void addMessageDispatch(MessageDispatch md) {
+        dispatchedMessages.addLast(md);
     }
 
     ActiveMQDestination getDestination() {
@@ -91,8 +90,39 @@
     }
 
     public RemoveInfo close() {
-        RemoveInfo unsub = new RemoveInfo();
-        unsub.setObjectId(consumerId);
-        return unsub;
+        return new RemoveInfo(consumerInfo.getConsumerId());
+    }
+
+    public ConsumerInfo getConsumerInfo() {
+        return consumerInfo;
+    }
+
+    public String getSubscriptionId() {
+        return subscriptionId;
+    }
+
+    public MessageAck createMessageAck(String message_id) {
+        MessageAck ack = new MessageAck();
+        ack.setDestination(consumerInfo.getDestination());
+        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        
+        int count=0;
+        for (Iterator iter = dispatchedMessages.iterator(); iter.hasNext();) {
+            
+            MessageDispatch md = (MessageDispatch) iter.next();
+            String id = ((ActiveMQMessage)md.getMessage()).getJMSMessageID();
+            if( ack.getFirstMessageId()==null )
+                ack.setFirstMessageId(md.getMessage().getMessageId());
+
+            format.getDispachedMap().remove(id);
+            iter.remove();
+            count++;
+            if( id.equals(message_id)  ) {
+                ack.setLastMessageId(md.getMessage().getMessageId());
+            }
+        }
+        ack.setMessageCount(count);
+        return ack;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Unsubscribe.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Unsubscribe.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Unsubscribe.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/activemq/transport/stomp/Unsubscribe.java Wed Dec 21 11:23:20 2005
@@ -7,7 +7,10 @@
 
 import java.io.DataInput;
 import java.io.IOException;
+import java.net.ProtocolException;
+import java.util.Iterator;
 import java.util.Properties;
+import java.util.Set;
 
 public class Unsubscribe implements StompCommand {
     private static final HeaderParser parser = new HeaderParser();
@@ -22,10 +25,25 @@
         while (in.readByte() == 0) {
         }
 
-        String dest_name = headers.getProperty(Stomp.Headers.Unsubscribe.DESTINATION);
-        ActiveMQDestination destination = DestinationNamer.convert(dest_name);
+        String subscriptionId = headers.getProperty(Stomp.Headers.Unsubscribe.ID);
+        String destination = headers.getProperty(Stomp.Headers.Unsubscribe.DESTINATION);
+
+
+        if( subscriptionId!=null ) {
+            Subscription s = format.getSubcription(subscriptionId);
+            format.removeSubscription(s);
+            return new CommandEnvelope(s.close(), headers);
+        }
+        
+        ActiveMQDestination d = DestinationNamer.convert(destination);
+        Set subs = format.getSubcriptions(d);
+        for (Iterator iter = subs.iterator(); iter.hasNext();) {
+            Subscription s = (Subscription) iter.next();
+            format.removeSubscription(s);
+            return new CommandEnvelope(s.close(), headers);
+        }
+        
+        throw new ProtocolException("Unexpected UNSUBSCRIBE received.");
 
-        Subscription s = format.getSubscriptionFor(destination);
-        return new CommandEnvelope(s.close(), headers);
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/activemq/transport/stomp/StompTest.java Wed Dec 21 11:23:20 2005
@@ -8,7 +8,6 @@
 import java.net.URI;
 
 import javax.jms.Connection;
-import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -18,7 +17,6 @@
 import org.activemq.CombinationTestSupport;
 import org.activemq.broker.BrokerService;
 import org.activemq.broker.TransportConnector;
-import org.activemq.command.ActiveMQDestination;
 import org.activemq.command.ActiveMQQueue;
 
 public class StompTest extends CombinationTestSupport {
@@ -115,6 +113,34 @@
         TextMessage message = (TextMessage) consumer.receive(1000);
         assertNotNull(message);
         assertEquals("Hello World", message.getText());
+        
+    }
+    
+    public void testSubscribeWithAutoAck() throws Exception {
+        
+        String frame = 
+            "CONNECT\n" + 
+            "login: brianm\n" + 
+            "passcode: wombats\n\n"+
+            Stomp.NULL;
+        sendFrame(frame);
+     
+        frame = receiveFrame(10000000);
+        assertTrue(frame.startsWith("CONNECTED"));
+        
+        frame = 
+            "SUBSCRIBE\n" + 
+            "destination:/queue/TEST\n" +
+            "ack:auto\n\n" + 
+            Stomp.NULL;
+        sendFrame(frame);
+
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage message = session.createTextMessage(getName());
+        producer.send(message);
+        
+        frame = receiveFrame(10000);
+        assertTrue(frame.startsWith("MESSAGE"));
         
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties?rev=358349&r1=358348&r2=358349&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties (original)
+++ incubator/activemq/trunk/activemq-core/src/test/resources/log4j.properties Wed Dec 21 11:23:20 2005
@@ -1,7 +1,7 @@
 #
 # The logging properties used during tests..
 #
-log4j.rootLogger=INFO, out
+log4j.rootLogger=DEBUG, stdout
 
 log4j.logger.org.activemq.spring=WARN
 



Mime
View raw message