activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r468414 [4/7] - in /incubator/activemq/sandbox/qpid/src/main/java/org/apache: activemq/amqp/ activemq/amqp/broker/ activemq/amqp/command/ activemq/amqp/transport/ activemq/amqp/wireformat/ activemq/amqp/wireformat/v8_0/ activemq/qpid/broker...
Date Fri, 27 Oct 2006 15:27:30 GMT
Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxRollbackOkBodyMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxRollbackOkBodyMarshaller.java?view=diff&rev=468414&r1=468132&r2=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxRollbackOkBodyMarshaller.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxRollbackOkBodyMarshaller.java Fri Oct 27 08:27:20 2006
@@ -16,15 +16,15 @@
  *
  */
     
-package org.apache.activemq.qpid.wireformat.v8_0;
+package org.apache.activemq.amqp.wireformat.v8_0;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.activemq.amqp.command.*;
+import org.apache.activemq.amqp.wireformat.*;
 import org.apache.activemq.qpid.*;
-import org.apache.activemq.qpid.command.*;
-import org.apache.activemq.qpid.wireformat.*;
 
 /**
  * This class is autogenerated, do not modify. [From AMQ protocol 0.80 (major=8, minor=0)]

Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxSelectBodyMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxSelectBodyMarshaller.java?view=diff&rev=468414&r1=468132&r2=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxSelectBodyMarshaller.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxSelectBodyMarshaller.java Fri Oct 27 08:27:20 2006
@@ -16,15 +16,15 @@
  *
  */
     
-package org.apache.activemq.qpid.wireformat.v8_0;
+package org.apache.activemq.amqp.wireformat.v8_0;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.activemq.amqp.command.*;
+import org.apache.activemq.amqp.wireformat.*;
 import org.apache.activemq.qpid.*;
-import org.apache.activemq.qpid.command.*;
-import org.apache.activemq.qpid.wireformat.*;
 
 /**
  * This class is autogenerated, do not modify. [From AMQ protocol 0.80 (major=8, minor=0)]

Modified: incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxSelectOkBodyMarshaller.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxSelectOkBodyMarshaller.java?view=diff&rev=468414&r1=468132&r2=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxSelectOkBodyMarshaller.java (original)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/activemq/amqp/wireformat/v8_0/TxSelectOkBodyMarshaller.java Fri Oct 27 08:27:20 2006
@@ -16,15 +16,15 @@
  *
  */
     
-package org.apache.activemq.qpid.wireformat.v8_0;
+package org.apache.activemq.amqp.wireformat.v8_0;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
+import org.apache.activemq.amqp.command.*;
+import org.apache.activemq.amqp.wireformat.*;
 import org.apache.activemq.qpid.*;
