qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgr...@apache.org
Subject svn commit: r490613 [1/4] - in /incubator/qpid/branches/new_persistence/java: ./ broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src...
Date Thu, 28 Dec 2006 00:31:14 GMT
Author: rgreig
Date: Wed Dec 27 16:31:11 2006
New Revision: 490613

URL: http://svn.apache.org/viewvc?view=rev&rev=490613
Log:
Merge up to trunk rev 490505

Added:
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
      - copied unchanged from r490505, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
      - copied unchanged from r490505, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java
      - copied unchanged from r490505, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypeMap.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
      - copied unchanged from r490505, incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
Removed:
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java
    incubator/qpid/branches/new_persistence/java/common/src/main/versions/
    incubator/qpid/branches/new_persistence/java/common/src/main/xsl/
Modified:
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
    incubator/qpid/branches/new_persistence/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
    incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/basic/MapMessageTest.java
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/message/JMSDestinationTest.java
    incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
    incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/cluster/DefaultGroupManager.java
    incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/cluster/handler/RemoteConsumeHandler.java
    incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ConsumerCounts.java
    incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/RecordingMethodHandlerFactory.java
    incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/cluster/replay/ReplayStore.java
    incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredQueue.java
    incubator/qpid/branches/new_persistence/java/cluster/src/main/java/org/apache/qpid/server/queue/PrivateQueue.java
    incubator/qpid/branches/new_persistence/java/cluster/src/test/java/org/apache/qpid/server/cluster/BrokerTest.java
    incubator/qpid/branches/new_persistence/java/common/pom.xml
    incubator/qpid/branches/new_persistence/java/common/protocol-version.xml
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderProperties.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
    incubator/qpid/branches/new_persistence/java/common/src/main/java/org/apache/qpid/framing/JMSPropertyFieldTable.java
    incubator/qpid/branches/new_persistence/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
    incubator/qpid/branches/new_persistence/java/common/src/test/java/org/apache/qpid/framing/JMSPropertyFieldTableTest.java
    incubator/qpid/branches/new_persistence/java/common/src/test/java/org/apache/qpid/framing/PropertyFieldTableTest.java
    incubator/qpid/branches/new_persistence/java/distribution/   (props changed)
    incubator/qpid/branches/new_persistence/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/AttributesTabControl.java
    incubator/qpid/branches/new_persistence/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/OperationTabControl.java
    incubator/qpid/branches/new_persistence/java/management/eclipse-plugin/src/main/java/org/apache/qpid/management/ui/views/ViewUtility.java
    incubator/qpid/branches/new_persistence/java/pom.xml
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/AckTest.java
    incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/queue/MessageTestHelper.java

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Wed Dec 27 16:31:11 2006
@@ -21,12 +21,11 @@
 package org.apache.qpid.server.exchange;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQTypedValue;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 
 /**
  * Defines binding and matching based on a set of headers.
@@ -35,11 +34,53 @@
 {
     private static final Logger _logger = Logger.getLogger(HeadersBinding.class);
 
-    private final Map _mappings = new HashMap();
-    private final Set<Object> required = new HashSet<Object>();
-    private final Set<Map.Entry> matches = new HashSet<Map.Entry>();
+    private final FieldTable _mappings = new FieldTable();
+    private final Set<String> required = new HashSet<String>();
+    private final Map<String,Object> matches = new HashMap<String,Object>();
     private boolean matchAny;
 
+    private final class MatchesOrProcessor implements FieldTable.FieldTableElementProcessor
+    {
+        private Boolean _result = Boolean.FALSE;
+
+        public boolean processElement(String propertyName, AMQTypedValue value)
+        {
+            if((value != null) && (value.getValue() != null) && value.getValue().equals(matches.get(propertyName)))
+            {
+                _result = Boolean.TRUE;
+                return false;
+            }
+            return true;
+        }
+
+        public Object getResult()
+        {
+            return _result;
+        }
+    }
+
+    private final class RequiredOrProcessor implements FieldTable.FieldTableElementProcessor
+    {
+        Boolean _result = Boolean.FALSE;
+
+        public boolean processElement(String propertyName, AMQTypedValue value)
+        {
+            if(required.contains(propertyName))
+            {
+                _result = Boolean.TRUE;
+                return false;
+            }
+            return true;
+        }
+
+        public Object getResult()
+        {
+            return _result;
+        }
+    }
+
+
+
     /**
      * Creates a binding for a set of mappings. Those mappings whose value is
      * null or the empty string are assumed only to be required headers, with
@@ -47,33 +88,50 @@
      * define a required match of value. 
      * @param mappings the defined mappings this binding should use
      */
