qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ai...@apache.org
Subject svn commit: r619823 [7/19] - in /incubator/qpid/branches/thegreatmerge/qpid: ./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Common...
Date Fri, 08 Feb 2008 10:10:11 GMT
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Fri Feb  8 02:09:37 2008
@@ -23,6 +23,8 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.TxRollbackBody;
 import org.apache.qpid.framing.TxRollbackOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -42,24 +44,26 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxRollbackBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, TxRollbackBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
 
         try
         {
-            AMQChannel channel = session.getChannel(evt.getChannelId());
+            AMQChannel channel = session.getChannel(channelId);
 
             if (channel == null)
             {
-                throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+                throw body.getChannelNotFoundException(channelId);
             }
 
             channel.rollback();
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            session.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+
+            MethodRegistry methodRegistry = session.getMethodRegistry();
+            AMQMethodBody responseBody = methodRegistry.createTxRollbackOkBody();
+            session.writeFrame(responseBody.generateFrame(channelId));
+
+            
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
             // Why, are we not allowed to send messages back to client before the ok method?
@@ -67,7 +71,7 @@
         }
         catch (AMQException e)
         {
-            throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
+            throw body.getChannelException(e.getErrorCode(), "Failed to rollback: " + e.getMessage());
         }
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Fri Feb  8 02:09:37 2008
@@ -23,6 +23,7 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.TxSelectBody;
 import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.state.AMQStateManager;
@@ -42,22 +43,21 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<TxSelectBody> evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, TxSelectBody body, int channelId) throws AMQException
     {
         AMQProtocolSession session = stateManager.getProtocolSession();
 
-        AMQChannel channel = session.getChannel(evt.getChannelId());
+        AMQChannel channel = session.getChannel(channelId);
 
         if (channel == null)
         {
-            throw evt.getMethod().getChannelNotFoundException(evt.getChannelId());
+            throw body.getChannelNotFoundException(channelId);
         }
 
         channel.setLocalTransactional();
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        session.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0));
+        MethodRegistry methodRegistry = session.getMethodRegistry();
+        TxSelectOkBody responseBody = methodRegistry.createTxSelectOkBody();
+        session.writeFrame(responseBody.generateFrame(channelId));
     }
 }

Added: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java?rev=619823&view=auto
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java (added)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/UnexpectedMethodException.java Fri Feb  8 02:09:37 2008
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.handler;
+
+
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.AMQException;
+
+public class UnexpectedMethodException extends AMQException
+{
+    public UnexpectedMethodException(AMQMethodBody body)
+    {
+        super(null, "Unexpected method recevied: " + body.getClass().getName(), null);
+    }
+}

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/JDBCStore.java Fri Feb  8 02:09:37 2008
@@ -1449,7 +1449,7 @@
                     foundMessage.staged();
                     foundMessage.enqueue(foundQueue);
                     foundQueue.enqueue(foundMessage);
-                    foundQueue.process(context, (AMQMessage) foundMessage, false);
+                    // FIXME: TGM AS foundQueue.process(context, (AMQMessage) foundMessage, false);
                 }
                 // add the queue in the result map
                 result.put(foundQueue.getQueueID(), foundQueue);