-import org.apache.activemq.qpid.command.*;
-import org.apache.activemq.qpid.wireformat.*;
 
 /**
  * This class is autogenerated, do not modify. [From AMQ protocol 0.80 (major=8, minor=0)]

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/AMQException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/AMQException.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/AMQException.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/AMQException.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Generic AMQ exception.
+ */
+public class AMQException extends Exception
+{
+    private int _errorCode;
+
+    public AMQException(String message)
+    {
+        super(message);
+    }
+
+    public AMQException(String msg, Throwable t)
+    {
+        super(msg, t);
+    }
+
+    public AMQException(int errorCode, String msg, Throwable t)
+    {
+        super(msg + " [error code " + errorCode + ']', t);
+        _errorCode = errorCode;
+    }
+
+    public AMQException(int errorCode, String msg)
+    {
+        super(msg + " [error code " + errorCode + ']');
+        _errorCode = errorCode;
+    }
+
+    public AMQException(Logger logger, String msg, Throwable t)
+    {
+        this(msg, t);
+        logger.error(getMessage(), this);
+    }
+
+    public AMQException(Logger logger, String msg)
+    {
+        this(msg);
+        logger.error(getMessage(), this);
+    }
+
+    public AMQException(Logger logger, int errorCode, String msg)
+    {
+        this(errorCode, msg);
+        logger.error(getMessage(), this);
+    }
+
+    public int getErrorCode()
+    {
+        return _errorCode;
+    }
+ 
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/exchange/ExchangeDefaults.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,33 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.exchange;
+
+public class ExchangeDefaults
+{
+    public final static String TOPIC_EXCHANGE_NAME = "amq.topic";
+
+    public final static String TOPIC_EXCHANGE_CLASS = "topic";
+
+    public final static String DIRECT_EXCHANGE_NAME = "amq.direct";
+
+    public final static String DIRECT_EXCHANGE_CLASS = "direct";
+
+    public final static String HEADERS_EXCHANGE_NAME = "amq.match";
+
+    public final static String HEADERS_EXCHANGE_CLASS = "headers";
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/protocol/AMQConstant.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/protocol/AMQConstant.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,108 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public final class AMQConstant
+{
+    private int _code;
+
+    private String _name;
+
+    private static Map _codeMap = new HashMap();
+
+    private AMQConstant(int code, String name, boolean map)
+    {
+        _code = code;
+        _name = name;
+        if (map)
+        {
+            _codeMap.put(new Integer(code), this);
+        }
+    }
+
+    public String toString()
+    {
+        return _code + ": " + _name;
+    }
+
+    public int getCode()
+    {
+        return _code;
+    }
+
+    public String getName()
+    {
+        return _name;
+    }
+    
+    public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true);
+
+    public static final AMQConstant FRAME_END = new AMQConstant(206, "frame end", true);
+
+    public static final AMQConstant REPLY_SUCCESS = new AMQConstant(200, "reply success", true);
+
+    public static final AMQConstant NOT_DELIVERED = new AMQConstant(310, "not delivered", true);
+
+    public static final AMQConstant MESSAGE_TOO_LARGE = new AMQConstant(311, "message too large", true);
+
+    public static final AMQConstant NO_ROUTE = new AMQConstant(312, "no route", true);
+    
+    public static final AMQConstant NO_CONSUMERS = new AMQConstant(313, "no consumers", true);
+
+    public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
+
+    public static final AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true);
+
+    public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
+
+    public static final AMQConstant ACCESS_REFUSED = new AMQConstant(403, "access refused", true);
+
+    public static final AMQConstant NOT_FOUND = new AMQConstant(404, "not found", true);
+
+    public static final AMQConstant FRAME_ERROR = new AMQConstant(501, "frame error", true);
+
+    public static final AMQConstant UNKNOWN_EXCHANGE = new AMQConstant(500, "unknown exchange", true);
+
+    public static final AMQConstant SYNTAX_ERROR = new AMQConstant(502, "syntax error", true);
+
+    public static final AMQConstant COMMAND_INVALID = new AMQConstant(503, "command invalid", true);
+
+    public static final AMQConstant CHANNEL_ERROR = new AMQConstant(504, "channel error", true);
+
+    public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
+
+    public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
+
+    public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);
+
+    public static final AMQConstant INTERNAL_ERROR = new AMQConstant(541, "internal error", true);
+
+    public static AMQConstant getConstant(int code)
+    {
+        AMQConstant c = (AMQConstant) _codeMap.get(new Integer(code));
+        if (c == null)
+        {
+            c = new AMQConstant(code, "unknown code", false);
+        }
+        return c;
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/AMQChannel.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/AMQChannel.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,798 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.TxAck;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
+import org.apache.qpid.server.exchange.MessageRouter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.TxnBuffer;
+import org.apache.qpid.server.txn.TxnOp;
+
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class AMQChannel
+{
+    public static final int DEFAULT_PREFETCH = 5000;
+
+    private static final Logger _log = Logger.getLogger(AMQChannel.class);
+
+    private final int _channelId;
+
+    private boolean _transactional;
+
+    private long _prefetch_HighWaterMark;
+
+    private long _prefetch_LowWaterMark;
+
+    /**
+     * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that
+     * value of this represents the <b>last</b> tag sent out
+     */
+    private AtomicLong _deliveryTag = new AtomicLong(0);
+
+    /**
+     * A channel has a default queue (the last declared) that is used when no queue name is
+     * explictily set
+     */
+    private AMQQueue _defaultQueue;
+
+    /**
+     * This tag is unique per subscription to a queue. The server returns this in response to a
+     * basic.consume request.
+     */
+    private int _consumerTag;
+
+    /**
+     * The current message - which may be partial in the sense that not all frames have been received yet -
+     * which has been received by this channel. As the frames are received the message gets updated and once all
+     * frames have been received the message can then be routed.
+     */
+    private AMQMessage _currentMessage;
+
+    /**
+     * Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue.
+     */
+    private final Map<String, AMQQueue> _consumerTag2QueueMap = new TreeMap<String, AMQQueue>();
+
+    private final MessageStore _messageStore;
+
+    private final Object _unacknowledgedMessageMapLock = new Object();
+
+    private Map<Long, UnacknowledgedMessage> _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+
+    private long _lastDeliveryTag;
+
+    private final AtomicBoolean _suspended = new AtomicBoolean(false);
+
+    private final MessageRouter _exchanges;
+
+    private final TxnBuffer _txnBuffer;
+
+    private TxAck ackOp;
+
+    private final List<AMQDataBlock> _returns = new LinkedList<AMQDataBlock>();
+
+    public AMQChannel(int channelId, MessageStore messageStore, MessageRouter exchanges)
+            throws AMQException
+    {
+        _channelId = channelId;
+        _prefetch_HighWaterMark = DEFAULT_PREFETCH;
+        _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2;
+        _messageStore = messageStore;
+        _exchanges = exchanges;
+        _txnBuffer = new TxnBuffer(_messageStore);
+    }
+
+    public int getChannelId()
+    {
+        return _channelId;
+    }
+
+    public boolean isTransactional()
+    {
+        return _transactional;
+    }
+
+    public void setTransactional(boolean transactional)
+    {
+        _transactional = transactional;
+    }
+
+    public long getPrefetchCount()
+    {
+        return _prefetch_HighWaterMark;
+    }
+
+    public void setPrefetchCount(long prefetchCount)
+    {
+        _prefetch_HighWaterMark = prefetchCount;
+    }
+
+    public long getPrefetchLowMarkCount()
+    {
+        return _prefetch_LowWaterMark;
+    }
+
+    public void setPrefetchLowMarkCount(long prefetchCount)
+    {
+        _prefetch_LowWaterMark = prefetchCount;
+    }
+
+    public long getPrefetchHighMarkCount()
+    {
+        return _prefetch_HighWaterMark;
+    }
+
+    public void setPrefetchHighMarkCount(long prefetchCount)
+    {
+        _prefetch_HighWaterMark = prefetchCount;
+    }
+
+
+    public void setPublishFrame(BasicPublishBody publishBody, AMQProtocolSession publisher) throws AMQException
+    {
+        _currentMessage = new AMQMessage(_messageStore, publishBody);
+        _currentMessage.setPublisher(publisher);
+    }
+
+    public void publishContentHeader(ContentHeaderBody contentHeaderBody)
+            throws AMQException
+    {
+        if (_currentMessage == null)
+        {
+            throw new AMQException("Received content header without previously receiving a BasicDeliver frame");
+        }
+        else
+        {
+            _currentMessage.setContentHeaderBody(contentHeaderBody);
+            // check and route if header says body length is zero
+            if (contentHeaderBody.bodySize == 0)
+            {
+                routeCurrentMessage();
+            }
+        }
+    }
+
+    public void publishContentBody(ContentBody contentBody)
+            throws AMQException
+    {
+        if (_currentMessage == null)
+        {
+            throw new AMQException("Received content body without previously receiving a JmsPublishBody");
+        }
+        if (_currentMessage.getContentHeaderBody() == null)
+        {
+            throw new AMQException("Received content body without previously receiving a content header");
+        }
+
+        _currentMessage.addContentBodyFrame(contentBody);
+        if (_currentMessage.isAllContentReceived())
+        {
+            routeCurrentMessage();
+        }
+    }
+
+    protected void routeCurrentMessage() throws AMQException
+    {
+        if (_transactional)
+        {
+            //don't create a transaction unless needed
+            if (_currentMessage.isPersistent())
+            {
+                _txnBuffer.containsPersistentChanges();
+            }
+
+            //A publication will result in the enlisting of several
+            //TxnOps. The first is an op that will store the message.
+            //Following that (and ordering is important), an op will
+            //be added for every queue onto which the message is
+            //enqueued. Finally a cleanup op will be added to decrement
+            //the reference associated with the routing.
+            Store storeOp = new Store(_currentMessage);
+            _txnBuffer.enlist(storeOp);
+            _currentMessage.setTxnBuffer(_txnBuffer);
+            try
+            {
+                _exchanges.routeContent(_currentMessage);
+                _txnBuffer.enlist(new Cleanup(_currentMessage));
+            }
+            catch (RequiredDeliveryException e)
+            {
+                //Can only be due to the mandatory flag, as no attempt
+                //has yet been made to deliver the message. The
+                //message will thus not have been delivered to any
+                //queue so we can return the message (without killing
+                //the transaction) and for efficiency remove the store
+                //operation from the buffer.
+                _txnBuffer.cancel(storeOp);
+                throw e;
+            }
+            finally
+            {
+                _currentMessage = null;
+            }
+        }
+        else
+        {
+            try
+            {
+                _exchanges.routeContent(_currentMessage);
+                //following check implements the functionality
+                //required by the 'immediate' flag:
+                _currentMessage.checkDeliveredToConsumer();
+            }
+            finally
+            {
+                _currentMessage.decrementReference();
+                _currentMessage = null;
+            }
+        }
+    }
+
+    public long getNextDeliveryTag()
+    {
+        return _deliveryTag.incrementAndGet();
+    }
+
+    public int getNextConsumerTag()
+    {
+        return ++_consumerTag;
+    }
+
+    /**
+     * Subscribe to a queue. We register all subscriptions in the channel so that
+     * if the channel is closed we can clean up all subscriptions, even if the
+     * client does not explicitly unsubscribe from all queues.
+     *
+     * @param tag     the tag chosen by the client (if null, server will generate one)
+     * @param queue   the queue to subscribe to
+     * @param session the protocol session of the subscriber
+     * @return the consumer tag. This is returned to the subscriber and used in
+     *         subsequent unsubscribe requests
+     * @throws ConsumerTagNotUniqueException if the tag is not unique
+     * @throws AMQException                  if something goes wrong
+     */
+    public String subscribeToQueue(String tag, AMQQueue queue, AMQProtocolSession session, boolean acks) throws AMQException, ConsumerTagNotUniqueException
+    {
+        if (tag == null)
+        {
+            tag = "sgen_" + getNextConsumerTag();
+        }
+        if (_consumerTag2QueueMap.containsKey(tag))
+        {
+            throw new ConsumerTagNotUniqueException();
+        }
+
+        queue.registerProtocolSession(session, _channelId, tag, acks);
+        _consumerTag2QueueMap.put(tag, queue);
+        return tag;
+    }
+
+
+    public void unsubscribeConsumer(AMQProtocolSession session, String consumerTag) throws AMQException
+    {
+        AMQQueue q = _consumerTag2QueueMap.remove(consumerTag);
+        if (q != null)
+        {
+            q.unregisterProtocolSession(session, _channelId, consumerTag);
+        }
+        else
+        {
+            throw new AMQException(_log, "Consumer tag " + consumerTag + " not known to channel " +
+                                         _channelId);
+        }
+    }
+
+    /**
+     * Called from the protocol session to close this channel and clean up.
+     *
+     * @throws AMQException if there is an error during closure
+     */
+    public void close(AMQProtocolSession session) throws AMQException
+    {
+        if (_transactional)
+        {
+            synchronized(_txnBuffer)
+            {
+                _txnBuffer.rollback();//releases messages
+            }
+        }
+        unsubscribeAllConsumers(session);
+        requeue();
+    }
+
+    private void unsubscribeAllConsumers(AMQProtocolSession session) throws AMQException
+    {
+        _log.info("Unsubscribing all consumers on channel " + toString());
+        for (Map.Entry<String, AMQQueue> me : _consumerTag2QueueMap.entrySet())
+        {
+            me.getValue().unregisterProtocolSession(session, _channelId, me.getKey());
+        }
+        _consumerTag2QueueMap.clear();
+    }
+
+    /**
+     * Add a message to the channel-based list of unacknowledged messages
+     *
+     * @param message
+     * @param deliveryTag
+     * @param queue
+     */
+    public void addUnacknowledgedMessage(AMQMessage message, long deliveryTag, String consumerTag, AMQQueue queue)
+    {
+        synchronized(_unacknowledgedMessageMapLock)
+        {
+            _unacknowledgedMessageMap.put(deliveryTag, new UnacknowledgedMessage(queue, message, consumerTag, deliveryTag));
+            _lastDeliveryTag = deliveryTag;
+            checkSuspension();
+        }
+    }
+
+    /**
+     * Called to attempt re-enqueue all outstanding unacknowledged messages on the channel.
+     * May result in delivery to this same channel or to other subscribers.
+     */
+    public void requeue() throws AMQException
+    {
+        // we must create a new map since all the messages will get a new delivery tag when they are redelivered
+        Map<Long, UnacknowledgedMessage> currentList;
+        synchronized(_unacknowledgedMessageMapLock)
+        {
+            currentList = _unacknowledgedMessageMap;
+            _unacknowledgedMessageMap = new LinkedHashMap<Long, UnacknowledgedMessage>(DEFAULT_PREFETCH);
+        }
+
+        for (UnacknowledgedMessage unacked : currentList.values())
+        {
+            if (unacked.queue != null)
+            {
+                unacked.queue.deliver(unacked.message);
+            }
+        }
+    }
+
+    /**
+     * Called to resend all outstanding unacknowledged messages to this same channel.
+     */
+    public void resend(AMQProtocolSession session)
+    {
+        //messages go to this channel
+        synchronized(_unacknowledgedMessageMapLock)
+        {
+            for (Map.Entry<Long, UnacknowledgedMessage> entry : _unacknowledgedMessageMap.entrySet())
+            {
+                long deliveryTag = entry.getKey();
+                String consumerTag = entry.getValue().consumerTag;
+                AMQMessage msg = entry.getValue().message;
+
+                session.writeFrame(msg.getDataBlock(_channelId, consumerTag, deliveryTag));
+            }
+        }
+    }
+
+    /**
+     * Callback indicating that a queue has been deleted. We must update the structure of unacknowledged
+     * messages to remove the queue reference and also decrement any message reference counts, without
+     * actually removing the item sine we may get an ack for a delivery tag that was generated from the
+     * deleted queue.
+     *
+     * @param queue
+     */
+    public void queueDeleted(AMQQueue queue)
+    {
+        synchronized(_unacknowledgedMessageMapLock)
+        {
+            for (Map.Entry<Long, UnacknowledgedMessage> unacked : _unacknowledgedMessageMap.entrySet())
+            {
+                final UnacknowledgedMessage unackedMsg = unacked.getValue();
+                // we can compare the reference safely in this case
+                if (unackedMsg.queue == queue)
+                {
+                    unackedMsg.queue = null;
+                    try
+                    {
+                        unackedMsg.message.decrementReference();
+                    }
+                    catch (AMQException e)
+                    {
+                        _log.error("Error decrementing ref count on message " + unackedMsg.message.getMessageId() + ": " +
+                                   e, e);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Acknowledge one or more messages.
+     *
+     * @param deliveryTag the last delivery tag
+     * @param multiple    if true will acknowledge all messages up to an including the delivery tag. if false only
+     *                    acknowledges the single message specified by the delivery tag
+     * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
+     */
+    public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+    {
+        if (_transactional)
+        {
+            //check that the tag exists to give early failure
+            if (!multiple || deliveryTag > 0)
+            {
+                checkAck(deliveryTag);
+            }
+            //we use a single txn op for all acks and update this op
+            //as new acks come in. If this is the first ack in the txn
+            //we will need to create and enlist the op.
+            if (ackOp == null)
+            {
+                ackOp = new TxAck(new AckMap());
+                _txnBuffer.enlist(ackOp);
+            }
+            //update the op to include this ack request
+            if (multiple && deliveryTag == 0)
+            {
+                synchronized(_unacknowledgedMessageMapLock)
+                {
+                    //if have signalled to ack all, that refers only
+                    //to all at this time
+                    ackOp.update(_lastDeliveryTag, multiple);
+                }
+            }
+            else
+            {
+                ackOp.update(deliveryTag, multiple);
+            }
+        }
+        else
+        {
+            handleAcknowledgement(deliveryTag, multiple);
+        }
+    }
+
+    private void checkAck(long deliveryTag) throws AMQException
+    {
+        synchronized(_unacknowledgedMessageMapLock)
+        {
+            if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
+            {
+                throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
+            }
+        }
+    }
+
+    private void handleAcknowledgement(long deliveryTag, boolean multiple) throws AMQException
+    {
+        if (multiple)
+        {
+            LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
+            synchronized(_unacknowledgedMessageMapLock)
+            {
+                if (deliveryTag == 0)
+                {
+                    //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero, tells the server to acknowledge all outstanding mesages.
+                    _log.trace("Multiple ack on delivery tag 0. ACKing all messages. Current count:" + _unacknowledgedMessageMap.size());
+                    acked = new LinkedList<UnacknowledgedMessage>(_unacknowledgedMessageMap.values());
+                    _unacknowledgedMessageMap.clear();
+                }
+                else
+                {
+                    if (!_unacknowledgedMessageMap.containsKey(deliveryTag))
+                    {
+                        throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+                    }
+                    Iterator<Map.Entry<Long, UnacknowledgedMessage>> i = _unacknowledgedMessageMap.entrySet().iterator();
+
+                    while (i.hasNext())
+                    {
+
+                        Map.Entry<Long, UnacknowledgedMessage> unacked = i.next();
+
+                        if (unacked.getKey() > deliveryTag)
+                        {
+                            //This should not occur now.
+                            throw new AMQException("UnacknowledgedMessageMap is out of order:" + unacked.getKey() + " When deliveryTag is:" + deliveryTag + "ES:" + _unacknowledgedMessageMap.entrySet().toString());
+                        }
+
+                        i.remove();
+
+                        acked.add(unacked.getValue());
+                        if (unacked.getKey() == deliveryTag)
+                        {
+                            break;
+                        }
+                    }
+                }
+            }// synchronized
+
+            if (_log.isTraceEnabled())
+            {
+                _log.trace("Received multiple ack for delivery tag " + deliveryTag + ". Removing " +
+                           acked.size() + " items.");
+            }
+
+            for (UnacknowledgedMessage msg : acked)
+            {
+                msg.discard();
+            }
+
+        }
+        else
+        {
+            UnacknowledgedMessage msg;
+            synchronized(_unacknowledgedMessageMapLock)
+            {
+                msg = _unacknowledgedMessageMap.remove(deliveryTag);
+            }
+
+            if (msg == null)
+            {
+                _log.trace("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
+                throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" + _channelId);
+            }
+            msg.discard();
+            if (_log.isTraceEnabled())
+            {
+                _log.trace("Received non-multiple ack for messaging with delivery tag " + deliveryTag);
+            }
+        }
+
+        checkSuspension();
+    }
+
+    /**
+     * Used only for testing purposes.
+     *
+     * @return the map of unacknowledged messages
+     */
+    public Map<Long, UnacknowledgedMessage> getUnacknowledgedMessageMap()
+    {
+        return _unacknowledgedMessageMap;
+    }
+
+    private void checkSuspension()
+    {
+        boolean suspend;
+        //noinspection SynchronizeOnNonFinalField
+        synchronized(_unacknowledgedMessageMapLock)
+        {
+            suspend = _unacknowledgedMessageMap.size() >= _prefetch_HighWaterMark;
+        }
+        setSuspended(suspend);
+    }
+
+    public void setSuspended(boolean suspended)
+    {
+        boolean isSuspended = _suspended.get();
+
+        if (isSuspended && !suspended)
+        {
+            synchronized(_unacknowledgedMessageMapLock)
+            {
+                // Continue being suspended if we are above the _prefetch_LowWaterMark
+                suspended = _unacknowledgedMessageMap.size() > _prefetch_LowWaterMark;
+            }
+        }
+
+        boolean wasSuspended = _suspended.getAndSet(suspended);
+        if (wasSuspended != suspended)
+        {
+            if (wasSuspended)
+            {
+                _log.debug("Unsuspending channel " + this);
+                //may need to deliver queued messages
+                for (AMQQueue q : _consumerTag2QueueMap.values())
+                {
+                    q.deliverAsync();
+                }
+            }
+            else
+            {
+                _log.debug("Suspending channel " + this);
+            }
+        }
+    }
+
+    public boolean isSuspended()
+    {
+        return _suspended.get();
+    }
+
+    public void commit() throws AMQException
+    {
+        if (ackOp != null)
+        {
+            ackOp.consolidate();
+            if (ackOp.checkPersistent())
+            {
+                _txnBuffer.containsPersistentChanges();
+            }
+            ackOp = null;//already enlisted, after commit will reset regardless of outcome
+        }
+
+        _txnBuffer.commit();
+        //TODO: may need to return 'immediate' messages at this point
+    }
+
+    public void rollback() throws AMQException
+    {
+        //need to protect rollback and close from each other...
+        synchronized(_txnBuffer)
+        {
+            _txnBuffer.rollback();
+        }
+    }
+
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder(30);
+        sb.append("Channel: id ").append(_channelId).append(", transaction mode: ").append(_transactional);
+        sb.append(", prefetch marks: ").append(_prefetch_LowWaterMark);
+        sb.append("/").append(_prefetch_HighWaterMark);
+        return sb.toString();
+    }
+
+    public ObjectName getObjectName()
+            throws MalformedObjectNameException
+    {
+        StringBuilder sb = new StringBuilder(30);
+        sb.append("Channel:id=").append(_channelId);
+        sb.append(",transaction mode=").append(_transactional);
+        return new ObjectName(sb.toString());
+    }
+
+    public void setDefaultQueue(AMQQueue queue)
+    {
+        _defaultQueue = queue;
+    }
+
+    public AMQQueue getDefaultQueue()
+    {
+        return _defaultQueue;
+    }
+
+    public void processReturns(AMQProtocolSession session)
+    {
+        for (AMQDataBlock block : _returns)
+        {
+            session.writeFrame(block);
+        }
+        _returns.clear();
+    }
+
+    //we use this wrapper to ensure we are always using the correct
+    //map instance (its not final unfortunately)
+    private class AckMap implements UnacknowledgedMessageMap
+    {
+        public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+        {
+            impl().collect(deliveryTag, multiple, msgs);
+        }
+
+        public void remove(List<UnacknowledgedMessage> msgs)
+        {
+            impl().remove(msgs);
+        }
+
+        private UnacknowledgedMessageMap impl()
+        {
+            return new UnacknowledgedMessageMapImpl(_unacknowledgedMessageMapLock, _unacknowledgedMessageMap);
+        }
+    }
+
+    private class Store implements TxnOp
+    {
+        //just use this to do a store of the message during the
+        //prepare phase. Any enqueueing etc is done by TxnOps enlisted
+        //by the queues themselves.
+        private final AMQMessage _msg;
+
+        Store(AMQMessage msg)
+        {
+            _msg = msg;
+        }
+
+        public void prepare() throws AMQException
+        {
+            _msg.storeMessage();
+            //the routers reference can now be released
+            _msg.decrementReference();
+        }
+
+        public void undoPrepare()
+        {
+        }
+
+        public void commit()
+        {
+        }
+
+        public void rollback()
+        {
+        }
+    }
+
+    private class Cleanup implements TxnOp
+    {
+        private final AMQMessage _msg;
+
+        Cleanup(AMQMessage msg)
+        {
+            _msg = msg;
+        }
+
+        public void prepare() throws AMQException
+        {
+        }
+
+        public void undoPrepare()
+        {
+            //don't need to do anything here, if the store's txn failed
+            //when processing prepare then the message was not stored
+            //or enqueued on any queues and can be discarded
+        }
+
+        public void commit()
+        {
+            //The routers reference can now be released.  This is done
+            //here to ensure that it happens after the queues that
+            //enqueue it have incremented their counts (which as a
+            //memory only operation is done in the commit phase).
+            try
+            {
+                _msg.decrementReference();
+            }
+            catch (AMQException e)
+            {
+                _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
+            }
+            try
+            {
+                _msg.checkDeliveredToConsumer();
+            }
+            catch (NoConsumersException e)
+            {
+                //TODO: store this for delivery after the commit-ok
+                _returns.add(e.getReturnMessage(_channelId));
+            }
+        }
+
+        public void rollback()
+        {
+        }
+    }
+
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,22 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+public class ConsumerTagNotUniqueException  extends Exception
+{
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ManagedChannel.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ManagedChannel.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ManagedChannel.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ManagedChannel.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,64 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.server;
+
+import javax.management.JMException;
+import java.io.IOException;
+
+/**
+ * The managed interface exposed to allow management of channels.
+ * @author   Bhupendra Bhardwaj
+ * @version  0.1
+ */
+public interface ManagedChannel
+{
+    static final String TYPE = "Channel";
+
+    /**
+     * Tells whether the channel is transactional.
+     * @return true if the channel is transactional.
+     * @throws IOException
+     */
+    boolean isTransactional() throws IOException;
+
+    /**
+     * Tells the number of unacknowledged messages in this channel.
+     * @return number of unacknowledged messages.
+     * @throws IOException
+     */
+    int getUnacknowledgedMessageCount() throws IOException;
+
+    
+    //********** Operations *****************//
+
+    /**
+     * Commits the transactions if the channel is transactional.
+     * @throws IOException
+     * @throws JMException
+     */
+    void commitTransactions() throws IOException, JMException;
+
+    /**
+     * Rollsback the transactions if the channel is transactional.
+     * @throws IOException
+     * @throws JMException
+     */
+    void rollbackTransactions() throws IOException, JMException;
+
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,105 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+
+import java.util.List;
+
+/**
+ * Signals that a required delivery could not be made. This could be bacuse of
+ * the immediate flag being set and the queue having no consumers, or the mandatory
+ * flag being set and the exchange having no valid bindings.
+ */
+public abstract class RequiredDeliveryException extends AMQException
+{
+    private final String _message;
+    private final BasicPublishBody _publishBody;
+    private final ContentHeaderBody _contentHeaderBody;
+    private final List<ContentBody> _contentBodies;
+
+    public RequiredDeliveryException(String message, AMQMessage payload)
+    {
+        super(message);
+        _message = message;
+        _publishBody = payload.getPublishBody();
+        _contentHeaderBody = payload.getContentHeaderBody();
+        _contentBodies = payload.getContentBodies();
+    }
+
+    public RequiredDeliveryException(String message,
+                                BasicPublishBody publishBody,
+                                ContentHeaderBody contentHeaderBody,
+                                List<ContentBody> contentBodies)
+    {
+        super(message);
+        _message = message;
+        _publishBody = publishBody;
+        _contentHeaderBody = contentHeaderBody;
+        _contentBodies = contentBodies;
+    }
+
+    public BasicPublishBody getPublishBody()
+    {
+        return _publishBody;
+    }
+
+    public ContentHeaderBody getContentHeaderBody()
+    {
+        return _contentHeaderBody;
+    }
+
+    public List<ContentBody> getContentBodies()
+    {
+        return _contentBodies;
+    }
+
+    public CompositeAMQDataBlock getReturnMessage(int channel)
+    {
+        BasicReturnBody returnBody = new BasicReturnBody();
+        returnBody.exchange = _publishBody.exchange;
+        returnBody.replyCode = getReplyCode();
+        returnBody.replyText = _message;
+        returnBody.routingKey = _publishBody.routingKey;
+
+        AMQDataBlock[] allFrames = new AMQDataBlock[2 + _contentBodies.size()];
+        allFrames[0] = returnBody;
+        returnBody.channel = channel;
+        
+        allFrames[1] = _contentHeaderBody;
+        _contentHeaderBody.channel = channel;
+        
+        for (int i = 2; i < allFrames.length; i++)
+        {
+        	ContentBody body =  _contentBodies.get(i - 2);
+        	body.setChannel(channel);
+            allFrames[i] = body;
+        }
+
+        return new CompositeAMQDataBlock(allFrames);
+    }
+
+    public int getErrorCode()
+    {
+        return getReplyCode();
+    }    
+
+    public abstract int getReplyCode();
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/TxAck.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/TxAck.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/TxAck.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/TxAck.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,129 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.txn.TxnOp;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * A TxnOp implementation for handling accumulated acks
+ */    
+public class TxAck implements TxnOp
+{
+    private final UnacknowledgedMessageMap _map;
+    private final List <UnacknowledgedMessage> _unacked = new LinkedList<UnacknowledgedMessage>();
+    private final List<Long> _individual = new LinkedList<Long>();
+    private long _deliveryTag;
+    private boolean _multiple;
+
+    public TxAck(UnacknowledgedMessageMap map)
+    {
+        _map = map;
+    }
+
+    public void update(long deliveryTag, boolean multiple)
+    {
+        if(!multiple)
+        {
+            //have acked a single message that is not part of
+            //the previously acked region so record
+            //individually
+            _individual.add(deliveryTag);//_multiple && !multiple
+        }
+        else if(deliveryTag > _deliveryTag)
+        {
+            //have simply moved the last acked message on a
+            //bit
+            _deliveryTag = deliveryTag;
+            _multiple = true;
+        }
+    }
+
+    public void consolidate()
+    {
+        //lookup all the unacked messages that have been acked in this transaction
+        if(_multiple)
+        {
+            //get all the unacked messages for the accumulated
+            //multiple acks
+            _map.collect(_deliveryTag, true, _unacked);
+        }
+        //get any unacked messages for individual acks outside the
+        //range covered by multiple acks
+        for(long tag : _individual)
+        {
+            if(_deliveryTag < tag)
+            {
+                _map.collect(tag, false, _unacked);                
+            }
+        }
+    }
+
+    public boolean checkPersistent() throws AMQException
+    {     
+        //if any of the messages in unacked are persistent the txn
+        //buffer must be marked as persistent:
+        for(UnacknowledgedMessage msg : _unacked)
+        {
+            if(msg.message.isPersistent())
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public void prepare() throws AMQException
+    {
+        //make persistent changes, i.e. dequeue and decrementReference
+        for(UnacknowledgedMessage msg : _unacked)
+        {
+            msg.discard();
+        }
+    }
+    
+    public void undoPrepare()
+    {
+        //decrementReference is annoyingly untransactional (due to
+        //in memory counter) so if we failed in prepare for full
+        //txn, this op will have to compensate by fixing the count
+        //in memory (persistent changes will be rolled back by store) 
+        for(UnacknowledgedMessage msg : _unacked)
+        {
+            msg.message.incrementReference();
+        }            
+    }
+
+    public void commit()
+    {
+        //remove the unacked messages from the channels map
+        _map.remove(_unacked);
+    }
+
+    public void rollback()
+    {
+    }
+}
+

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class UnacknowledgedMessage
+{
+    public final AMQMessage message;
+    public final String consumerTag;
+    public final long deliveryTag;
+    public AMQQueue queue;
+    
+    public UnacknowledgedMessage(AMQQueue queue, AMQMessage message, String consumerTag, long deliveryTag)
+    {
+        this.queue = queue;
+        this.message = message;
+        this.consumerTag = consumerTag;
+        this.deliveryTag = deliveryTag;
+    }
+
+    public void discard() throws AMQException
+    {
+        if (queue != null)
+        {
+            message.dequeue(queue);
+        }
+        message.decrementReference();
+    }
+}
+

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,27 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import java.util.List;
+
+public interface UnacknowledgedMessageMap
+{
+    public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs);
+    public void remove(List<UnacknowledgedMessage> msgs);
+}
+

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,81 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.ack;
+
+import java.util.List;
+import java.util.Map;
+
+public class UnacknowledgedMessageMapImpl implements UnacknowledgedMessageMap
+{
+    private final Object _lock;
+    private Map<Long, UnacknowledgedMessage> _map;
+
+    public UnacknowledgedMessageMapImpl(Object lock, Map<Long, UnacknowledgedMessage> map)
+    {
+        _lock = lock;
+        _map = map;
+    }
+
+    public void collect(long deliveryTag, boolean multiple, List<UnacknowledgedMessage> msgs)
+    {
+        if (multiple)
+        {
+            collect(deliveryTag, msgs);
+        }
+        else
+        {
+            msgs.add(get(deliveryTag));
+        }
+
+    }
+
+    public void remove(List<UnacknowledgedMessage> msgs)
+    {
+        synchronized(_lock)
+        {
+            for(UnacknowledgedMessage msg : msgs)
+            {
+                _map.remove(msg.deliveryTag);
+            }            
+        }
+    }
+
+    private UnacknowledgedMessage get(long key)
+    {
+        synchronized(_lock)
+        {
+            return _map.get(key);
+        }
+    }
+
+    private void collect(long key, List<UnacknowledgedMessage> msgs)
+    {
+        synchronized(_lock)
+        {
+            for(Map.Entry<Long, UnacknowledgedMessage> entry : _map.entrySet())
+            {
+                msgs.add(entry.getValue());
+                if (entry.getKey() == key)
+                {
+                    break;
+                }                        
+            }
+        }
+    }
+}
+

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,143 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.management.AMQManagedObject;
+import org.apache.qpid.server.management.Managable;
+import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.queue.QueueRegistry;
+
+import javax.management.NotCompliantMBeanException;
+
+public abstract class AbstractExchange implements Exchange, Managable
+{
+    private String _name;
+
+    protected boolean _durable;
+
+    protected int _ticket;
+
+    protected ExchangeMBean _exchangeMbean;
+
+    /**
+     * Whether the exchange is automatically deleted once all queues have detached from it
+     */
+    protected boolean _autoDelete;
+
+	protected QueueRegistry queueRegistry;
+
+    /**
+     * Abstract MBean class. This has some of the methods implemented from
+     * management intrerface for exchanges. Any implementaion of an
+     * Exchange MBean should extend this class.
+     */
+    protected abstract class ExchangeMBean extends AMQManagedObject implements ManagedExchange
+    {
+        public ExchangeMBean() throws NotCompliantMBeanException
+        {
+            super(ManagedExchange.class, ManagedExchange.TYPE);
+        }
+
+        public String getObjectInstanceName()
+        {
+            return _name;
+        }
+
+        public String getName()
+        {
+            return _name;
+        }
+
+        public Integer getTicketNo()
+        {
+            return _ticket;
+        }
+
+        public boolean isDurable()
+        {
+            return _durable;
+        }
+
+        public boolean isAutoDelete()
+        {
+            return _autoDelete;
+        }
+
+    } // End of MBean class
+
+    public String getName()
+    {
+        return _name;
+    }
+
+    /**
+     * Concrete exchanges must implement this method in order to create the managed representation. This is
+     * called during initialisation (template method pattern).
+     * @return the MBean
+     */
+    protected abstract ExchangeMBean createMBean() throws AMQException;
+
+    public void initialise(String name, boolean durable, int ticket, boolean autoDelete) throws AMQException
+    {
+        _name = name;
+        _durable = durable;
+        _autoDelete = autoDelete;
+        _ticket = ticket;
+        _exchangeMbean = createMBean();
+        _exchangeMbean.register();
+    }
+
+    public boolean isDurable()
+    {
+        return _durable;
+    }
+
+    public boolean isAutoDelete()
+    {
+        return _autoDelete;
+    }
+
+    public int getTicket()
+    {
+        return _ticket;
+    }
+
+    public void close() throws AMQException
+    {
+        if (_exchangeMbean != null)
+        {
+            _exchangeMbean.unregister();
+        }
+    }
+
+    public String toString()
+    {
+        return getClass().getName() + "[" + getName() +"]";
+    }
+
+    public ManagedObject getManagedObject()
+    {
+        return _exchangeMbean;
+    }
+    
+    public void setQueueRegistry(QueueRegistry queueRegistry) {
+    	this.queueRegistry = queueRegistry;
+    }
+
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,68 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.QueueRegistry;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class DefaultExchangeFactory implements ExchangeFactory
+{
+    private static final Logger _logger = Logger.getLogger(DefaultExchangeFactory.class);
+
+    private Map<String, Class<? extends Exchange>> _exchangeClassMap = new HashMap<String, Class<? extends Exchange>>();
+
+	private final QueueRegistry queueRegistry;
+
+    public DefaultExchangeFactory(QueueRegistry queueRegistry)
+    {
+        this.queueRegistry = queueRegistry;
+		_exchangeClassMap.put("direct", org.apache.qpid.server.exchange.DestNameExchange.class);
+        _exchangeClassMap.put("topic", org.apache.qpid.server.exchange.DestWildExchange.class);
+        _exchangeClassMap.put("headers", org.apache.qpid.server.exchange.HeadersExchange.class);
+    }
+
+    public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete,
+                                   int ticket)
+            throws AMQException
+    {
+        Class<? extends Exchange> exchClass = _exchangeClassMap.get(type);
+        if (exchClass == null)
+        {
+            throw new AMQException(_logger, "Unknown exchange type: " + type);
+        }
+        try
+        {
+            Exchange e = exchClass.newInstance();
+            e.setQueueRegistry(queueRegistry);
+            e.initialise(exchange, durable, ticket, autoDelete);
+            return e;
+        }
+        catch (InstantiationException e)
+        {
+            throw new AMQException(_logger, "Unable to create exchange: " + e, e);
+        }
+        catch (IllegalAccessException e)
+        {
+            throw new AMQException(_logger, "Unable to create exchange: " + e, e);
+        }
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,99 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.protocol.ExchangeInitialiser;
+import org.apache.qpid.server.queue.AMQMessage;
+
+public class DefaultExchangeRegistry implements ExchangeRegistry
+{
+    private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class);
+
+    /**
+     * Maps from exchange name to exchange instance
+     */
+    private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
+
+	private final ExchangeFactory exchangeFactory;
+
+    public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
+    {
+        this.exchangeFactory = exchangeFactory;
+		//create 'standard' exchanges:
+        try
+        {
+            new ExchangeInitialiser().initialise(exchangeFactory, this);
+        }
+        catch(AMQException e)
+        {
+            _log.error("Failed to initialise exchanges: ", e);
+        }
+    }
+
+    public void registerExchange(Exchange exchange)
+    {
+        _exchangeMap.put(exchange.getName(), exchange);
+    }
+
+    public void unregisterExchange(String name, boolean inUse) throws AMQException
+    {
+        // TODO: check inUse argument
+        Exchange e = _exchangeMap.remove(name);
+        if (e != null)
+        {
+            e.close();
+        }
+        else
+        {
+            throw new AMQException("Unknown exchange " + name);
+        }
+    }
+
+    public Exchange getExchange(String name)
+    {
+        return _exchangeMap.get(name);
+    }
+
+    /**
+     * Routes content through exchanges, delivering it to 1 or more queues.
+     * @param payload
+     * @throws AMQException if something goes wrong delivering data
+     */
+    public void routeContent(AMQMessage payload) throws AMQException
+    {
+        final String exchange = payload.getPublishBody().exchange;
+        final Exchange exch = _exchangeMap.get(exchange);
+        // there is a small window of opportunity for the exchange to be deleted in between
+        // the JmsPublish being received (where the exchange is validated) and the final
+        // content body being received (which triggers this method)
+        if (exch == null)
+        {
+            throw new AMQException("Exchange '" + exchange + "' does not exist");
+        }
+        exch.route(payload);
+    }
+
+	public ExchangeFactory getExchangeFactory() {
+		return exchangeFactory;
+	}
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,229 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.NotCompliantMBeanException;
+import javax.management.openmbean.ArrayType;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.CompositeDataSupport;
+import javax.management.openmbean.CompositeType;
+import javax.management.openmbean.OpenDataException;
+import javax.management.openmbean.OpenType;
+import javax.management.openmbean.SimpleType;
+import javax.management.openmbean.TabularData;
+import javax.management.openmbean.TabularDataSupport;
+import javax.management.openmbean.TabularType;
+
+import org.apache.activemq.amqp.command.BasicPublishBody;
+import org.apache.activemq.amqp.command.FieldTable;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.management.MBeanConstructor;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+
+public class DestNameExchange extends AbstractExchange
+{
+    private static final Logger _logger = Logger.getLogger(DestNameExchange.class);
+
+    /**
+     * Maps from queue name to queue instances
+     */
+    private final Index _index = new Index();
+
+    /**
+     * MBean class implementing the management interfaces.
+     */
+    @MBeanDescription("Management Bean for Direct Exchange")
+    private final class DestNameExchangeMBean extends ExchangeMBean
+    {
+        private String[]   _bindingItemNames = {"BindingKey", "QueueNames"};
+        private String[]   _bindingItemDescriptions = {"Binding key", "Queue Names"};
+        private String[]   _bindingItemIndexNames = {"BindingKey"};
+        private OpenType[] _bindingItemTypes = new OpenType[2];
+
+        private CompositeType      _bindingDataType = null;
+        private TabularType        _bindinglistDataType = null;
+        private TabularDataSupport _bindingList = null;
+
+        @MBeanConstructor("Creates an MBean for AMQ direct exchange")
+        public DestNameExchangeMBean()  throws NotCompliantMBeanException
+        {
+            super();
+            init();
+        }
+
+        /**
+         * initialises the OpenType objects.
+         */
+        private void init()
+        {
+            try
+            {
+                _bindingItemTypes[0] = SimpleType.STRING;
+                //_bindingItemTypes[1] = ArrayType.getArrayType(SimpleType.STRING);
+                _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
+
+                _bindingDataType = new CompositeType("QueueBinding",
+                                             "Binding key and bound Queue names",
+                                             _bindingItemNames,
+                                             _bindingItemDescriptions,
+                                             _bindingItemTypes);
+                _bindinglistDataType = new TabularType("Bindings",
+                                             "List of queue bindings for " + getName() ,
+                                             _bindingDataType,
+                                             _bindingItemIndexNames);
+            }
+            catch(OpenDataException ex)
+            {
+                //It should never occur.
+                _logger.error("OpenDataTypes could not be created.", ex);
+                throw new RuntimeException(ex);
+            }
+        }
+
+        public TabularData viewBindings()
+            throws OpenDataException
+        {
+            Map<String, List<AMQQueue>> bindings = _index.getBindingsMap();
+            _bindingList = new TabularDataSupport(_bindinglistDataType);
+
+            for (Map.Entry<String, List<AMQQueue>> entry : bindings.entrySet())
+            {
+                String key = entry.getKey();
+                List<String> queueList = new ArrayList<String>();
+
+                List<AMQQueue> queues = entry.getValue();
+                for (AMQQueue q : queues)
+                {
+                    queueList.add(q.getName());
+                }
+
+                Object[] bindingItemValues = {key, queueList.toArray(new String[0])};
+                CompositeData bindingData = new CompositeDataSupport(_bindingDataType,
+                                                                     _bindingItemNames,
+                                                                     bindingItemValues);
+                _bindingList.put(bindingData);
+            }
+
+            return _bindingList;
+        }
+
+        public void createBinding(String queueName, String binding)
+                throws JMException
+        {
+            AMQQueue queue = queueRegistry.getQueue(queueName);
+
+            if (queue == null)
+            {
+                throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
+            }
+
+            try
+            {
+                registerQueue(binding, queue, null);
+                queue.bind(binding, DestNameExchange.this);
+            }
+            catch (AMQException ex)
+            {
+                throw new MBeanException(ex);
+            }
+        }
+
+    }// End of MBean class
+
+
+    protected ExchangeMBean createMBean() throws AMQException
+    {
+        try
+        {
+            return new DestNameExchangeMBean();
+        }
+        catch (NotCompliantMBeanException ex)
+        {
+            _logger.error("Exception occured in creating the DestNameExchenge", ex);
+            throw new AMQException("Exception occured in creating the DestNameExchenge", ex);
+        }
+    }
+
+    public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        assert queue != null;
+        assert routingKey != null;
+        if (!_index.add(routingKey, queue))
+        {
+            _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
+        }
+        else
+        {
+            _logger.debug("Binding queue " + queue + " with routing key " + routingKey
+                          + " to exchange " + this);
+        }
+    }
+
+    public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+    {
+        assert queue != null;
+        assert routingKey != null;
+
+        if (!_index.remove(routingKey, queue))
+        {
+            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
+                                   " with routing key " + routingKey + ". No queue was registered with that routing key");
+        }
+    }
+
+    public void route(AMQMessage payload) throws AMQException
+    {
+        BasicPublishBody publishBody = payload.getPublishBody();
+
+        final String routingKey = publishBody.routingKey;
+        final List<AMQQueue> queues = _index.get(routingKey);
+        if (queues == null || queues.isEmpty())
+        {
+            String msg = "Routing key " + routingKey + " is not known to " + this;
+            if (publishBody.mandatory)
+            {
+                throw new NoRouteException(msg, payload);
+            }
+            else
+            {
+                _logger.warn(msg);
+            }
+        }
+        else
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Publishing message to queue " + queues);
+            }
+
+            for (AMQQueue q : queues)
+            {
+                q.deliver(payload);
+            }
+        }
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,221 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.activemq.amqp.command.*;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.management.MBeanDescription;
+import org.apache.qpid.server.management.MBeanConstructor;
+
+import javax.management.openmbean.*;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import javax.management.NotCompliantMBeanException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class DestWildExchange extends AbstractExchange
+{
+    private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
+
+    private ConcurrentHashMap<String, List<AMQQueue>> _routingKey2queues = new ConcurrentHashMap<String, List<AMQQueue>>();
+
+    /**
+     *  DestWildExchangeMBean class implements the management interface for the
+     *  Topic exchanges.
+     */
+    @MBeanDescription("Management Bean for Topic Exchange")
+    private final class DestWildExchangeMBean extends ExchangeMBean
+    {
+        private String[]   _bindingItemNames = {"BindingKey", "QueueNames"};
+        private String[]   _bindingItemDescriptions = {"Binding key", "Queue Names"};
+        private String[]   _bindingItemIndexNames = {"BindingKey"};
+        private OpenType[] _bindingItemTypes = new OpenType[2];
+
+        private CompositeType      _bindingDataType = null;
+        private TabularType        _bindinglistDataType = null;
+        private TabularDataSupport _bindingList = null;
+
+        @MBeanConstructor("Creates an MBean for AMQ topic exchange")
+        public DestWildExchangeMBean()  throws NotCompliantMBeanException
+        {
+            super();
+            init();
+        }
+
+        /**
+         * initialises the OpenType objects.
+         */
+        private void init()
+        {
+            try
+            {
+                _bindingItemTypes[0] = SimpleType.STRING;
+                _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
+
+                _bindingDataType = new CompositeType("QueueBinding",
+                                             "Binding key and bound Queue names",
+                                             _bindingItemNames,
+                                             _bindingItemDescriptions,
+                                             _bindingItemTypes);
+                _bindinglistDataType = new TabularType("Bindings",
+                                             "List of queue bindings for " + getName(),
+                                             _bindingDataType,
+                                             _bindingItemIndexNames);
+            }
+            catch(OpenDataException ex)
+            {
+                //It should never occur.
+                _logger.error("OpenDataTypes could not be created.", ex);
+                throw new RuntimeException(ex);
+            }
+        }
+
+        public TabularData viewBindings()
+            throws OpenDataException
+        {
+            _bindingList = new TabularDataSupport(_bindinglistDataType);
+
+            for (Map.Entry<String, List<AMQQueue>> entry : _routingKey2queues.entrySet())
+            {
+                String key = entry.getKey();
+                List<String> queueList = new ArrayList<String>();
+
+                List<AMQQueue> queues = entry.getValue();
+                for (AMQQueue q : queues)
+                {
+                    queueList.add(q.getName());
+                }
+
+                Object[] bindingItemValues = {key, queueList.toArray(new String[0])};
+                CompositeData bindingData = new CompositeDataSupport(_bindingDataType,
+                                                                     _bindingItemNames,
+                                                                     bindingItemValues);
+                _bindingList.put(bindingData);
+            }
+
+            return _bindingList;
+        }
+
+        public void createBinding(String queueName, String binding)
+            throws JMException
+        {
+            AMQQueue queue = queueRegistry.getQueue(queueName);
+
+            if (queue == null)
+                throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
+
+            try
+            {
+                registerQueue(binding, queue, null);
+                queue.bind(binding, DestWildExchange.this);
+            }
+            catch (AMQException ex)
+            {
+                throw new MBeanException(ex);
+            }
+        }
+
+    } // End of MBean class
+
+
+    public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        assert queue != null;
+        assert routingKey != null;
+        // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
+        List<AMQQueue> queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList<AMQQueue>());
+        // if we got null back, no previous value was associated with the specified routing key hence
+        // we need to read back the new value just put into the map
+        if (queueList == null)
+        {
+            queueList = _routingKey2queues.get(routingKey);
+        }
+        if (!queueList.contains(queue))
+        {
+            queueList.add(queue);
+        }
+        else if(_logger.isDebugEnabled())
+        {
+            _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
+        }
+
+    }
+
+    public void route(AMQMessage payload) throws AMQException
+    {
+        BasicPublishBody publishBody = payload.getPublishBody();
+
+        final String routingKey = publishBody.routingKey;
+        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        // if we have no registered queues we have nothing to do
+        // TODO: add support for the immediate flag
+        if (queues == null)
+        {
+            //todo Check for valid topic - mritchie
+            return;
+        }
+
+        for (AMQQueue q : queues)
+        {
+            // TODO: modify code generator to add clone() method then clone the deliver body
+            // without this addition we have a race condition - we will be modifying the body
+            // before the encoder has encoded the body for delivery
+            q.deliver(payload);
+        }
+    }
+
+    public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+    {
+        assert queue != null;
+        assert routingKey != null;
+
+        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        if (queues == null)
+        {
+            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
+                                   " with routing key " + routingKey + ". No queue was registered with that routing key");
+
+        }
+        boolean removedQ = queues.remove(queue);
+        if (!removedQ)
+        {
+            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
+                                   " with routing key " + routingKey);
+        }
+    }
+
+    protected ExchangeMBean createMBean()  throws AMQException
+    {
+        try
+        {
+            return new DestWildExchangeMBean();
+        }
+        catch (NotCompliantMBeanException ex)
+        {
+            _logger.error("Exception occured in creating the DestWildExchenge", ex);
+            throw new AMQException("Exception occured in creating the DestWildExchenge", ex);
+        }
+    }
+}

Added: incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/Exchange.java?view=auto&rev=468414
==============================================================================
--- incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/Exchange.java (added)
+++ incubator/activemq/sandbox/qpid/src/main/java/org/apache/qpid/server/exchange/Exchange.java Fri Oct 27 08:27:20 2006
@@ -0,0 +1,50 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.activemq.amqp.command.FieldTable;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.QueueRegistry;
+
+public interface Exchange
+{
+    String getName();
+
+    void initialise(String name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
+
+    boolean isDurable();
+
+    /**
+     * @return true if the exchange will be deleted after all queues have been detached
+     */
+    boolean isAutoDelete();
+
+    int getTicket();
+
+    void close() throws AMQException;
+
+    void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+    void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException;
+
+    void route(AMQMessage message) throws AMQException;
+
+	void setQueueRegistry(QueueRegistry queueRegistry);
+}



Mime
View raw message