-    HeadersBinding(Map mappings)
+
+    HeadersBinding(FieldTable mappings)
     {
-        //noinspection unchecked
-        this(mappings == null ? new HashSet<Map.Entry>() : mappings.entrySet());
-        _mappings.putAll(mappings);
+        Enumeration propertyNames = mappings.getPropertyNames();
+        while(propertyNames.hasMoreElements())
+        {
+            String propName = (String) propertyNames.nextElement();
+            _mappings.put(propName, mappings.getObject(propName));
+        }
+        initMappings();
     }
 
-    private HeadersBinding(Set<Map.Entry> entries)
+    private void initMappings()
     {
-        for (Map.Entry e : entries)
+
+        _mappings.processOverElements(new FieldTable.FieldTableElementProcessor()
         {
-            if (isSpecial(e.getKey()))
-            {
-                processSpecial((String) e.getKey(), e.getValue());
-            }
-            else if (e.getValue() == null || e.getValue().equals(""))
+
+            public boolean processElement(String propertyName, AMQTypedValue value)
             {
-                required.add(e.getKey());
+                if (isSpecial(propertyName))
+                {
+                    processSpecial(propertyName, value.getValue());
+                }
+                else if (value.getValue() == null || value.getValue().equals(""))
+                {
+                    required.add(propertyName);
+                }
+                else
+                {
+                    matches.put(propertyName,value.getValue());
+                }
+
+                return true;
             }
-            else
+
+            public Object getResult()
             {
-                matches.add(e);
+                return null;
             }
-        }
+        });
     }
 
-    protected Map getMappings()
+    protected FieldTable getMappings()
     {
         return _mappings;
     }
@@ -84,7 +142,7 @@
      * @return true if the headers define any required keys and match any required
      * values
      */
-    public boolean matches(Map headers)
+    public boolean matches(FieldTable headers)
     {
         if(headers == null)
         {
@@ -96,18 +154,37 @@
         }
     }
 
-    private boolean and(Map headers)
+    private boolean and(FieldTable headers)
     {
-        //need to match all the defined mapping rules:
-        return headers.keySet().containsAll(required)
-                && headers.entrySet().containsAll(matches);
+        if(headers.keys().containsAll(required))
+        {
+            for(Map.Entry<String, Object> e : matches.entrySet())
+            {
+                if(!e.getValue().equals(headers.getObject(e.getKey())))
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+        else
+        {
+            return false;
+        }
     }
 
-    private boolean or(Map headers)
+
+    private boolean or(final FieldTable headers)
     {
-        //only need to match one mapping rule:
-        return !Collections.disjoint(headers.keySet(), required)
-                || !Collections.disjoint(headers.entrySet(), matches);
+        if(required.isEmpty() || !(Boolean) headers.processOverElements(new RequiredOrProcessor()))
+        {
+            return ((!matches.isEmpty()) && (Boolean) headers.processOverElements(new MatchesOrProcessor()))
+                    || (required.isEmpty() && matches.isEmpty());
+        }
+        else
+        {
+            return true;
+        }
     }
 
     private void processSpecial(String key, Object value)

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Wed Dec 27 16:31:11 2006
@@ -22,10 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.queue.AMQMessage;
@@ -34,10 +31,7 @@
 
 import javax.management.JMException;
 import javax.management.openmbean.*;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
@@ -119,16 +113,24 @@
                 String queueName = registration.queue.getName();
 
                 HeadersBinding headers = registration.binding;
-                Map<Object, Object> headerMappings = headers.getMappings();
-                List<String> mappingList = new ArrayList<String>();
+                FieldTable headerMappings = headers.getMappings();
+                final List<String> mappingList = new ArrayList<String>();
 
-                for (Map.Entry<Object, Object> en : headerMappings.entrySet())
+                headerMappings.processOverElements(new FieldTable.FieldTableElementProcessor()
                 {
-                    String key = en.getKey().toString();
-                    String value = en.getValue().toString();
 
-                    mappingList.add(key + "=" + value);
-                }
+                    public boolean processElement(String propertyName, AMQTypedValue value)
+                    {
+                        mappingList.add(propertyName + "=" + value.getValue());
+                        return true;
+                    }
+
+                    public Object getResult()
+                    {
+                        return mappingList;
+                    }
+                });
+
 
                 Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])};
                 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