@@ -1499,7 +1499,7 @@
             while (rs.next())
             {
                 foundExchange = _virtualHost.getExchangeFactory().createExchange(
-                        new AMQShortString(rs.getString(1)), new AMQShortString(rs.getString(2)), true, false, 0);
+                        new AMQShortString(rs.getString(1)), new AMQShortString(rs.getString(2)), true, false);
                 // get all the bindings
                 Statement stmtb = connection.getConnection().createStatement();
                 ResultSet rsb = stmtb.executeQuery("SELECT * FROM " + _tableNameExchangeQueueRelation +

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java Fri Feb  8 02:09:37 2008
@@ -27,8 +27,8 @@
 package org.apache.qpid.server.output;
 
 import org.apache.qpid.server.output.ProtocolOutputConverter.Factory;
-import org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.framing.ProtocolVersion;
 
 import java.util.Map;
 import java.util.HashMap;
@@ -36,27 +36,26 @@
 public class ProtocolOutputConverterRegistry
 {
 
-    private static final Map<Byte, Map<Byte, Factory>> _registry =
-            new HashMap<Byte, Map<Byte, Factory>>();
+    private static final Map<ProtocolVersion, Factory> _registry =
+            new HashMap<ProtocolVersion, Factory>();
 
 
     static
     {
-        register((byte) 8, (byte) 0, ProtocolOutputConverterImpl.getInstanceFactory());
+        register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
+        register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
+
     }
 
-    private static void register(byte major, byte minor, Factory converter)
+    private static void register(ProtocolVersion version, Factory converter)
     {
-        if(!_registry.containsKey(major))
-        {
-            _registry.put(major, new HashMap<Byte, Factory>());
-        }
-        _registry.get(major).put(minor, converter);
+
+        _registry.put(version,converter);
     }
 
 
     public static ProtocolOutputConverter getConverter(AMQProtocolSession session)
     {
-        return _registry.get(session.getProtocolMajorVersion()).get(session.getProtocolMinorVersion()).newInstance(session);
+        return _registry.get(session.getProtocolVersion()).newInstance(session);
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java Fri Feb  8 02:09:37 2008
@@ -43,6 +43,7 @@
 public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
 {
 
+
     public static Factory getInstanceFactory()
     {
         return new Factory()
@@ -98,7 +99,7 @@
             //
             ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
             AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
             writeFrame(compositeBlock);
@@ -109,7 +110,7 @@
             for(int i = 1; i < bodyCount; i++)
             {
                 cb = messageHandle.getContentChunk(storeContext,messageId, i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
 
@@ -149,7 +150,7 @@
             //
             ContentChunk cb = messageHandle.getContentChunk(storeContext,messageId, 0);
 
-            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+            AMQDataBlock firstContentBody = new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
             AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
             CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
             writeFrame(compositeBlock);
@@ -160,7 +161,7 @@
             for(int i = 1; i < bodyCount; i++)
             {
                 cb = messageHandle.getContentChunk(storeContext, messageId, i);
-                writeFrame(new AMQFrame(channelId, getProtocolSession().getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+                writeFrame(new AMQFrame(channelId, getProtocolSession().getMethodRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
             }
 
 
@@ -176,11 +177,14 @@
         final MessagePublishInfo pb = message.getMessagePublishInfo();
         final AMQMessageHandle messageHandle = message.getMessageHandle();
 
-        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, getProtocolMajorVersion(),
-                                                                getProtocolMinorVersion(),
-                                                                consumerTag,
-                                                                deliveryTag, pb.getExchange(), messageHandle.isRedelivered(),
-                                                                pb.getRoutingKey());
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicDeliverBody deliverBody =
+                methodRegistry.createBasicDeliverBody(consumerTag,
+                                                      deliveryTag,
+                                                      messageHandle.isRedelivered(),
+                                                      pb.getExchange(),
+                                                      pb.getRoutingKey());
+        AMQFrame deliverFrame = deliverBody.generateFrame(channelId);
 
 
         return deliverFrame.toByteBuffer();
@@ -192,13 +196,14 @@
         final MessagePublishInfo pb = message.getMessagePublishInfo();
         final AMQMessageHandle messageHandle = message.getMessageHandle();
 
-        AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
-                                                            getProtocolMajorVersion(),
-                                                            getProtocolMinorVersion(),
-                                                                deliveryTag, pb.getExchange(),
-                                                                queueSize,
-                                                                messageHandle.isRedelivered(),
-                                                                pb.getRoutingKey());
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicGetOkBody getOkBody =
+                methodRegistry.createBasicGetOkBody(deliveryTag,
+                                                    messageHandle.isRedelivered(),
+                                                    pb.getExchange(),
+                                                    pb.getRoutingKey(),
+                                                    queueSize);
+        AMQFrame getOkFrame = getOkBody.generateFrame(channelId);
 
         return getOkFrame.toByteBuffer();
     }
@@ -215,12 +220,13 @@
 
     private ByteBuffer createEncodedReturnFrame(AMQMessage message, int channelId, int replyCode, AMQShortString replyText) throws AMQException
     {
-        AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
-                                                              getProtocolMajorVersion(),
-                                                              getProtocolMinorVersion(),
-                                                              message.getMessagePublishInfo().getExchange(),
-                                                              replyCode, replyText,
-                                                              message.getMessagePublishInfo().getRoutingKey());
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicReturnBody basicReturnBody =
+                methodRegistry.createBasicReturnBody(replyCode,
+                                                     replyText,
+                                                     message.getMessagePublishInfo().getExchange(),
+                                                     message.getMessagePublishInfo().getRoutingKey());
+        AMQFrame returnFrame = basicReturnBody.generateFrame(channelId);
 
         return returnFrame.toByteBuffer();
     }
@@ -272,11 +278,9 @@
 
     public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
     {
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+        BasicCancelOkBody basicCancelOkBody = methodRegistry.createBasicCancelOkBody(consumerTag);
+        writeFrame(basicCancelOkBody.generateFrame(channelId));
 
-        writeFrame(BasicCancelOkBody.createAMQFrame(channelId,
-                   getProtocolMajorVersion(),
-                   getProtocolMinorVersion(),
-                   consumerTag    // consumerTag
-        ));
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java?rev=619823&r1=616809&r2=619823&view=diff
==============================================================================
    (empty)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/Activator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/Activator.java?rev=619823&r1=616809&r2=619823&view=diff
==============================================================================
    (empty)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java?rev=619823&r1=616809&r2=619823&view=diff
==============================================================================
    (empty)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Feb  8 02:09:37 2008
@@ -39,6 +39,7 @@
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.handler.ServerMethodDispatcherImpl;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -107,10 +108,11 @@
 
     private FieldTable _clientProperties;
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
-    private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
+
     private List<Integer> _closingChannelsList = new ArrayList<Integer>();
     private ProtocolOutputConverter _protocolOutputConverter;
     private Principal _authorizedID;
+    private MethodDispatcher _dispatcher;
 
     public ManagedObject getManagedObject()
     {
@@ -235,24 +237,24 @@
         ((AMQDecoder) _codecFactory.getDecoder()).setExpectProtocolInitiation(false);
         try
         {
-            pi.checkVersion(); // Fails if not correct
+            ProtocolVersion pv = pi.checkVersion(); // Fails if not correct
 
             // This sets the protocol version (and hence framing classes) for this session.
-            setProtocolVersion(pi._protocolMajor, pi._protocolMinor);
+            setProtocolVersion(pv);
 
             String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
 
             String locales = "en_US";
 
-            // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
-            AMQFrame response =
-                ConnectionStartBody.createAMQFrame((short) 0, getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
-                    locales.getBytes(), // locales
-                    mechanisms.getBytes(), // mechanisms
-                    null, // serverProperties
-                    (short) getProtocolMajorVersion(), // versionMajor
-                    (short) getProtocolMinorVersion()); // versionMinor
-            _minaProtocolSession.write(response);
+
+            AMQMethodBody responseBody = getMethodRegistry().createConnectionStartBody((short) getProtocolMajorVersion(),
+                                                                                       (short) getProtocolMinorVersion(),
+                                                                                       null,
+                                                                                       mechanisms.getBytes(),
+                                                                                       locales.getBytes());
+            _minaProtocolSession.write(responseBody.generateFrame(0));
+
+
         }
         catch (AMQException e)
         {
@@ -548,6 +550,7 @@
 
     public void closeChannelOk(int channelId)
     {
+        removeChannel(channelId);
         _closingChannelsList.remove(new Integer(channelId));
     }
 
@@ -695,13 +698,12 @@
         }
     }
 
-    private void setProtocolVersion(byte major, byte minor)
+    private void setProtocolVersion(ProtocolVersion pv)
     {
-        _protocolVersion = new ProtocolVersion(major, minor);
-
-        _registry = MainRegistry.getVersionSpecificRegistry(_protocolVersion);
+        _protocolVersion = pv;
 
         _protocolOutputConverter = ProtocolOutputConverterRegistry.getConverter(this);
+        _dispatcher = ServerMethodDispatcherImpl.createMethodDispatcher(_stateManager, _protocolVersion);
     }
 
     public byte getProtocolMajorVersion()
@@ -709,6 +711,11 @@
         return _protocolVersion.getMajorVersion();
     }
 
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolVersion;
+    }
+
     public byte getProtocolMinorVersion()
     {
         return _protocolVersion.getMinorVersion();
@@ -719,9 +726,9 @@
         return (getProtocolMajorVersion() == major) && (getProtocolMinorVersion() == minor);
     }
 
-    public VersionSpecificRegistry getRegistry()
+    public MethodRegistry getRegistry()
     {
-        return _registry;
+        return getMethodRegistry();
     }
 
     public Object getClientIdentifier()
@@ -764,6 +771,16 @@
     public Principal getAuthorizedID()
     {
         return _authorizedID;
+    }
+
+    public MethodRegistry getMethodRegistry()
+    {
+        return MethodRegistry.getMethodRegistry(getProtocolVersion());
+    }
+
+    public MethodDispatcher getMethodDispatcher()
+    {
+        return _dispatcher;
     }
 
     public String getClientVersion()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Fri Feb  8 02:09:37 2008
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -25,20 +25,19 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
 import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.mina.util.SessionUtil;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQProtocolHeaderException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.HeartbeatBody;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.transport.ConnectorConfiguration;
@@ -53,7 +52,6 @@
  *
  * We delegate all frame (message) processing to the AMQProtocolSession which wraps
  * the state for the connection.
- *
  */
 public class AMQPFastProtocolHandler extends IoHandlerAdapter
 {
@@ -117,11 +115,41 @@
             }
 
         }
+
+        if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false))
+        {
+            try
+            {
+//        //Add IO Protection Filters
+                IoFilterChain chain = protocolSession.getFilterChain();
+
+                int buf_size = 32768;
+                if (protocolSession.getConfig() instanceof SocketSessionConfig)
+                {
+                    buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize();
+                }
+
+                protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+                ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+                readfilter.setMaximumConnectionBufferSize(buf_size);
+                readfilter.attach(chain);
+
+                WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+                writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+                writefilter.attach(chain);
+
+                protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+                _logger.info("Using IO Read/Write Filter Protection");
+            }
+            catch (Exception e)
+            {
+                _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
+            }
+        }
     }
 
-    /**
-     * Separated into its own, protected, method to allow easier reuse
-     */
+    /** Separated into its own, protected, method to allow easier reuse */
     protected void createSession(IoSession session, IApplicationRegistry applicationRegistry, AMQCodecFactory codec) throws AMQException
     {
         new AMQMinaProtocolSession(session, applicationRegistry.getVirtualHostRegistry(), codec);
@@ -139,7 +167,14 @@
         //fixme  -- this can be null
         if (amqProtocolSession != null)
         {
-            amqProtocolSession.closeSession();
+            try
+            {
+                amqProtocolSession.closeSession();
+            }
+            catch (AMQException e)
+            {
+                _logger.error("Caught AMQException whilst closingSession:" + e);
+            }
         }
     }
 
@@ -173,21 +208,18 @@
         }
         else if (throwable instanceof IOException)
         {
-            _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
+            _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable);
         }
         else
         {
             _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
 
-            // Be aware of possible changes to parameter order as versions change.
-            protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
-                                                                     session.getProtocolMajorVersion(),
-                                                                     session.getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                     0,    // classId
-                                                                     0,    // methodId
-                                                                     200,    // replyCode
-                                                                     new AMQShortString(throwable.getMessage())    // replyText
-            ));
+
+            MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(session.getProtocolVersion());
+            ConnectionCloseBody closeBody = methodRegistry.createConnectionCloseBody(200,new AMQShortString(throwable.getMessage()),0,0);
+                        
+            protocolSession.write(closeBody.generateFrame(0));
+
             protocolSession.close();
         }
     }
