qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject svn commit: r447994 [11/46] - in /incubator/qpid/trunk/qpid: ./ cpp/ cpp/bin/ cpp/broker/ cpp/broker/inc/ cpp/broker/src/ cpp/broker/test/ cpp/client/ cpp/client/inc/ cpp/client/src/ cpp/client/test/ cpp/common/ cpp/common/concurrent/ cpp/common/concur...
Date Tue, 19 Sep 2006 22:07:25 GMT
Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,92 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.protocol.ExchangeInitialiser;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * {@link ExchangeRegistry} implementation that keeps exchanges in a concurrent map keyed by
+ * exchange name.
+ */
+public class DefaultExchangeRegistry implements ExchangeRegistry
+{
+    private static final Logger _log = Logger.getLogger(DefaultExchangeRegistry.class);
+
+    /**
+     * Maps from exchange name to exchange instance. The reference is never reassigned, so it
+     * is declared final.
+     */
+    private final ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
+
+    /**
+     * Creates the registry and hands it, together with the supplied factory, to an
+     * {@link ExchangeInitialiser}. An {@link AMQException} raised by the initialiser is logged
+     * and not rethrown.
+     *
+     * @param exchangeFactory factory passed on to the initialiser
+     */
+    public DefaultExchangeRegistry(ExchangeFactory exchangeFactory)
+    {
+        //create 'standard' exchanges:
+        try
+        {
+            new ExchangeInitialiser().initialise(exchangeFactory, this);
+        }
+        catch(AMQException e)
+        {
+            // Deliberately best-effort: the failure is only logged.
+            _log.error("Failed to initialise exchanges: ", e);
+        }
+    }
+
+    /**
+     * Stores the exchange under its own name, replacing any exchange previously stored
+     * under that name.
+     *
+     * @param exchange the exchange to register
+     */
+    public void registerExchange(Exchange exchange)
+    {
+        _exchangeMap.put(exchange.getName(), exchange);
+    }
+
+    /**
+     * Removes the named exchange from the registry and closes it.
+     *
+     * @param name  name of the exchange to remove
+     * @param inUse currently ignored by this implementation (see TODO below)
+     * @throws AMQException if no exchange is registered under the name, or if closing fails
+     */
+    public void unregisterExchange(String name, boolean inUse) throws AMQException
+    {
+        // TODO: check inUse argument
+        Exchange e = _exchangeMap.remove(name);
+        if (e != null)
+        {
+            e.close();
+        }
+        else
+        {
+            throw new AMQException("Unknown exchange " + name);
+        }
+    }
+
+    /**
+     * @param name name of the exchange to look up
+     * @return the exchange registered under the name, or null if there is none
+     */
+    public Exchange getExchange(String name)
+    {
+        return _exchangeMap.get(name);
+    }
+
+    /**
+     * Routes content through exchanges, delivering it to 1 or more queues.
+     * @param payload message whose publish body names the exchange to route through
+     * @throws AMQException if the exchange does not exist or something goes wrong delivering data
+     */
+    public void routeContent(AMQMessage payload) throws AMQException
+    {
+        final String exchange = payload.getPublishBody().exchange;
+        final Exchange exch = _exchangeMap.get(exchange);
+        // there is a small window of opportunity for the exchange to be deleted in between
+        // the JmsPublish being received (where the exchange is validated) and the final
+        // content body being received (which triggers this method)
+        if (exch == null)
+        {
+            throw new AMQException("Exchange '" + exchange + "' does not exist");
+        }
+        exch.route(payload);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,204 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.log4j.Logger;
+
+import javax.management.openmbean.*;
+import javax.management.MBeanException;
+import javax.management.JMException;
+import java.util.List;
+import java.util.Map;
+import java.util.ArrayList;
+
+/**
+ * Exchange that delivers a message to the queues found in its binding index under the
+ * message's routing key.
+ */
+public class DestNameExchange extends AbstractExchange
+{
+    private static final Logger _logger = Logger.getLogger(DestNameExchange.class);
+
+    /**
+     * Binding index: queues are added, removed and looked up by routing key.
+     */
+    private final Index _index = new Index();
+
+    /**
+     * MBean class implementing the management interfaces.
+     */
+    private final class DestNameExchangeMBean extends ExchangeMBean
+    {
+        private final String[]   _bindingItemNames = {"BindingKey", "QueueNames"};
+        private final String[]   _bindingItemDescriptions = {"Binding key", "Queue Names"};
+        private final String[]   _bindingItemIndexNames = {"BindingKey"};
+        private final OpenType[] _bindingItemTypes = new OpenType[2];
+
+        // Assigned once by init(), which the constructor calls.
+        private CompositeType _bindingDataType = null;
+        private TabularType   _bindinglistDataType = null;
+
+        public DestNameExchangeMBean()
+        {
+            super();
+            init();
+        }
+
+        /**
+         * initialises the OpenType objects.
+         */
+        private void init()
+        {
+            try
+            {
+                _bindingItemTypes[0] = SimpleType.STRING;
+                _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
+
+                _bindingDataType = new CompositeType("QueueBinding",
+                                             "Queue and binding keye",
+                                             _bindingItemNames,
+                                             _bindingItemDescriptions,
+                                             _bindingItemTypes);
+                _bindinglistDataType = new TabularType("Bindings",
+                                             "List of queues and binding keys",
+                                             _bindingDataType,
+                                             _bindingItemIndexNames);
+            }
+            catch(OpenDataException ex)
+            {
+                //It should never occur.
+                _logger.error("OpenDataTypes could not be created.", ex);
+                throw new RuntimeException(ex);
+            }
+        }
+
+        /**
+         * Builds a table with one row per binding key, each row holding the key and the
+         * names of the queues bound under it.
+         *
+         * @return a newly created table; nothing is retained by this MBean
+         * @throws OpenDataException if a row cannot be constructed
+         */
+        public TabularData viewBindings()
+            throws OpenDataException
+        {
+            Map<String, List<AMQQueue>> bindings = _index.getBindingsMap();
+            // The table is a local, not an instance field, so that overlapping calls cannot
+            // add rows to each other's result.
+            TabularDataSupport bindingList = new TabularDataSupport(_bindinglistDataType);
+
+            for (Map.Entry<String, List<AMQQueue>> entry : bindings.entrySet())
+            {
+                List<AMQQueue> queues = entry.getValue();
+                List<String> queueNames = new ArrayList<String>(queues.size());
+                for (AMQQueue q : queues)
+                {
+                    queueNames.add(q.getName());
+                }
+
+                Object[] bindingItemValues = {entry.getKey(), queueNames.toArray(new String[0])};
+                CompositeData bindingData = new CompositeDataSupport(_bindingDataType,
+                                                                     _bindingItemNames,
+                                                                     bindingItemValues);
+                bindingList.put(bindingData);
+            }
+
+            return bindingList;
+        }
+
+        /**
+         * Binds a queue, looked up by name in the application's queue registry, to this
+         * exchange under the given binding key.
+         *
+         * @param queueName name of the queue to look up
+         * @param binding   binding (routing) key to register the queue under
+         * @throws JMException if the queue is not found; an {@link AMQException} raised while
+         *                     binding is wrapped in an {@link MBeanException}
+         */
+        public void createBinding(String queueName, String binding)
+            throws JMException
+        {
+            AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName);
+
+            if (queue == null)
+            {
+                throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
+            }
+
+            try
+            {
+                registerQueue(binding, queue, null);
+                queue.bind(binding, DestNameExchange.this);
+            }
+            catch (AMQException ex)
+            {
+                throw new MBeanException(ex);
+            }
+        }
+
+    }// End of MBean class
+
+
+    protected ExchangeMBean createMBean()
+    {
+        return new DestNameExchangeMBean();
+    }
+
+    /**
+     * Adds the queue to the binding index under the routing key.
+     *
+     * @param routingKey key to bind under; must not be null
+     * @param queue      queue to bind; must not be null
+     * @param args       not used by this exchange
+     */
+    public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        assert queue != null;
+        assert routingKey != null;
+
+        final boolean added = _index.add(routingKey, queue);
+        // Guarded so the message strings are only built when debug logging is on.
+        if (_logger.isDebugEnabled())
+        {
+            if (added)
+            {
+                _logger.debug("Binding queue " + queue + " with routing key " + routingKey
+                              + " to exchange " + this);
+            }
+            else
+            {
+                _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
+            }
+        }
+    }
+
+    /**
+     * Removes the queue from the binding index for the routing key.
+     *
+     * @param routingKey key the queue was bound under; must not be null
+     * @param queue      queue to unbind; must not be null
+     * @throws AMQException if the index reports that nothing was removed
+     */
+    public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+    {
+        assert queue != null;
+        assert routingKey != null;
+
+        if (!_index.remove(routingKey, queue))
+        {
+            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
+                                   " with routing key " + routingKey + ". No queue was registered with that routing key");
+        }
+    }
+
+    /**
+     * Delivers the message to every queue the index returns for its routing key.
+     *
+     * @param payload the message to route
+     * @throws NoRouteException if no queue is found and the publish is flagged mandatory;
+     *                          without the flag the miss is only logged as a warning
+     * @throws AMQException     if delivery to a queue fails
+     */
+    public void route(AMQMessage payload) throws AMQException
+    {
+        BasicPublishBody publishBody = payload.getPublishBody();
+
+        final String routingKey = publishBody.routingKey;
+        final List<AMQQueue> queues = _index.get(routingKey);
+        if (queues == null || queues.isEmpty())
+        {
+            String msg = "Routing key " + routingKey + " is not known to " + this;
+            if (publishBody.mandatory)
+            {
+                throw new NoRouteException(msg, payload);
+            }
+            else
+            {
+                _logger.warn(msg);
+            }
+        }
+        else
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Publishing message to queue " + queues);
+            }
+
+            for(AMQQueue q :queues)
+            {
+                q.deliver(payload);
+            }
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DestNameExchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,210 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+
+import javax.management.openmbean.*;
+import javax.management.JMException;
+import javax.management.MBeanException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * Exchange that delivers a message to the queues stored under the message's routing key.
+ * The lookup in this class is a plain map lookup on the routing key.
+ */
+public class DestWildExchange extends AbstractExchange
+{
+    private static final Logger _logger = Logger.getLogger(DestWildExchange.class);
+
+    /**
+     * Maps a routing key to the queues bound under it. The values are typed as
+     * CopyOnWriteArrayList so that registerQueue can use its atomic addIfAbsent.
+     */
+    private final ConcurrentHashMap<String, CopyOnWriteArrayList<AMQQueue>> _routingKey2queues =
+            new ConcurrentHashMap<String, CopyOnWriteArrayList<AMQQueue>>();
+
+    /**
+     *  DestWildExchangeMBean class implements the management interface for the
+     *  Topic exchanges.
+     */
+    private final class DestWildExchangeMBean extends ExchangeMBean
+    {
+        private final String[]   _bindingItemNames = {"BindingKey", "QueueNames"};
+        private final String[]   _bindingItemDescriptions = {"Binding key", "Queue Names"};
+        private final String[]   _bindingItemIndexNames = {"BindingKey"};
+        private final OpenType[] _bindingItemTypes = new OpenType[2];
+
+        // Assigned once by init(), which the constructor calls.
+        private CompositeType _bindingDataType = null;
+        private TabularType   _bindinglistDataType = null;
+
+        public DestWildExchangeMBean()
+        {
+            super();
+            init();
+        }
+
+        /**
+         * initialises the OpenType objects.
+         */
+        private void init()
+        {
+            try
+            {
+                _bindingItemTypes[0] = SimpleType.STRING;
+                _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
+
+                _bindingDataType = new CompositeType("QueueBinding",
+                                             "Queue and binding keye",
+                                             _bindingItemNames,
+                                             _bindingItemDescriptions,
+                                             _bindingItemTypes);
+                _bindinglistDataType = new TabularType("Bindings",
+                                             "List of queues and binding keys",
+                                             _bindingDataType,
+                                             _bindingItemIndexNames);
+            }
+            catch(OpenDataException ex)
+            {
+                //It should never occur.
+                _logger.error("OpenDataTypes could not be created.", ex);
+                throw new RuntimeException(ex);
+            }
+        }
+
+        /**
+         * Builds a table with one row per routing key, each row holding the key and the
+         * names of the queues bound under it.
+         *
+         * @return a newly created table; nothing is retained by this MBean
+         * @throws OpenDataException if a row cannot be constructed
+         */
+        public TabularData viewBindings()
+            throws OpenDataException
+        {
+            // The table is a local, not an instance field, so that overlapping calls cannot
+            // add rows to each other's result.
+            TabularDataSupport bindingList = new TabularDataSupport(_bindinglistDataType);
+
+            for (Map.Entry<String, CopyOnWriteArrayList<AMQQueue>> entry : _routingKey2queues.entrySet())
+            {
+                List<AMQQueue> queues = entry.getValue();
+                List<String> queueNames = new ArrayList<String>();
+                for (AMQQueue q : queues)
+                {
+                    queueNames.add(q.getName());
+                }
+
+                Object[] bindingItemValues = {entry.getKey(), queueNames.toArray(new String[0])};
+                CompositeData bindingData = new CompositeDataSupport(_bindingDataType,
+                                                                     _bindingItemNames,
+                                                                     bindingItemValues);
+                bindingList.put(bindingData);
+            }
+
+            return bindingList;
+        }
+
+        /**
+         * Binds a queue, looked up by name in the application's queue registry, to this
+         * exchange under the given binding key.
+         *
+         * @param queueName name of the queue to look up
+         * @param binding   binding (routing) key to register the queue under
+         * @throws JMException if the queue is not found; an {@link AMQException} raised while
+         *                     binding is wrapped in an {@link MBeanException}
+         */
+        public void createBinding(String queueName, String binding)
+            throws JMException
+        {
+            AMQQueue queue = ApplicationRegistry.getInstance().getQueueRegistry().getQueue(queueName);
+
+            if (queue == null)
+            {
+                throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
+            }
+
+            try
+            {
+                registerQueue(binding, queue, null);
+                queue.bind(binding, DestWildExchange.this);
+            }
+            catch (AMQException ex)
+            {
+                throw new MBeanException(ex);
+            }
+        }
+
+    } // End of MBean class
+
+
+    /**
+     * Binds the queue under the routing key; binding the same queue twice is a no-op.
+     *
+     * @param routingKey key to bind under; must not be null
+     * @param queue      queue to bind; must not be null
+     * @param args       not used by this exchange
+     */
+    public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        assert queue != null;
+        assert routingKey != null;
+
+        CopyOnWriteArrayList<AMQQueue> queueList = _routingKey2queues.get(routingKey);
+        if (queueList == null)
+        {
+            // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition
+            CopyOnWriteArrayList<AMQQueue> newList = new CopyOnWriteArrayList<AMQQueue>();
+            queueList = _routingKey2queues.putIfAbsent(routingKey, newList);
+            // null back means our list went in; otherwise another thread's list is returned
+            if (queueList == null)
+            {
+                queueList = newList;
+            }
+        }
+
+        // addIfAbsent does the membership test and the insertion as one atomic step, so two
+        // concurrent registrations of the same queue cannot both add it (a separate
+        // contains()/add() pair could, leading to duplicate deliveries).
+        if (!queueList.addIfAbsent(queue) && _logger.isDebugEnabled())
+        {
+            _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey);
+        }
+
+    }
+
+    /**
+     * Delivers the message to every queue stored under its routing key; does nothing when
+     * no queue list exists for that key.
+     *
+     * @param payload the message to route
+     * @throws AMQException if delivery to a queue fails
+     */
+    public void route(AMQMessage payload) throws AMQException
+    {
+        BasicPublishBody publishBody = payload.getPublishBody();
+
+        final String routingKey = publishBody.routingKey;
+        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        // if we have no registered queues we have nothing to do
+        // TODO: add support for the immediate flag
+        if (queues == null)
+        {
+            //todo Check for valid topic - mritchie
+            return;
+        }
+
+        for (AMQQueue q : queues)
+        {
+            // TODO: modify code generator to add clone() method then clone the deliver body
+            // without this addition we have a race condition - we will be modifying the body
+            // before the encoder has encoded the body for delivery
+            q.deliver(payload);
+        }
+    }
+
+    /**
+     * Removes the queue from the list stored under the routing key. The list itself stays
+     * in the map even when it becomes empty.
+     *
+     * @param routingKey key the queue was bound under; must not be null
+     * @param queue      queue to unbind; must not be null
+     * @throws AMQException if no list exists for the key or the queue is not in it
+     */
+    public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+    {
+        assert queue != null;
+        assert routingKey != null;
+
+        List<AMQQueue> queues = _routingKey2queues.get(routingKey);
+        if (queues == null)
+        {
+            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
+                                   " with routing key " + routingKey + ". No queue was registered with that routing key");
+
+        }
+        boolean removedQ = queues.remove(queue);
+        if (!removedQ)
+        {
+            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
+                                   " with routing key " + routingKey);
+        }
+    }
+
+    protected ExchangeMBean createMBean()
+    {
+        return new DestWildExchangeMBean();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/DestWildExchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/Exchange.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/Exchange.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/Exchange.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,47 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+
+/**
+ * Operations offered by an exchange: initialisation and shutdown, property accessors,
+ * queue binding and message routing.
+ */
+public interface Exchange
+{
+    /**
+     * Initialises the exchange.
+     * NOTE(review): the parameters presumably supply the values later returned by
+     * {@link #getName()}, {@link #isDurable()}, {@link #getTicket()} and
+     * {@link #isAutoDelete()} - confirm against the implementations.
+     */
+    void initialise(String name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
+
+    void close() throws AMQException;
+
+    String getName();
+
+    int getTicket();
+
+    boolean isDurable();
+
+    /**
+     * @return true if the exchange will be deleted after all queues have been detached
+     */
+    boolean isAutoDelete();
+
+    /**
+     * Binds a queue to this exchange under a routing key.
+     *
+     * @param routingKey key to bind the queue under
+     * @param queue      queue to bind
+     * @param args       additional binding arguments; how they are used is up to the implementation
+     */
+    void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException;
+
+    /**
+     * Removes a binding of a queue under a routing key.
+     *
+     * @param routingKey key the queue was bound under
+     * @param queue      queue to unbind
+     */
+    void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException;
+
+    /**
+     * Routes a message through this exchange.
+     *
+     * @param message the message to route
+     */
+    void route(AMQMessage message) throws AMQException;
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/Exchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeFactory.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeFactory.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeFactory.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+
+
+public interface ExchangeFactory
+{
+    Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete,
+                            int ticket)
+            throws AMQException;
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeInUseException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeInUseException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeInUseException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeInUseException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,28 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+
+/**
+ * {@link AMQException} whose message states that a named exchange is currently in use.
+ */
+public class ExchangeInUseException extends AMQException
+{
+    /**
+     * @param exchangeName name of the exchange, embedded in the exception message
+     */
+    public ExchangeInUseException(String exchangeName)
+    {
+        super("Exchange " + exchangeName + " is currently in use");
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeInUseException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeRegistry.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeRegistry.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+
+
+/**
+ * Registry of {@link Exchange} instances addressed by name; it is also a
+ * {@link MessageRouter}.
+ */
+public interface ExchangeRegistry extends MessageRouter
+{
+    /**
+     * Looks up an exchange by name.
+     *
+     * @param name name of the exchange
+     * @return the exchange registered under the name
+     */
+    Exchange getExchange(String name);
+
+    /**
+     * Registers an exchange.
+     *
+     * @param exchange the exchange to register
+     */
+    void registerExchange(Exchange exchange);
+
+    /**
+     * Unregisters an exchange.
+     *
+     * @param name  name of the exchange to delete
+     * @param inUse if true, do NOT delete the exchange if it is in use (has queues bound to it)
+     * @throws ExchangeInUseException when the exchange cannot be deleted because it is in use
+     * @throws AMQException if the exchange cannot be unregistered for another reason
+     */
+    void unregisterExchange(String name, boolean inUse) throws ExchangeInUseException, AMQException;
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ExchangeRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/HeadersBinding.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/HeadersBinding.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/HeadersBinding.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,142 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.log4j.Logger;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Defines binding and matching based on a set of headers.
+ */
+class HeadersBinding
+{
+    private static final Logger _logger = Logger.getLogger(HeadersBinding.class);
+
+    private final Map _mappings = new HashMap();
+    private final Set<Object> required = new HashSet<Object>();
+    private final Set<Map.Entry> matches = new HashSet<Map.Entry>();
+    private boolean matchAny;
+
+    /**
+     * Creates a binding for a set of mappings. Those mappings whose value is
+     * null or the empty string are assumed only to be required headers, with
+     * no constraint on the value. Those with a non-null value are assumed to
+     * define a required match of value. 
+     * @param mappings the defined mappings this binding should use
+     */
+    HeadersBinding(Map mappings)
+    {
+        //noinspection unchecked
+        this(mappings == null ? new HashSet<Map.Entry>() : mappings.entrySet());
+        _mappings.putAll(mappings);
+    }
+
+    private HeadersBinding(Set<Map.Entry> entries)
+    {
+        for (Map.Entry e : entries)
+        {
+            if (isSpecial(e.getKey()))
+            {
+                processSpecial((String) e.getKey(), e.getValue());
+            }
+            else if (e.getValue() == null || e.getValue().equals(""))
+            {
+                required.add(e.getKey());
+            }
+            else
+            {
+                matches.add(e);
+            }
+        }
+    }
+
+    protected Map getMappings()
+    {
+        return _mappings;
+    }
+
+    /**
+     * Checks whether the supplied headers match the requirements of this binding
+     * @param headers the headers to check
+     * @return true if the headers define any required keys and match any required
+     * values
+     */
+    public boolean matches(Map headers)
+    {
+        if(headers == null)
+        {
+            return required.isEmpty() && matches.isEmpty();
+        }
+        else
+        {
+            return matchAny ? or(headers) : and(headers);
+        }
+    }
+
+    private boolean and(Map headers)
+    {
+        //need to match all the defined mapping rules:
+        return headers.keySet().containsAll(required)
+                && headers.entrySet().containsAll(matches);
+    }
+
+    private boolean or(Map headers)
+    {
+        //only need to match one mapping rule:
+        return !Collections.disjoint(headers.keySet(), required)
+                || !Collections.disjoint(headers.entrySet(), matches);
+    }
+
+    private void processSpecial(String key, Object value)
+    {
+        if("X-match".equalsIgnoreCase(key))
+        {
+            matchAny = isAny(value);
+        }
+        else
+        {
+            _logger.warn("Ignoring special header: " + key);
+        }
+    }
+
+    private boolean isAny(Object value)
+    {
+        if(value instanceof String)
+        {
+            if("any".equalsIgnoreCase((String) value)) return true;
+            if("all".equalsIgnoreCase((String) value)) return false;
+        }
+        _logger.warn("Ignoring unrecognised match type: " + value);
+        return false;//default to all
+    }
+
+    static boolean isSpecial(Object key)
+    {
+        return key instanceof String && isSpecial((String) key);
+    }
+
+    static boolean isSpecial(String key)
+    {
+        return key.startsWith("X-") || key.startsWith("x-");
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/HeadersBinding.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,226 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQMessage;
+
+import javax.management.openmbean.*;
+import javax.management.ServiceNotFoundException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An exchange that binds queues based on a set of required headers and header values
+ * and routes messages to these queues by matching the headers of the message against
+ * those with which the queues were bound.
+ * <p/>
+ * <pre>
+ * The Headers Exchange
+ *
+ *  Routes messages according to the value/presence of fields in the message header table.
+ *  (Basic and JMS content has a content header field called "headers" that is a table of
+ *   message header fields).
+ *
+ *  class = "headers"
+ *  routing key is not used
+ *
+ *  Has the following binding arguments:
+ *
+ *  the X-match field - if "all", does an AND match (used for GRM), if "any", does an OR match.
+ *  other fields prefixed with "X-" are ignored (and generate a console warning message).
+ *  a field with no value or empty value indicates a match on presence only.
+ *  a field with a value indicates match on field presence and specific value.
+ *
+ *  Standard instances:
+ *
+ *  amq.match - pub/sub on field content/value
+ *  </pre>
+ */
+public class HeadersExchange extends AbstractExchange
+{
+    private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
+
+    private final List<Registration> _bindings = new CopyOnWriteArrayList<Registration>();
+
+    /**
+     * HeadersExchangeMBean class implements the management interface for the
+     * Header Exchanges.
+     */
+    private final class HeadersExchangeMBean extends ExchangeMBean
+    {
+        private String[]   _bindingItemNames = {"Queue", "HeaderBinding"};
+        private String[]   _bindingItemDescriptions = {"Queue Name", "Header attribute bindings"};
+        private String[]   _bindingItemIndexNames = {"Queue"};
+        private OpenType[] _bindingItemTypes = new OpenType[2];
+
+        private CompositeType      _bindingDataType = null;
+        private TabularType        _bindinglistDataType = null;
+        private TabularDataSupport _bindingList = null;
+
+        public HeadersExchangeMBean()
+        {
+            super();
+            init();
+        }
+        /**
+         * initialises the OpenType objects.
+         */
+        private void init()
+        {
+            try
+            {
+                _bindingItemTypes[0] = SimpleType.STRING;
+                _bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);;
+
+                _bindingDataType = new CompositeType("QueueAndHeaderAttributesBinding",
+                                             "Queue and header attributes binding",
+                                             _bindingItemNames,
+                                             _bindingItemDescriptions,
+                                             _bindingItemTypes);
+                _bindinglistDataType = new TabularType("HeaderBindings",
+                                             "List of queues and related header attribute bindings",
+                                             _bindingDataType,
+                                             _bindingItemIndexNames);
+            }
+            catch(OpenDataException ex)
+            {
+                //It should never occur.
+                _logger.error("OpenDataTypes could not be created.", ex);
+                throw new RuntimeException(ex);
+            }
+        }
+
+        public TabularData viewBindings()
+            throws OpenDataException
+        {
+            _bindingList = new TabularDataSupport(_bindinglistDataType);
+            for (Iterator<Registration> itr = _bindings.iterator(); itr.hasNext();)
+            {
+                Registration registration = itr.next();
+                String queueName = registration.queue.getName();
+
+                HeadersBinding headers = registration.binding;
+                Map<Object, Object> headerMappings = headers.getMappings();
+                List<String> mappingList = new ArrayList<String>();
+
+                for (Map.Entry<Object, Object> en : headerMappings.entrySet())
+                {
+                    String key = en.getKey().toString();
+                    String value = en.getValue().toString();
+
+                    mappingList.add(key + "=" + value);
+                }
+
+                Object[] bindingItemValues = {queueName, mappingList.toArray(new String[0])};
+                CompositeData bindingData = new CompositeDataSupport(_bindingDataType,
+                                                                     _bindingItemNames,
+                                                                     bindingItemValues);
+                _bindingList.put(bindingData);
+            }
+
+            return _bindingList;
+        }
+
+        public void createBinding(String QueueName, String binding)
+            throws ServiceNotFoundException
+        {
+            throw new ServiceNotFoundException("This service is not supported by \"" + this.getName() + "\"");
+        }
+
+    } // End of MBean class
+
+    public void registerQueue(String routingKey, AMQQueue queue, FieldTable args) throws AMQException
+    {
+        _logger.debug("Exchange " + getName() + ": Binding " + queue.getName() + " with " + args);
+        _bindings.add(new Registration(new HeadersBinding(args), queue));
+    }
+
+    public void deregisterQueue(String routingKey, AMQQueue queue) throws AMQException
+    {
+        _logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName());
+        _bindings.remove(new Registration(null, queue));
+    }
+
+    public void route(AMQMessage payload) throws AMQException
+    {
+        Map headers = getHeaders(payload.getContentHeaderBody());
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
+        }
+        boolean delivered = false;
+        for (Registration e : _bindings)
+        {
+            if (e.binding.matches(headers))
+            {
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Exchange " + getName() + ": delivering message with headers " +
+                                  headers + " to " + e.queue.getName());
+                }
+                e.queue.deliver(payload);
+                delivered = true;
+            }
+        }
+        if (!delivered)
+        {
+            _logger.warn("Exchange " + getName() + ": message not routable.");
+        }
+    }
+
+    protected Map getHeaders(ContentHeaderBody contentHeaderFrame)
+    {
+        //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
+        //but these are not yet implemented.
+        return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders();
+    }
+
+    protected ExchangeMBean createMBean()
+    {
+        return new HeadersExchangeMBean();
+    }
+
+    private static class Registration
+    {
+        private final HeadersBinding binding;
+        private final AMQQueue queue;
+
+        Registration(HeadersBinding binding, AMQQueue queue)
+        {
+            this.binding = binding;
+            this.queue = queue;
+        }
+
+        public int hashCode()
+        {
+            return queue.hashCode();
+        }
+
+        public boolean equals(Object o)
+        {
+            return o instanceof Registration && ((Registration) o).queue.equals(queue);
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/HeadersExchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/Index.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/Index.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/Index.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/Index.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,85 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.server.queue.AMQQueue;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * An index of queues against routing key. Allows multiple queues to be stored
+ * against the same key. Used in the DestNameExchange.
+ */
+class Index
+{
+    /** Routing key to the queues bound with that key. */
+    private ConcurrentMap<String, List<AMQQueue>> _index
+            = new ConcurrentHashMap<String, List<AMQQueue>>();
+
+    /**
+     * Binds a queue to a key.
+     * @param key the routing key
+     * @param queue the queue to store against the key
+     * @return true if the queue was added, false if it was already stored against the key
+     */
+    boolean add(String key, AMQQueue queue)
+    {
+        List<AMQQueue> queues = _index.get(key);
+        if(queues == null)
+        {
+            queues = new CopyOnWriteArrayList<AMQQueue>();
+            //next call is atomic, so there is no race to create the list
+            List<AMQQueue> active = _index.putIfAbsent(key, queues);
+            if(active != null)
+            {
+                //someone added the new one in faster than we did, so use theirs
+                queues = active;
+            }
+        }
+        // NOTE(review): contains() followed by add() is not atomic, so two
+        // threads adding the same queue at the same time can both succeed.
+        if(queues.contains(queue))
+        {
+            return false;
+        }
+        else
+        {
+            return queues.add(queue);
+        }
+    }
+
+    /**
+     * Removes a queue from a key, dropping the key once no queue is left for it.
+     * @param key the routing key
+     * @param queue the queue to remove
+     * @return true if the queue had been stored against the key
+     */
+    boolean remove(String key, AMQQueue queue)
+    {
+        List<AMQQueue> queues = _index.get(key);
+        if (queues == null)
+        {
+            return false;
+        }
+        boolean removed = queues.remove(queue);
+        if (queues.isEmpty())
+        {
+            // Conditional removal: if another thread has meanwhile replaced the
+            // mapping with a list that already holds queues, that list is kept.
+            // NOTE(review): a queue added to this very list between the check
+            // above and this call can still be dropped with it; closing that
+            // window needs locking around add() and remove().
+            _index.remove(key, queues);
+        }
+        return removed;
+    }
+
+    /**
+     * @param key the routing key
+     * @return the live list of queues stored against the key, or null if there is none
+     */
+    List<AMQQueue> get(String key)
+    {
+        return _index.get(key);
+    }
+
+    /**
+     * @return the underlying map itself (not a copy) of routing key to queues
+     */
+    Map<String, List<AMQQueue>> getBindingsMap()
+    {
+        return _index;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/Index.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ManagedExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ManagedExchange.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ManagedExchange.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ManagedExchange.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,78 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import javax.management.openmbean.TabularData;
+import javax.management.JMException;
+import java.io.IOException;
+
+/**
+ * The management interface exposed to allow management of an Exchange.
+ * @author  Robert J. Greig
+ * @author  Bhupendra Bhardwaj
+ * @version 0.1
+ */
+public interface ManagedExchange
+{
+    /**
+     * Type name for managed exchanges.
+     * NOTE(review): presumably used when naming/registering the exchange MBeans - confirm.
+     */
+    static final String TYPE = "Exchange";
+
+    /**
+     * Returns the name of the managed exchange.
+     * @return the name of the exchange.
+     * @throws IOException
+     */
+    String getName() throws IOException;
+
+    /**
+     * Tells if the exchange is durable or not.
+     * @return true if the exchange is durable.
+     * @throws IOException
+     */
+    boolean isDurable() throws IOException;
+
+    /**
+     * Tells if the exchange is set for autodelete or not.
+     * @return true if the exchange is set as autodelete.
+     * @throws IOException
+     */
+    boolean isAutoDelete() throws IOException;
+
+    /**
+     * Returns the ticket number of the exchange.
+     * NOTE(review): presumably the access ticket the exchange was declared with - confirm.
+     * @return the ticket number.
+     * @throws IOException
+     */
+    int getTicketNo() throws IOException;
+
+
+    // Operations
+
+    /**
+     * Returns all the bindings this exchange has with the queues.
+     * @return  the bindings with the exchange.
+     * @throws IOException
+     * @throws JMException
+     */
+    TabularData viewBindings()
+        throws IOException, JMException;
+
+    /**
+     * Creates new binding with the given queue and binding.
+     * @param QueueName name of the queue to bind to this exchange
+     * @param binding the binding to create for that queue; how the string is
+     *        interpreted is left to the exchange implementation
+     * @throws JMException if the binding cannot be created, including when the
+     *         exchange does not support this operation
+     */
+    void createBinding(String QueueName, String binding)
+        throws JMException;
+
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/ManagedExchange.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.AMQException;
+
+/**
+ * Separated out from the ExchangeRegistry interface to allow components
+ * that use only this part to have a dependency with a reduced footprint.
+ *
+ */
+public interface MessageRouter
+{
+    /**
+     * Routes content through exchanges, delivering it to 1 or more queues.
+     * @param message the message to be routed
+     * @throws AMQException if something goes wrong delivering data
+     */
+    void routeContent(AMQMessage message) throws AMQException;
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/MessageRouter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/NoRouteException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/NoRouteException.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/NoRouteException.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/NoRouteException.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,39 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * Thrown by an exchange if there is no way to route a message with the
+ * mandatory flag set.
+ */
+public class NoRouteException extends RequiredDeliveryException
+{
+    /**
+     * @param msg description of why the message could not be routed
+     * @param message the message that could not be routed
+     */
+    public NoRouteException(String msg, AMQMessage message)
+    {
+        super(msg, message);
+    }
+
+    /**
+     * @return the code of {@code AMQConstant.NO_ROUTE}
+     */
+    public int getReplyCode()
+    {
+        return AMQConstant.NO_ROUTE.getCode();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/exchange/NoRouteException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicAckMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicAckMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicAckMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicAckMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,52 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicAckBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.AMQChannel;
+
+/**
+ * Handler for the basic ack method: acknowledges the referenced delivery on
+ * the channel the method was received on. A single shared instance is
+ * obtained through {@link #getInstance()}.
+ */
+public class BasicAckMethodHandler implements StateAwareMethodListener<BasicAckBody>
+{
+    private static final BasicAckMethodHandler _instance = new BasicAckMethodHandler();
+
+    private BasicAckMethodHandler()
+    {
+    }
+
+    /**
+     * @return the shared handler instance
+     */
+    public static BasicAckMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Passes the delivery tag and the multiple flag of the received ack on to
+     * the channel identified by the event's channel id.
+     * @throws AMQException if the delivery tag is not known to the channel
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<BasicAckBody> evt) throws AMQException
+    {
+        final BasicAckBody ack = evt.getMethod();
+        final AMQChannel ackedChannel = protocolSession.getChannel(evt.getChannelId());
+        // the channel rejects an unknown delivery tag with an AMQException
+        ackedChannel.acknowledgeMessage(ack.deliveryTag, ack.multiple);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicAckMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicCancelMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicCancelMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicCancelMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicCancelBody;
+import org.apache.qpid.framing.BasicCancelOkBody;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.AMQException;
+
+/**
+ * Handler for the basic cancel method: unsubscribes the named consumer from
+ * the channel and, unless the client asked for no reply, confirms with a
+ * cancel-ok frame. A single shared instance is obtained through
+ * {@link #getInstance()}.
+ */
+public class BasicCancelMethodHandler implements StateAwareMethodListener<BasicCancelBody>
+{
+    private static final BasicCancelMethodHandler _instance = new BasicCancelMethodHandler();
+
+    private BasicCancelMethodHandler()
+    {
+    }
+
+    /**
+     * @return the shared handler instance
+     */
+    public static BasicCancelMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Unsubscribes the consumer identified by the received consumer tag and
+     * writes a cancel-ok frame back on the same channel when nowait is not set.
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<BasicCancelBody> evt) throws AMQException
+    {
+        final AMQChannel consumerChannel = protocolSession.getChannel(evt.getChannelId());
+        final BasicCancelBody cancel = evt.getMethod();
+        consumerChannel.unsubscribeConsumer(protocolSession, cancel.consumerTag);
+
+        if (cancel.nowait)
+        {
+            // the client does not want a confirmation
+            return;
+        }
+        protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(evt.getChannelId(), cancel.consumerTag));
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.BasicConsumeBody;
+import org.apache.qpid.framing.BasicConsumeOkBody;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.ConsumerTagNotUniqueException;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
+public class BasicConsumeMethodHandler implements StateAwareMethodListener<BasicConsumeBody>
+{
+    private static final Logger _log = Logger.getLogger(BasicConsumeMethodHandler.class);
+
+    private static final BasicConsumeMethodHandler _instance = new BasicConsumeMethodHandler();
+
+    public static BasicConsumeMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private BasicConsumeMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession session,
+                               AMQMethodEvent<BasicConsumeBody> evt) throws AMQException
+    {
+        BasicConsumeBody body = evt.getMethod();
+        final int channelId = evt.getChannelId();
+
+        AMQChannel channel = session.getChannel(channelId);
+        if (channel == null)
+        {
+            _log.error("Channel " + channelId + " not found");
+            // TODO: either alert or error that the
+        }
+        else
+        {
+            AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
+
+            if(queue == null)
+            {
+                _log.info("No queue for '" + body.queue + "'");
+            }
+            try
+            {
+                String consumerTag = channel.subscribeToQueue(body.consumerTag, queue,  session, !body.noAck);
+                if(!body.nowait)
+                {
+                    session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
+                }
+
+                //now allow queue to start async processing of any backlog of messages
+                queue.deliverAsync();
+            }
+            catch(ConsumerTagNotUniqueException e)
+            {
+                String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
+                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg, BasicConsumeBody.CLASS_ID, BasicConsumeBody.METHOD_ID));
+            }
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,77 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+public class BasicPublishMethodHandler  implements StateAwareMethodListener<BasicPublishBody>
+{
+    private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler();
+
+    public static BasicPublishMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private BasicPublishMethodHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<BasicPublishBody> evt) throws AMQException
+    {
+        final BasicPublishBody body = evt.getMethod();
+
+        // TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
+        if (body.exchange == null)
+        {
+            body.exchange = "amq.direct";
+        }
+        Exchange e = exchangeRegistry.getExchange(body.exchange);
+        // if the exchange does not exist we raise a channel exception
+        if (e == null)
+        {
+            protocolSession.closeChannel(evt.getChannelId());
+            // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
+            // then we can remove the hardcoded 0,0
+            AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), 500, "Unknown exchange name", 0, 0);
+            protocolSession.writeFrame(cf);
+        }
+        else
+        {
+            // The partially populated BasicDeliver frame plus the received route body
+            // is stored in the channel. Once the final body frame has been received
+            // it is routed to the exchange.
+            AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+            channel.setPublishFrame(body, protocolSession);
+        }
+    }
+}
+

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicQosHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicQosHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicQosHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,46 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+
+public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
+{
+    private static final BasicQosHandler _instance = new BasicQosHandler();
+
+    public static BasicQosHandler getInstance()
+    {
+        return _instance;
+    }
+
+    public void methodReceived(AMQStateManager stateMgr, QueueRegistry queues, ExchangeRegistry exchanges,
+                               AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
+    {
+        session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
+        session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody()));
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicQosHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,54 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.framing.BasicRecoverBody;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles a received {@link BasicRecoverBody} by asking the channel the method arrived on
+ * to resend to the protocol session.
+ */
+public class BasicRecoverMethodHandler implements StateAwareMethodListener<BasicRecoverBody>
+{
+    private static final Logger _logger = Logger.getLogger(BasicRecoverMethodHandler.class);
+
+    private static final BasicRecoverMethodHandler _instance = new BasicRecoverMethodHandler();
+
+    /**
+     * @return the shared instance of this handler
+     */
+    public static BasicRecoverMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    /**
+     * Logs the request at debug level, looks up the channel and triggers a resend on it.
+     *
+     * @param stateManager     not used by this handler
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry not used by this handler
+     * @param protocolSession  session the method arrived on
+     * @param evt              event carrying the channel id
+     * @throws AMQException if the session has no channel with the event's channel id
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<BasicRecoverBody> evt) throws AMQException
+    {
+        final int channelId = evt.getChannelId();
+        _logger.debug("Recover received on protocol session " + protocolSession + " and channel " + channelId);
+
+        final AMQChannel target = protocolSession.getChannel(channelId);
+        if (target != null)
+        {
+            target.resend(protocolSession);
+            return;
+        }
+        throw new AMQException("Unknown channel " + channelId);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/BasicRecoverMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelCloseHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelCloseHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * Handles a received {@link ChannelCloseBody}: logs the request, closes the channel on the
+ * protocol session and replies with a ChannelCloseOkBody frame.
+ */
+public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
+{
+    private static final Logger _logger = Logger.getLogger(ChannelCloseHandler.class);
+
+    private static ChannelCloseHandler _instance = new ChannelCloseHandler();
+
+    /**
+     * @return the shared instance of this handler
+     */
+    public static ChannelCloseHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private ChannelCloseHandler()
+    {
+    }
+
+    /**
+     * Closes the channel named by the event and acknowledges the close.
+     *
+     * @param stateManager     not used by this handler
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry not used by this handler
+     * @param protocolSession  session the method arrived on
+     * @param evt              event carrying the channel id and the close body
+     * @throws AMQException declared by the listener interface; may propagate from the session calls
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
+    {
+        final int channelId = evt.getChannelId();
+        final ChannelCloseBody closeBody = evt.getMethod();
+
+        _logger.info("Received channel close for id " + channelId + " citing class " + closeBody.classId +
+                     " and method " + closeBody.methodId);
+
+        protocolSession.closeChannel(channelId);
+        protocolSession.writeFrame(ChannelCloseOkBody.createAMQFrame(channelId));
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelCloseHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelCloseOkHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelCloseOkHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelCloseOkHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,51 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles a received {@link ChannelCloseOkBody}. The only action taken is to log the
+ * channel id at info level.
+ */
+public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody>
+{
+    private static final Logger _logger = Logger.getLogger(ChannelCloseOkHandler.class);
+
+    private static ChannelCloseOkHandler _instance = new ChannelCloseOkHandler();
+
+    /**
+     * @return the shared instance of this handler
+     */
+    public static ChannelCloseOkHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private ChannelCloseOkHandler()
+    {
+    }
+
+    /**
+     * Logs receipt of the close-ok method.
+     *
+     * @param stateManager     not used by this handler
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry not used by this handler
+     * @param protocolSession  not used by this handler
+     * @param evt              event carrying the channel id
+     * @throws AMQException declared by the listener interface; not thrown by this body itself
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
+    {
+        final int channelId = evt.getChannelId();
+        _logger.info("Received channel-close-ok for channel-id " + channelId);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelFlowHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelFlowHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelFlowHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,61 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ChannelFlowBody;
+import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.ChannelCloseBody;
+import org.apache.qpid.AMQException;
+
+public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody>
+{
+    private static final Logger _logger = Logger.getLogger(ChannelFlowHandler.class);
+
+    private static ChannelFlowHandler _instance = new ChannelFlowHandler();
+
+    public static ChannelFlowHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private ChannelFlowHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
+    {
+        ChannelFlowBody body = evt.getMethod();
+
+        AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+        channel.setSuspended(!body.active);
+        _logger.info("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
+
+        AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), body.active);
+        protocolSession.writeFrame(response);
+    }}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelFlowHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelOpenHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelOpenHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,58 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+/**
+ * Handles a received {@link ChannelOpenBody}: creates a new {@link AMQChannel} for the
+ * event's channel id, adds it to the protocol session and replies with a
+ * ChannelOpenOkBody frame.
+ */
+public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody>
+{
+    private static ChannelOpenHandler _instance = new ChannelOpenHandler();
+
+    /**
+     * @return the shared instance of this handler
+     */
+    public static ChannelOpenHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private ChannelOpenHandler()
+    {
+    }
+
+    /**
+     * Opens a channel on the session.
+     *
+     * @param stateManager     not used by this handler
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry exchange registry handed to the new channel
+     * @param protocolSession  session the channel is added to and the reply is written to
+     * @param evt              event carrying the channel id
+     * @throws AMQException declared by the listener interface; may propagate from the calls made here
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
+    {
+        // The message store for the new channel comes from the application registry.
+        final IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
+        final AMQChannel opened = new AMQChannel(evt.getChannelId(), appRegistry.getMessageStore(),
+                                                 exchangeRegistry);
+        protocolSession.addChannel(opened);
+
+        protocolSession.writeFrame(ChannelOpenOkBody.createAMQFrame(evt.getChannelId()));
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ChannelOpenHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=auto&rev=447994
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Tue Sep 19 15:06:50 2006
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+
+/**
+ * Handles a received {@link ConnectionCloseBody}: logs the reply code and text, closes the
+ * protocol session (logging rather than propagating any failure) and replies with a
+ * ConnectionCloseOkBody frame.
+ */
+public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
+
+    private static ConnectionCloseMethodHandler _instance = new ConnectionCloseMethodHandler();
+
+    /**
+     * @return the shared instance of this handler
+     */
+    public static ConnectionCloseMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private ConnectionCloseMethodHandler()
+    {
+    }
+
+    /**
+     * Closes the session and acknowledges the close request.
+     *
+     * @param stateManager     not used by this handler
+     * @param queueRegistry    not used by this handler
+     * @param exchangeRegistry not used by this handler
+     * @param protocolSession  session to close and to write the reply frame to
+     * @param evt              event carrying the channel id and the close body
+     * @throws AMQException declared by the listener interface; may propagate from writing the reply
+     */
+    public void methodReceived(AMQStateManager stateManager, QueueRegistry queueRegistry,
+                               ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
+                               AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
+    {
+        final ConnectionCloseBody closeBody = evt.getMethod();
+        _logger.info("ConnectionClose received with reply code/reply text " + closeBody.replyCode + "/" +
+                     closeBody.replyText + " for " + protocolSession);
+
+        try
+        {
+            protocolSession.closeSession();
+        }
+        catch (Exception closeFailure)
+        {
+            // A failure to close is logged only; the close-ok reply below is still sent.
+            _logger.error("Error closing protocol session: " + closeFailure, closeFailure);
+        }
+
+        protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame(evt.getChannelId()));
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/broker/src/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message