@@ -155,7 +157,7 @@
             }
 
             String[] bindings  = binding.split(",");
-            FieldTable fieldTable = FieldTableFactory.newFieldTable();
+            FieldTable bindingMap = new FieldTable();
             for (int i = 0; i < bindings.length; i++)
             {
                 String[] keyAndValue = bindings[i].split("=");
@@ -163,10 +165,10 @@
                 {
                     throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
                 }
-                fieldTable.put(keyAndValue[0], keyAndValue[1]);
+                bindingMap.setString(keyAndValue[0], keyAndValue[1]);
             }
 
-            _bindings.add(new Registration(new HeadersBinding(fieldTable), queue));
+            _bindings.add(new Registration(new HeadersBinding(bindingMap), queue));
         }
 
     } // End of MBean class
@@ -185,7 +187,7 @@
 
     public void route(AMQMessage payload) throws AMQException
     {
-        Map headers = getHeaders(payload.getContentHeaderBody());
+        FieldTable headers = getHeaders(payload.getContentHeaderBody());
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
@@ -248,7 +250,7 @@
         return !_bindings.isEmpty();
     }
 
-    protected Map getHeaders(ContentHeaderBody contentHeaderFrame)
+    protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
     {
         //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
         //but these are not yet implemented.

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java Wed Dec 27 16:31:11 2006
@@ -54,7 +54,12 @@
         channel.unsubscribeConsumer(protocolSession, body.consumerTag);
         if(!body.nowait)
         {
-            final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), body.consumerTag);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                body.consumerTag);	// consumerTag
             protocolSession.writeFrame(responseFrame);
         }
     }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Wed Dec 27 16:31:11 2006
@@ -81,7 +81,12 @@
                                                               body.arguments, body.noLocal);
                 if (!body.nowait)
                 {
-                    session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        consumerTag));		// consumerTag
                 }
 
                 //now allow queue to start async processing of any backlog of messages
@@ -90,16 +95,28 @@
             catch (AMQInvalidSelectorException ise)
             {
                 _log.info("Closing connection due to invalid selector");
-                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
-                                                                      ise.getMessage(), BasicConsumeBody.CLASS_ID,
-                                                                      BasicConsumeBody.METHOD_ID));
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
+                    BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
+                    AMQConstant.INVALID_SELECTOR.getCode(),	// replyCode
+                    ise.getMessage()));		// replyText
             }
             catch (ConsumerTagNotUniqueException e)
             {
                 String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
-                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
-                                                                      BasicConsumeBody.CLASS_ID,
-                                                                      BasicConsumeBody.METHOD_ID));
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
+                    BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
+                    AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
+                    msg));	// replyText
             }
         }
     }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Wed Dec 27 16:31:11 2006
@@ -64,7 +64,15 @@
             protocolSession.closeChannel(evt.getChannelId());
             // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
             // then we can remove the hardcoded 0,0
-            AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), 500, "Unknown exchange name", 0, 0);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                ChannelCloseBody.getClazz((byte)8, (byte)0),	// classId
+                ChannelCloseBody.getMethod((byte)8, (byte)0),	// methodId
+                500,	// replyCode
+                "Unknown exchange name");	// replyText
             protocolSession.writeFrame(cf);
         }
         else

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java Wed Dec 27 16:31:11 2006
@@ -44,6 +44,9 @@
                                AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
     {
         session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
-        session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody()));
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0)));
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Wed Dec 27 16:31:11 2006
@@ -55,7 +55,10 @@
         _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
                      " and method " + body.methodId);
         protocolSession.closeChannel(evt.getChannelId());
-        AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         protocolSession.writeFrame(response);
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Wed Dec 27 16:31:11 2006
@@ -58,6 +58,12 @@
         channel.setSuspended(!body.active);
         _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
 
-        AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), body.active);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            body.active);	// active
         protocolSession.writeFrame(response);
-    }}
+    }
+}

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Wed Dec 27 16:31:11 2006
@@ -55,7 +55,10 @@
         final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(),
                                                   exchangeRegistry);
         protocolSession.addChannel(channel);
-        AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         protocolSession.writeFrame(response);
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Wed Dec 27 16:31:11 2006
@@ -62,7 +62,10 @@
         {
             _logger.error("Error closing protocol session: " + e, e);
         }
-        final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         protocolSession.writeFrame(response);
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Wed Dec 27 16:31:11 2006
@@ -64,7 +64,12 @@
             contextKey = generateClientID();
         }
         protocolSession.setContextKey(contextKey);