@@ -195,8 +227,10 @@
     /**
      * Invoked when a message is received on a particular protocol session. Note that a
      * protocol session is directly tied to a particular physical connection.
+     *
      * @param protocolSession the protocol session that received the message
-     * @param message the message itself (i.e. a decoded frame)
+     * @param message         the message itself (i.e. a decoded frame)
+     *
      * @throws Exception if the message cannot be processed
      */
     public void messageReceived(IoSession protocolSession, Object message) throws Exception
@@ -206,7 +240,7 @@
         if (message instanceof AMQDataBlock)
         {
             amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
-                        
+
         }
         else if (message instanceof ByteBuffer)
         {
@@ -221,9 +255,11 @@
 
     /**
      * Called after a message has been sent out on a particular protocol session
+     *
      * @param protocolSession the protocol session (i.e. connection) on which this
-     * message was sent
-     * @param object the message (frame) that was encoded and sent
+     *                        message was sent
+     * @param object          the message (frame) that was encoded and sent
+     *
      * @throws Exception if we want to indicate an error
      */
     public void messageSent(IoSession protocolSession, Object object) throws Exception

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Feb  8 02:09:37 2008
@@ -23,9 +23,7 @@
 import javax.security.sasl.SaslServer;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.output.ProtocolOutputConverter;
@@ -37,6 +35,8 @@
 public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
 {
 
+
+
     public static interface Task
     {
         public void doTask(AMQProtocolSession session) throws AMQException;
@@ -172,4 +172,8 @@
     /** @return a Principal that was used to authorized this session */
     Principal getAuthorizedID();
 
+    public MethodRegistry getMethodRegistry();
+
+    public MethodDispatcher getMethodDispatcher();
+    
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Fri Feb  8 02:09:37 2008
@@ -61,6 +61,7 @@
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.management.AMQManagedObject;
@@ -261,17 +262,17 @@
      */
     public void closeConnection() throws JMException
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        final AMQFrame response =
-            ConnectionCloseBody.createAMQFrame(0, _session.getProtocolMajorVersion(), _session.getProtocolMinorVersion(), // AMQP version (major, minor)
-                0, // classId
-                0, // methodId
-                AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION // replyText
-            );
-        _session.writeFrame(response);
+
+        MethodRegistry methodRegistry = _session.getMethodRegistry();
+        ConnectionCloseBody responseBody =
+                methodRegistry.createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
+                                                         // replyCode
+                                                         BROKER_MANAGEMENT_CONSOLE_HAS_CLOSED_THE_CONNECTION,
+                                                         // replyText,
+                                                         0,
+                                                         0);
+
+        _session.writeFrame(responseBody.generateFrame(0));
 
         try
         {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java Fri Feb  8 02:09:37 2008
@@ -23,18 +23,20 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.ExchangeType;
 
 public class ExchangeInitialiser
 {
     public void initialise(ExchangeFactory factory, ExchangeRegistry registry) throws AMQException{
+        for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes())
+        {
+            define (registry, factory, type.getDefaultExchangeName(), type.getName());
+        }
+        
         define(registry, factory, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
-        define(registry, factory, ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
-        define(registry, factory, ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
-        define(registry, factory, ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
-        define(registry, factory, ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
-
         registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME));
     }
 
@@ -43,7 +45,7 @@
     {
         if(r.getExchange(name)== null)
         {
-            r.registerExchange(f.createExchange(name, type, true, false, 0));
+            r.registerExchange(f.createExchange(name, type, true, false));
         }
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Fri Feb  8 02:09:37 2008
@@ -96,22 +96,13 @@
     /** Flag to indicate that this message requires 'immediate' delivery. */
     private boolean _immediate;
 
-    // private Subscription _takenBySubcription;
-    // private AtomicBoolean _taken = new AtomicBoolean(false);
     private TransientMessageData _transientMessageData = new TransientMessageData();
 
-    //todo: this should be part of a messageOnQueue object
-    private Set<Subscription> _rejectedBy = null;
+    private long _expiration;
 
-    //todo: this should be part of a messageOnQueue object
-    private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue, AtomicBoolean>();
-    //todo: this should be part of a messageOnQueue object
-    private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new HashMap<AMQQueue, Subscription>();
 
     private final int hashcode = System.identityHashCode(this);
 
-    //todo: this should be part of a messageOnQueue object
-    private long _expiration;
 
     public String debugIdentity()
     {
@@ -149,6 +140,11 @@
 
     }
 
+    public boolean isReferenced()
+    {
+        return _referenceCount.get() > 0;
+    }
+
     /**
      * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory
      * therefore is memory-efficient.
@@ -201,7 +197,7 @@
 
         private ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
         {
-            return _protocolSession.getRegistry().getProtocolVersionMethodConverter();
+            return _protocolSession.getMethodRegistry().getProtocolVersionMethodConverter();
         }
 
         public void remove()
@@ -296,7 +292,7 @@
         setContentHeaderBody(contentHeader);
     }
 
-    /**
+    /* *
      * Used in testing only. This allows the passing of the content header and some body fragments on construction.
      *
      * @param messageId
@@ -307,7 +303,7 @@
      * @param contentBodies
      *
      * @throws AMQException
-     */
+     */        /*
     public AMQMessage(Long messageId, MessagePublishInfo info, TransactionalContext txnContext,
                       ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues, List<ContentChunk> contentBodies,
                       MessageStore messageStore, StoreContext storeContext, MessageHandleFactory messageHandleFactory) throws AMQException
@@ -320,7 +316,7 @@
             addContentBodyFrame(storeContext, cb);
         }
     }
-
+                 */
     protected AMQMessage(AMQMessage msg) throws AMQException
     {
         _messageId = msg._messageId;
@@ -595,84 +591,6 @@
         return _deliveredToConsumer;
     }
 
-    public boolean isTaken(AMQQueue queue)
-    {
-        // return _taken.get();
-
-        synchronized (this)
-        {
-            AtomicBoolean taken = _takenMap.get(queue);
-            if (taken == null)
-            {
-                taken = new AtomicBoolean(false);
-                _takenMap.put(queue, taken);
-            }
-
-            return taken.get();
-        }
-    }
-
-    public boolean taken(AMQQueue queue, Subscription sub)
-    {
-        // if (_taken.getAndSet(true))
-        // {
-        // return true;
-        // }
-        // else
-        // {
-        // _takenBySubcription = sub;
-        // return false;
-        // }
-
-        synchronized (this)
-        {
-            AtomicBoolean taken = _takenMap.get(queue);
-            if (taken == null)
-            {
-                taken = new AtomicBoolean(false);
-            }
-
-            if (taken.getAndSet(true))
-            {
-                return true;
-            }
-            else
-            {
-                _takenMap.put(queue, taken);
-                _takenBySubcriptionMap.put(queue, sub);
-
-                return false;
-            }
-        }
-    }
-
-    public void release(AMQQueue queue)
-    {
-        if (_log.isTraceEnabled())
-        {
-            _log.trace("Releasing Message:" + debugIdentity());
-        }
-
-        // _taken.set(false);
-        // _takenBySubcription = null;
-
-        synchronized (this)
-        {
-            AtomicBoolean taken = _takenMap.get(queue);
-            if (taken == null)
-            {
-                taken = new AtomicBoolean(false);
-            }
-            else
-            {
-                taken.set(false);
-            }
-
-            _takenMap.put(queue, taken);
-            _takenBySubcriptionMap.put(queue, null);
-        }
-    }
-
     public boolean checkToken(Object token)
     {
 
@@ -780,7 +698,6 @@
      */
     public boolean expired(AMQQueue queue) throws AMQException
     {
-        // note: If the storecontext isn't need then we can remove the getChannel() from Subscription.
 
         if (_expiration != 0L)
         {
@@ -826,7 +743,7 @@
                 // Increment the references to this message for each queue delivery.
                 incrementReference();
                 // normal deliver so add this message at the end.
-                _txnContext.deliver(this, q, false);
+                _txnContext.deliver(q.createEntry(this), false);
             }
         }
         finally
@@ -837,175 +754,6 @@
         }
     }
 
-    /*
-        public void writeDeliver(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
-                throws AMQException
-        {
-            ByteBuffer deliver = createEncodedDeliverFrame(protocolSession, channelId, deliveryTag, consumerTag);
-            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                          getContentHeaderBody());
-
-            final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
-            if (bodyCount == 0)
-            {
-                SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                                           contentHeader);
-
-                protocolSession.writeFrame(compositeBlock);
-            }
-            else
-            {
-
-                //
-                // Optimise the case where we have a single content body. In that case we create a composite block
-                // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-                //
-                ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
-                AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
-                protocolSession.writeFrame(compositeBlock);
-
-                //
-                // Now start writing out the other content bodies
-                //
-                for (int i = 1; i < bodyCount; i++)
-                {
-                    cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
-                    protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
-                }
-
-
-            }
-
-
-        }
-
-        public void writeGetOk(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize) throws AMQException
-        {
-            ByteBuffer deliver = createEncodedGetOkFrame(protocolSession, channelId, deliveryTag, queueSize);
-            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                          getContentHeaderBody());
-
-            final int bodyCount = _messageHandle.getBodyCount(getStoreContext(), _messageId);
-            if (bodyCount == 0)
-            {
-                SmallCompositeAMQDataBlock compositeBlock = new SmallCompositeAMQDataBlock(deliver,
-                                                                                           contentHeader);
-                protocolSession.writeFrame(compositeBlock);
-            }
-            else
-            {
-
-                //
-                // Optimise the case where we have a single content body. In that case we create a composite block
-                // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-                //
-                ContentChunk cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
-                AMQDataBlock firstContentBody = new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
-                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(deliver, headerAndFirstContent);
-                protocolSession.writeFrame(compositeBlock);
-
-                //
-                // Now start writing out the other content bodies
-                //
-                for (int i = 1; i < bodyCount; i++)
-                {
-                    cb = _messageHandle.getContentChunk(getStoreContext(), _messageId, i);
-                    protocolSession.writeFrame(new AMQFrame(channelId, protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
-                }
-
-
-            }
-
-
-        }
-
-
-        private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
-                throws AMQException
-        {
-            MessagePublishInfo pb = getMessagePublishInfo();
-            AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
-                                                                    deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
-                                                                    pb.getRoutingKey());
-            ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
-            deliverFrame.writePayload(buf);
-            buf.flip();
-            return buf;
-        }
-
-        private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession protocolSession, int channelId, long deliveryTag, int queueSize)
-                throws AMQException
-        {
-            MessagePublishInfo pb = getMessagePublishInfo();
-            AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
-                                                                protocolSession.getProtocolMajorVersion(),
-                                                                protocolSession.getProtocolMinorVersion(),
-                                                                deliveryTag, pb.getExchange(),
-                                                                queueSize,
-                                                                _messageHandle.isRedelivered(),
-                                                                pb.getRoutingKey());
-            ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); // XXX: Could cast be a problem?
-            getOkFrame.writePayload(buf);
-            buf.flip();
-            return buf;
-        }
-
-        private ByteBuffer createEncodedReturnFrame(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText) throws AMQException
-        {
-            AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
-                                                                  protocolSession.getProtocolMajorVersion(),
-                                                                  protocolSession.getProtocolMinorVersion(),
-                                                                  getMessagePublishInfo().getExchange(),
-                                                                  replyCode, replyText,
-                                                                  getMessagePublishInfo().getRoutingKey());
-            ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
-            returnFrame.writePayload(buf);
-            buf.flip();
-            return buf;
-        }
-
-        public void writeReturn(AMQProtocolSession protocolSession, int channelId, int replyCode, AMQShortString replyText)
-                throws AMQException
-        {
-            ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession, channelId, replyCode, replyText);
-
-            AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
-                                                                          getContentHeaderBody());
-
-            Iterator<AMQDataBlock> bodyFrameIterator = getBodyFrameIterator(protocolSession, channelId);
-            //
-            // Optimise the case where we have a single content body. In that case we create a composite block
-            // so that we can writeDeliver out the deliver, header and body with a single network writeDeliver.
-            //
-            if (bodyFrameIterator.hasNext())
-            {
-                AMQDataBlock firstContentBody = bodyFrameIterator.next();
-                AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody};
-                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
-                protocolSession.writeFrame(compositeBlock);
-            }
-            else
-            {
-                CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame,
-                                                                                 new AMQDataBlock[]{contentHeader});
-                protocolSession.writeFrame(compositeBlock);
-            }
-
-            //
-            // Now start writing out the other content bodies
-            // TODO: MINA needs to be fixed so the the pending writes buffer is not unbounded
-            //
-            while (bodyFrameIterator.hasNext())
-            {
-                protocolSession.writeFrame(bodyFrameIterator.next());
-            }
-        }
-     */
 
     public AMQMessageHandle getMessageHandle()
     {
@@ -1048,49 +796,7 @@
         // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken : " +
         // _taken + " by :" + _takenBySubcription;
 
-        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount + "; taken for queues: "
-               + _takenMap.toString() + " by Subs:" + _takenBySubcriptionMap.toString();
-    }
-
-    public Subscription getDeliveredSubscription(AMQQueue queue)
-    {
-        // return _takenBySubcription;
-        synchronized (this)
-        {
-            return _takenBySubcriptionMap.get(queue);
-        }
-    }
-
-    public void reject(Subscription subscription)
-    {
-
-        if (subscription != null)
-        {
-            if (_rejectedBy == null)
-            {
-                _rejectedBy = new HashSet<Subscription>();
-            }
-
-            _rejectedBy.add(subscription);
-        }
-        else
-        {
-            _log.warn("Requesting rejection by null subscriber:" + debugIdentity());
-        }
-    }
-
-    public boolean isRejectedBy(Subscription subscription)
-    {
-        boolean rejected = _rejectedBy != null;
-
-        if (rejected) // We have subscriptions that rejected this message
-        {
-            return _rejectedBy.contains(subscription);
-        }
-        else // This messasge hasn't been rejected yet.
-        {
-            return rejected;
-        }
+        return "Message[" + debugIdentity() + "]: " + _messageId + "; ref count: " + _referenceCount;
     }
 
     public String getBodyAsString()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Feb  8 02:09:37 2008
@@ -26,6 +26,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.Managable;
 import org.apache.qpid.server.management.ManagedObject;
@@ -41,6 +42,8 @@
 import java.util.Collection;
 import java.util.Hashtable;
 import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -53,49 +56,6 @@
  */
 public class AMQQueue implements Managable, Comparable, StorableQueue
 {
-    //FROM M2 - think these have been replaced by *Exception in the broker exception package
-//    /**
-//     * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
-//     * already exists.
-//     *
-//     * <p/><table id="crc"><caption>CRC Card</caption>
-//     * <tr><th> Responsibilities <th> Collaborations
-//     * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
-//     * </table>
-//     *
-//     * @todo Not an AMQP exception as no status code.
-//     *
-//     * @todo Move to top level, used outside this class.
-//     */
-//    public static final class ExistingExclusiveSubscription extends AMQException
-//    {
-//
-//        public ExistingExclusiveSubscription()
-//        {
-//            super("");
-//        }
-//    }
-//
-//    /**
-//     * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription
-//     * already exists.
-//     *
-//     * <p/><table id="crc"><caption>CRC Card</caption>
-//     * <tr><th> Responsibilities <th> Collaborations
-//     * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
-//     * </table>
-//     *
-//     * @todo Not an AMQP exception as no status code.
-//     *
-//     * @todo Move to top level, used outside this class.
-//     */
-//    public static final class ExistingSubscriptionPreventsExclusive extends AMQException
-//    {
-//        public ExistingSubscriptionPreventsExclusive()
-//        {
-//            super("");
-    //        }
-    //    }
     public static int s_queueID = 0;
 
     private static final Logger _logger = Logger.getLogger(AMQQueue.class);
@@ -188,10 +148,7 @@
      */
     public AtomicLong _totalMessagesReceived = new AtomicLong();
 
-    public int compareTo(Object o)
-    {
-        return _name.compareTo(((AMQQueue) o).getName());
-    }
+
 
     public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
             throws AMQException
@@ -274,6 +231,11 @@
         return _autoDelete;
     }
 
+    public boolean isDeleted()
+    {
+        return _deleted.get();
+    }
+
     /**
      * @return no of messages(undelivered) on the queue.
      */
@@ -285,7 +247,7 @@
     /**
      * @return List of messages(undelivered) on the queue.
      */
-    public List<AMQMessage> getMessagesOnTheQueue()
+    public List<QueueEntry> getMessagesOnTheQueue()
     {
         return _deliveryMgr.getMessages();
     }
@@ -297,7 +259,7 @@
      * @param toMessageId
      * @return List of messages
      */
-    public List<AMQMessage> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
+    public List<QueueEntry> getMessagesOnTheQueue(long fromMessageId, long toMessageId)
     {
         return _deliveryMgr.getMessages(fromMessageId, toMessageId);
     }
@@ -309,11 +271,11 @@
 
     /**
      * @param messageId
-     * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
+     * @return QueueEntry with give id if exists. null if QueueEntry with given id doesn't exist.
      */
-    public AMQMessage getMessageOnTheQueue(long messageId)
+    public QueueEntry getMessageOnTheQueue(long messageId)
     {
-        List<AMQMessage> list = getMessagesOnTheQueue(messageId, messageId);
+        List<QueueEntry> list = getMessagesOnTheQueue(messageId, messageId);
         if ((list == null) || (list.size() == 0))
         {
             return null;
@@ -355,15 +317,16 @@
             toQueue.startMovingMessages();
 
             // Get the list of messages to move.
-            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+            List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
 
             try
             {
                 fromStore.beginTran(storeContext);
 
                 // Move the messages in on the message store.
-                for (AMQMessage message : foundMessagesList)
+                for (QueueEntry entry : foundMessagesList)
                 {
+                    AMQMessage message = entry.getMessage();
                     fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
                     toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
                 }
@@ -433,15 +396,16 @@
             toQueue.startMovingMessages();
 
             // Get the list of messages to move.
-            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+            List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
 
             try
             {
                 fromStore.beginTran(storeContext);
 
                 // Move the messages in on the message store.
-                for (AMQMessage message : foundMessagesList)
+                for (QueueEntry entry : foundMessagesList)
                 {
+                    AMQMessage message = entry.getMessage();
                     toStore.enqueueMessage(storeContext, toQueue._name, message.getMessageId());
                     message.takeReference();
                 }
@@ -499,15 +463,16 @@
             startMovingMessages();
 
             // Get the list of messages to move.
-            List<AMQMessage> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
+            List<QueueEntry> foundMessagesList = getMessagesOnTheQueue(fromMessageId, toMessageId);
 
             try
             {
                 fromStore.beginTran(storeContext);
 
                 // remove the messages in on the message store.
-                for (AMQMessage message : foundMessagesList)
+                for (QueueEntry entry : foundMessagesList)
                 {
+                    AMQMessage message = entry.getMessage();
                     fromStore.dequeueMessage(storeContext, _name, message.getMessageId());
                 }
 
@@ -549,7 +514,7 @@
         _deliveryMgr.startMovingMessages();
     }
 
-    private void enqueueMovedMessages(StoreContext storeContext, List<AMQMessage> messageList)
+    private void enqueueMovedMessages(StoreContext storeContext, List<QueueEntry> messageList)
     {
         _deliveryMgr.enqueueMovedMessages(storeContext, messageList);
         _totalMessagesReceived.addAndGet(messageList.size());
@@ -621,9 +586,7 @@
 
     }
 
-    /**
-     * Removes the AMQMessage from the top of the queue.
-     */
+    /** Removes the QueueEntry from the top of the queue. */
     public synchronized void deleteMessageFromTop(StoreContext storeContext) throws AMQException
     {
         _deliveryMgr.removeAMessageFromTop(storeContext);
@@ -861,27 +824,28 @@
     // return _deliveryMgr;
     // }
 
-    public void process(StoreContext storeContext, AMQMessage msg, boolean deliverFirst) throws AMQException
+    public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException
     {
-        _deliveryMgr.deliver(storeContext, getName(), msg, deliverFirst);
+        AMQMessage msg = entry.getMessage();
+        _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst);
         try
         {
             msg.checkDeliveredToConsumer();
-            updateReceivedMessageCount(msg);
+            updateReceivedMessageCount(entry);
         }
         catch (NoConsumersException e)
         {
             // as this message will be returned, it should be removed
             // from the queue:
-            dequeue(storeContext, msg);
+            dequeue(storeContext, entry);
         }
     }
 
-    void dequeue(StoreContext storeContext, AMQMessage msg) throws FailedDequeueException
+    public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
     {
         try
         {
-            msg.dequeue(storeContext, this);
+            entry.getMessage().dequeue(storeContext, this);
         }
         catch (MessageCleanupException e)
         {
@@ -907,8 +871,10 @@
         return _subscribers;
     }
 
-    protected void updateReceivedMessageCount(AMQMessage msg) throws AMQException
+    protected void updateReceivedMessageCount(QueueEntry entry) throws AMQException
     {
+        AMQMessage msg = entry.getMessage();
+
         if (!msg.isRedelivered())
         {
             _totalMessagesReceived.incrementAndGet();
@@ -996,11 +962,15 @@
         _maximumMessageAge = maximumMessageAge;
     }
 
-    public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, AMQMessage msg)
+    public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry)
     {
-        _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, msg);
+        _deliveryMgr.subscriberHasPendingResend(hasContent, subscription, entry);
     }
 
+    /*
+     * TGM: wow, this is not going to have a high fun quotient
+     */
+
     // ========================================================================
     // Interface StorableQueue
     // ========================================================================
@@ -1048,5 +1018,27 @@
     public StorableMessage getEnqueuedMessage(long messageId)
     {
         return _messages.get(messageId);
+    }
+
+    public QueueEntry createEntry(AMQMessage amqMessage)
+    {
+        return new QueueEntry(this, amqMessage);
+    }
+
+    public int compareTo(Object o)
+    {
+        return _name.compareTo(((AMQQueue) o).getName());
+    }
+
+
+    public void removeExpiredIfNoSubscribers() throws AMQException
+    {
+        synchronized(_subscribers.getChangeLock())
+        {
+            if(_subscribers.isEmpty())
+            {
+                _deliveryMgr.removeExpired();
+            }
+        }
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Fri Feb  8 02:09:37 2008
@@ -321,11 +321,14 @@
      */
     public CompositeData viewMessageContent(long msgId) throws JMException
     {
-        AMQMessage msg = _queue.getMessageOnTheQueue(msgId);
-        if (msg == null)
+        QueueEntry entry = _queue.getMessageOnTheQueue(msgId);
+
+        if (entry == null)
         {
             throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
         }
+
+        AMQMessage msg = entry.getMessage();
         // get message content
         Iterator<ContentChunk> cBodies = msg.getContentBodyIterator();
         List<Byte> msgContent = new ArrayList<Byte>();
@@ -381,7 +384,7 @@
                 + "\n\"From Index\" should be greater than 0 and less than \"To Index\"");
         }
 
-        List<AMQMessage> list = _queue.getMessagesOnTheQueue();
+        List<QueueEntry> list = _queue.getMessagesOnTheQueue();
         TabularDataSupport _messageList = new TabularDataSupport(_messagelistDataType);
 
         try
@@ -389,7 +392,7 @@
             // Create the tabular list of message header contents
             for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++)
             {
-                AMQMessage msg = list.get(i - 1);
+                AMQMessage msg = list.get(i - 1).getMessage();
                 ContentHeaderBody headerBody = msg.getContentHeaderBody();
                 // Create header attributes list
                 String[] headerAttributes = getMessageHeaderProperties(headerBody);



Mime
View raw message