-        AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, contextKey);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            contextKey);	// knownHosts
         stateManager.changeState(AMQState.CONNECTION_OPEN);
         protocolSession.writeFrame(response);
     }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Wed Dec 27 16:31:11 2006
@@ -75,25 +75,43 @@
                 // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
                 _logger.info("Authentication failed");
                 stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                AMQFrame close = ConnectionCloseBody.createAMQFrame(0, AMQConstant.NOT_ALLOWED.getCode(),
-                        AMQConstant.NOT_ALLOWED.getName(),
-                        ConnectionCloseBody.CLASS_ID,
-                        ConnectionCloseBody.METHOD_ID);
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    ConnectionCloseBody.getClazz((byte)8, (byte)0),		// classId
+                    ConnectionCloseBody.getMethod((byte)8, (byte)0),	// methodId
+                    AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
+                    AMQConstant.NOT_ALLOWED.getName());	// replyText
                 protocolSession.writeFrame(close);
                 disposeSaslServer(protocolSession);
                 break;
             case SUCCESS:
                 _logger.info("Connected as: " + ss.getAuthorizationID());
                 stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-                AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE,
-                        ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
-                        HeartbeatConfig.getInstance().getDelay());
+                // TODO: Check the value of channelMax here: This should be the max
+                // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
+                // not Integer.MAX_VALUE (which is signed 4 bytes).
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    Integer.MAX_VALUE,	// channelMax
+                    ConnectionStartOkMethodHandler.getConfiguredFrameSize(),	// frameMax
+                    HeartbeatConfig.getInstance().getDelay());	// heartbeat
                 protocolSession.writeFrame(tune);
                 disposeSaslServer(protocolSession);
                 break;
             case CONTINUE:
                 stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    authResult.challenge);	// challenge
                 protocolSession.writeFrame(challenge);
         }
     }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Wed Dec 27 16:31:11 2006
@@ -92,13 +92,24 @@
                     _logger.info("Connected as: " + ss.getAuthorizationID());
 
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-                    AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
-                                                                      HeartbeatConfig.getInstance().getDelay());
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        Integer.MAX_VALUE,	// channelMax
+                        getConfiguredFrameSize(),	// frameMax
+                        HeartbeatConfig.getInstance().getDelay());	// heartbeat
                     protocolSession.writeFrame(tune);
                     break;
                 case CONTINUE:
                     stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                    AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        authResult.challenge);	// challenge
                     protocolSession.writeFrame(challenge);
             }
         }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Wed Dec 27 16:31:11 2006
@@ -64,6 +64,11 @@
                                ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
                                AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
     {
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        byte major = (byte)8;
+        byte minor = (byte)0;
+        
         ExchangeBoundBody body = evt.getMethod();
 
         String exchangeName = body.exchange;
@@ -77,8 +82,11 @@
         AMQFrame response;
         if (exchange == null)
         {
-            response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND,
-                                                             "Exchange " + exchangeName + " not found");
+            // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+            response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                major, minor,	// AMQP version (major, minor)
+                EXCHANGE_NOT_FOUND,	// replyCode
+                "Exchange " + exchangeName + " not found");	// replyText
         }
         else if (routingKey == null)
         {
@@ -86,11 +94,19 @@
             {
                 if (exchange.hasBindings())
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        OK,	// replyCode
+                        null);	// replyText
                 }
                 else
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null);
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        NO_BINDINGS,	// replyCode
+                        null);	// replyText
                 }
             }
             else
@@ -98,20 +114,29 @@
                 AMQQueue queue = queueRegistry.getQueue(queueName);
                 if (queue == null)
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
-                                                                      "Queue " + queueName + " not found");
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        QUEUE_NOT_FOUND,	// replyCode
+                        "Queue " + queueName + " not found");	// replyText
                 }
                 else
                 {
                     if (exchange.isBound(queue))
                     {
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+                        // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                            major, minor,	// AMQP version (major, minor)
+                            OK,	// replyCode
+                            null);	// replyText
                     }
                     else
                     {
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND,
-                                                                      "Queue " + queueName + " not bound to exchange " +
-                                                                      exchangeName);
+                        // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                            major, minor,	// AMQP version (major, minor)
+                            QUEUE_NOT_BOUND,	// replyCode
+                            "Queue " + queueName + " not bound to exchange " + exchangeName);	// replyText
                     }
                 }
             }
@@ -121,24 +146,30 @@
             AMQQueue queue = queueRegistry.getQueue(queueName);
             if (queue == null)
             {
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
-                                                                  "Queue " + queueName + " not found");
+                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                    major, minor,	// AMQP version (major, minor)
+                    QUEUE_NOT_FOUND,	// replyCode
+                    "Queue " + queueName + " not found");	// replyText
             }
             else
             {
                 if (exchange.isBound(body.routingKey, queue))
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
-                                                                     null);
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        OK,	// replyCode
+                        null);	// replyText
                 }
                 else
                 {
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
                     response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                                                                     SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,
-                                                                     "Queue " + queueName +
-                                                                     " not bound with routing key " +
-                                                                     body.routingKey + " to exchange " +
-                                                                     exchangeName);
+                        major, minor,	// AMQP version (major, minor)
+                        SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
+                        "Queue " + queueName + " not bound with routing key " +
+                        body.routingKey + " to exchange " + exchangeName);	// replyText
                 }
             }
         }
@@ -146,16 +177,20 @@
         {
             if (exchange.isBound(body.routingKey))
             {
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
-                                                                 null);
+                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                    major, minor,	// AMQP version (major, minor)
+                    OK,	// replyCode
+                    null);	// replyText
             }
             else
             {
+                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
                 response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                                                                 NO_QUEUE_BOUND_WITH_RK,
-                                                                 "No queue bound with routing key " +
-                                                                 body.routingKey + " to exchange " +
-                                                                 exchangeName);
+                    major, minor,	// AMQP version (major, minor)
+                    NO_QUEUE_BOUND_WITH_RK,	// replyCode
+                    "No queue bound with routing key " + body.routingKey +
+                    " to exchange " + exchangeName);	// replyText
             }
         }
         protocolSession.writeFrame(response);

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Wed Dec 27 16:31:11 2006
@@ -75,7 +75,10 @@
         }
         if(!body.nowait)
         {
-            AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId());
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
             protocolSession.writeFrame(response);
         }
     }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Wed Dec 27 16:31:11 2006
@@ -53,7 +53,10 @@
         try
         {
             exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
-            AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId());
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
             protocolSession.writeFrame(response);
         }
         catch (ExchangeInUseException e)

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Wed Dec 27 16:31:11 2006
@@ -90,7 +90,10 @@
         }
         if (!body.nowait)
         {
-            final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId());
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
             protocolSession.writeFrame(response);
         }
     }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Wed Dec 27 16:31:11 2006
@@ -102,7 +102,14 @@
         }
         if (!body.nowait)
         {
-            AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), body.queue, 0L, 0L);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                0L, // consumerCount
+                0L, // messageCount
+                body.queue); // queue
             _log.info("Queue " + body.queue + " declared successfully");
             protocolSession.writeFrame(response);
         }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Wed Dec 27 16:31:11 2006
@@ -81,7 +81,12 @@
         {
             int purged = queue.delete(body.ifUnused, body.ifEmpty);
             _store.removeQueue(queue.getName());
-            session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), purged));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                purged));	// messageCount
         }
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Wed Dec 27 16:31:11 2006
@@ -53,7 +53,10 @@
         {
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
             channel.commit();
-            protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
             channel.processReturns(protocolSession);
         }
         catch(AMQException e)

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Wed Dec 27 16:31:11 2006
@@ -51,7 +51,10 @@
         try{
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
             channel.rollback();
-            protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId()));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
             channel.resend(protocolSession);

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Wed Dec 27 16:31:11 2006
@@ -48,6 +48,9 @@
                                AMQMethodEvent<TxSelectBody> evt) throws AMQException
     {
         protocolSession.getChannel(evt.getChannelId()).setLocalTransactional();
-        protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId()));
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Wed Dec 27 16:31:11 2006
@@ -164,8 +164,17 @@
                 _minor = pi.protocolMinor;
                 String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
                 String locales = "en_US";
-                AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
-                                                                       mechanisms.getBytes(), locales.getBytes());
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
+            		(byte)8, (byte)0,	// AMQP version (major, minor)
+                    locales.getBytes(),	// locales
+                    mechanisms.getBytes(),	// mechanisms
+                    null,	// serverProperties
+                	(short)8,	// versionMajor
+                    (short)0	// versionMinor
+                    );
                 _minaProtocolSession.write(response);
             }
             catch (AMQException e)

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Wed Dec 27 16:31:11 2006
@@ -172,7 +172,16 @@
         }
         else
         {
-            protocolSession.write(ConnectionCloseBody.createAMQFrame(0, 200, throwable.getMessage(), 0, 0));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+            	(byte)8, (byte)0,	// AMQP version (major, minor)
+            	0,	// classId
+                0,	// methodId
+                200,	// replyCode
+                throwable.getMessage()	// replyText
+                ));
             _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
             protocolSession.close();
         }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Wed Dec 27 16:31:11 2006
@@ -193,8 +193,16 @@
     public void closeConnection() throws JMException
     {
         
-        final AMQFrame response = ConnectionCloseBody.createAMQFrame(0, AMQConstant.REPLY_SUCCESS.getCode(),
-                                                      "Broker Management Console has closing the connection.", 0, 0);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            0,	// classId
+            0,	// methodId
+        	AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
+            "Broker Management Console has closing the connection."	// replyText
+            );
         _session.writeFrame(response);
 
         try

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Wed Dec 27 16:31:11 2006
@@ -585,8 +585,8 @@
             throws AMQException
     {
         BasicPublishBody pb = getPublishBody();
-        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, consumerTag,
-                                                                deliveryTag, false, pb.exchange,
+        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId, (byte) 8, (byte) 0, consumerTag,
+                                                                deliveryTag, pb.exchange, _messageHandle.isRedelivered(),
                                                                 pb.routingKey);
         ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
         deliverFrame.writePayload(buf);
@@ -596,7 +596,8 @@
 
     private ByteBuffer createEncodedReturnFrame(int channelId, int replyCode, String replyText) throws AMQException
     {
-        AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, replyCode, replyText, getPublishBody().exchange,
+        AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId, (byte) 8, (byte) 0, getPublishBody().exchange,
+                                                              replyCode, replyText,                                                             
                                                               getPublishBody().routingKey);
         ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); // XXX: Could cast be a problem?
         returnFrame.writePayload(buf);

Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Wed Dec 27 16:31:11 2006
@@ -373,7 +373,13 @@
         if (!_closed)
         {
             _logger.info("Closing autoclose subscription:" + this);
-            protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
+        		(byte)8, (byte)0,	// AMQP version (major, minor)
+            	consumerTag	// consumerTag
+                ));
             _closed = true;
         }
     }
@@ -386,9 +392,17 @@
 
     private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
     {
-        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
-                                                                deliveryTag, false, exchange,
-                                                                routingKey);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
+        	(byte)8, (byte)0,	// AMQP version (major, minor)
+            consumerTag,	// consumerTag
+        	deliveryTag,	// deliveryTag
+            exchange,	// exchange
+            false,	// redelivered
+            routingKey	// routingKey
+            );
         ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
         deliverFrame.writePayload(buf);
         buf.flip();

Modified: incubator/qpid/branches/new_persistence/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Wed Dec 27 16:31:11 2006
@@ -24,17 +24,18 @@
 import java.util.HashMap;
 
 import junit.framework.TestCase;
+import org.apache.qpid.framing.FieldTable;
 
 /**
  */
 public class HeadersBindingTest extends TestCase
 {
-    private Map<String, String> bindHeaders = new HashMap<String, String>();
-    private Map<String, String> matchHeaders = new HashMap<String, String>();
+    private FieldTable bindHeaders = new FieldTable();
+    private FieldTable matchHeaders = new FieldTable();
 
     public void testDefault_1()
     {
-        bindHeaders.put("A", "Value of A");
+        bindHeaders.setString("A", "Value of A");
 
         matchHeaders.put("A", "Value of A");
 

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=490613&r1=490612&r2=490613
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Wed Dec 27 16:31:11 2006
@@ -465,12 +465,25 @@
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
             throws AMQException
     {
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
         _protocolHandler.syncWrite(
-                ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class);
+            ChannelOpenBody.createAMQFrame(channelId,
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                null),	// outOfBand
+                ChannelOpenOkBody.class);
 
         //todo send low water mark when protocol allows.
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
         _protocolHandler.syncWrite(
-                BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false),
+            BasicQosBody.createAMQFrame(channelId,
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                false,	// global
+                prefetchHigh,	// prefetchCount
+                0),	// prefetchSize
                 BasicQosOkBody.class);
 
         if (transacted)
@@ -479,7 +492,10 @@
             {
                 _logger.debug("Issuing TxSelect for " + channelId);
             }
-            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class);
         }
     }
 



Mime
View raw message