qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kp...@apache.org
Subject svn commit: r1334037 [15/24] - in /qpid/branches/asyncstore: ./ bin/ cc/ cpp/ cpp/bindings/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qpid/ cpp/bindings/qpid/dotnet/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/...
Date Fri, 04 May 2012 15:40:13 GMT
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Fri May  4 15:39:19 2012
@@ -134,7 +134,7 @@ public class QueueBindHandler implements
                     Map<String, Object> oldArgs = oldBinding.getArguments();
                     if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments)))
                     {
-                        virtualHost.getBindingFactory().replaceBinding(bindingKey, queue, exch, arguments);    
+                        virtualHost.getBindingFactory().replaceBinding(oldBinding.getId(), bindingKey, queue, exch, arguments);
                     }
                 }
             }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Fri May  4 15:39:19 2012
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.QueueDeclareBody;
 import org.apache.qpid.framing.QueueDeclareOkBody;
@@ -31,6 +32,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -43,6 +45,7 @@ import org.apache.qpid.server.store.Dura
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 
 public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
@@ -65,7 +68,7 @@ public class QueueDeclareHandler impleme
         VirtualHost virtualHost = protocolConnection.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+        DurableConfigurationStore store = virtualHost.getMessageStore();
 
         final AMQShortString queueName;
 
@@ -219,10 +222,11 @@ public class QueueDeclareHandler impleme
             throws AMQException
     {
         final QueueRegistry registry = virtualHost.getQueueRegistry();
-        AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+        String owner = body.getExclusive() ? AMQShortString.toString(session.getContextKey()) : null;
 
-        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
-                                                                  body.getExclusive(),virtualHost, body.getArguments());
+        Map<String, Object> arguments = FieldTable.convertToMap(body.getArguments());
+        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), AMQShortString.toString(queueName), body.getDurable(), owner, body.getAutoDelete(),
+                                                                  body.getExclusive(),virtualHost, arguments);
 
         if (body.getExclusive() && !body.getDurable())
         {

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Fri May  4 15:39:19 2012
@@ -62,7 +62,7 @@ public class QueueDeleteHandler implemen
         AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
         VirtualHost virtualHost = protocolConnection.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+        DurableConfigurationStore store = virtualHost.getMessageStore();
 
 
         AMQChannel channel = protocolConnection.getChannel(channelId);

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java Fri May  4 15:39:19 2012
@@ -20,11 +20,13 @@
  */
 package org.apache.qpid.server.logging.actors;
 
-import org.apache.qpid.server.logging.LogActor;
-
 import java.util.EmptyStackException;
 import java.util.Stack;
 
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogMessage;
+import org.apache.qpid.server.logging.LogSubject;
+
 /**
  * The CurrentActor is a ThreadLocal wrapper that allows threads in the broker
  * to retrieve an actor to perform logging. This approach is used so for two
@@ -126,4 +128,14 @@ public class CurrentActor
     {
         _defaultActor = defaultActor;
     }
+
+    public static void message(LogSubject subject, LogMessage message)
+    {
+        get().message(subject, message);
+    }
+
+    public static void message(LogMessage message)
+    {
+        get().message(message);
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties Fri May  4 15:39:19 2012
@@ -18,8 +18,7 @@
 #
 # Default File used for all non-defined locales.
 
-# 0 - name
-CREATED = CFG-1001 : Created : {0}
+CREATED = CFG-1001 : Created
 # 0 - path
 STORE_LOCATION = CFG-1002 : Store location : {0}
 CLOSE = CFG-1003 : Closed

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties Fri May  4 15:39:19 2012
@@ -18,11 +18,11 @@
 #
 # Default File used for all non-defined locales.
 #
-# 0 - name
-CREATED = MST-1001 : Created : {0}
+CREATED = MST-1001 : Created
 # 0 - path
 STORE_LOCATION = MST-1002 : Store location : {0}
 CLOSED = MST-1003 : Closed
 RECOVERY_START = MST-1004 : Recovery Start
 RECOVERED = MST-1005 : Recovered {0,number} messages
-RECOVERY_COMPLETE = MST-1006 : Recovery Complete
\ No newline at end of file
+RECOVERY_COMPLETE = MST-1006 : Recovery Complete
+PASSIVATE = MST-1007 : Store Passivated

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties Fri May  4 15:39:19 2012
@@ -19,8 +19,7 @@
 # Default File used for all non-defined locales.
 #
 #
-# 0 - name
-CREATED = TXN-1001 : Created : {0}
+CREATED = TXN-1001 : Created
 # 0 - path
 STORE_LOCATION = TXN-1002 : Store location : {0}
 CLOSED = TXN-1003 : Closed
@@ -31,3 +30,9 @@ RECOVERY_START = TXN-1004 : Recovery Sta
 RECOVERED = TXN-1005 : Recovered {0,number} messages for queue {1}
 # 0 - queue name
 RECOVERY_COMPLETE = TXN-1006 : Recovery Complete[ : {0}]
+# 0 - xid
+# 1 - queue name
+XA_INCOMPLETE_QUEUE = TXN-1007 : XA transaction recover for xid {0} incomplete as it references a queue {1} which was not durably retained
+# 0 - xid format
+# 1 - message id
+XA_INCOMPLETE_MESSAGE = TXN-1008 : XA transaction recover for xid {0} incomplete as it references a message {1} which was not durably retained

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java Fri May  4 15:39:19 2012
@@ -40,7 +40,8 @@ public class BindingLogSubject extends A
     public BindingLogSubject(String routingKey, Exchange exchange,
                              AMQQueue queue)
     {
-        setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(),
+        setLogStringWithFormat(BINDING_FORMAT,
+                               queue.getVirtualHost().getName(),
                                exchange.getTypeShortString(),
                                exchange.getNameShortString(),
                                queue.getNameShortString(),

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java Fri May  4 15:39:19 2012
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.logging.subjects;
 
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.STORE_FORMAT;
@@ -28,10 +27,9 @@ import static org.apache.qpid.server.log
 public class MessageStoreLogSubject extends AbstractLogSubject
 {
 
-    /** Create an ExchangeLogSubject that Logs in the following format. */
-    public MessageStoreLogSubject(VirtualHost vhost, MessageStore store)
+    /** Create a MessageStoreLogSubject that logs in the following format. */
+    public MessageStoreLogSubject(VirtualHost vhost, String messageStoreName)
     {
-        setLogStringWithFormat(STORE_FORMAT, vhost.getName(),
-                               store.getClass().getSimpleName());
+        setLogStringWithFormat(STORE_FORMAT, vhost.getName(), messageStoreName);
     }
 }

Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:r1291265-1333987

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java Fri May  4 15:39:19 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.management;
 
 import org.apache.qpid.management.common.mbeans.ManagedConnection;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java Fri May  4 15:39:19 2012
@@ -157,9 +157,7 @@ public class JMXManagedObjectRegistry im
 
                 if (!ksf.exists())
                 {
-                    throw new FileNotFoundException("Cannot find JMX management SSL keystore file " + ksf + "\n"
-                                                  + "Check broker configuration, or see create-example-ssl-stores script"
-                                                  + "in the bin/ directory if you need to generate an example store.");
+                    throw new FileNotFoundException("Cannot find JMX management SSL keystore file: " + ksf);
                 }
                 if (!ksf.canRead())
                 {

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java Fri May  4 15:39:19 2012
@@ -50,6 +50,7 @@ public class MessageMetaData_0_10 implem
     public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory();
 
     private volatile ByteBuffer _encoded;
+    private Object _connectionReference;
 
 
     public MessageMetaData_0_10(MessageTransfer xfr)
@@ -219,6 +220,16 @@ public class MessageMetaData_0_10 implem
         return _header;
     }
 
+    public void setConnectionReference(Object connectionReference)
+    {
+        _connectionReference = connectionReference;
+    }
+
+    public Object getConnectionReference()
+    {
+        return _connectionReference;
+    }
+
     private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_0_10>
     {
         public MessageMetaData_0_10 createMetaData(ByteBuffer buf)

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri May  4 15:39:19 2012
@@ -58,6 +58,7 @@ import org.apache.qpid.server.state.AMQS
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.transport.Sender;
@@ -1315,7 +1316,8 @@ public class AMQProtocolEngine implement
 
     public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
     {
-        closeChannel((Integer)session.getID());
+        int channelId = ((AMQChannel)session).getChannelId();
+        closeChannel(channelId);
 
         MethodRegistry methodRegistry = getMethodRegistry();
         ChannelCloseBody responseBody =
@@ -1324,7 +1326,7 @@ public class AMQProtocolEngine implement
                         new AMQShortString(message),
                         0,0);
 
-        writeFrame(responseBody.generateFrame((Integer)session.getID()));
+        writeFrame(responseBody.generateFrame(channelId));
     }
 
     public void close(AMQConstant cause, String message) throws AMQException
@@ -1454,7 +1456,7 @@ public class AMQProtocolEngine implement
                 throws AMQException
         {
             registerMessageDelivered(entry.getMessage().getSize());
-            _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, sub.getConsumerTag());
+            _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag());
             entry.incrementDeliveryCount();
         }
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri May  4 15:39:19 2012
@@ -20,10 +20,12 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.util.UUID;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.SimpleAMQQueue;
 
@@ -34,7 +36,8 @@ import org.apache.qpid.server.queue.Simp
  */
 public interface AMQSessionModel extends Comparable<AMQSessionModel>
 {
-    public Object getID();
+    /** Unique session ID across the entire broker. */
+    public UUID getId();
 
     public AMQConnectionModel getConnectionModel();
 
@@ -64,4 +67,7 @@ public interface AMQSessionModel extends
     void block(AMQQueue queue);
 
     void unblock(AMQQueue queue);
+
+
+    boolean onSameConnection(InboundMessage inbound);
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java Fri May  4 15:39:19 2012
@@ -20,4 +20,4 @@
  */
 package org.apache.qpid.server.protocol;
 
-public enum AmqpProtocolVersion { v0_8, v0_9, v0_9_1, v0_10 }
\ No newline at end of file
+public enum AmqpProtocolVersion { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri May  4 15:39:19 2012
@@ -175,6 +175,28 @@ public class MultiVersionProtocolEngine 
                          (byte) 10
             };
 
+    private static final byte[] AMQP_1_0_0_HEADER =
+            new byte[] { (byte) 'A',
+                         (byte) 'M',
+                         (byte) 'Q',
+                         (byte) 'P',
+                         (byte) 0,
+                         (byte) 1,
+                         (byte) 0,
+                         (byte) 0
+            };
+
+    private static final byte[] AMQP_SASL_1_0_0_HEADER =
+            new byte[] { (byte) 'A',
+                         (byte) 'M',
+                         (byte) 'Q',
+                         (byte) 'P',
+                         (byte) 3,
+                         (byte) 1,
+                         (byte) 0,
+                         (byte) 0
+            };
+
     public void setNetworkConnection(NetworkConnection networkConnection)
     {
         setNetworkConnection(networkConnection, networkConnection.getSender());
@@ -289,8 +311,48 @@ public class MultiVersionProtocolEngine 
         }
     };
 
+    private DelegateCreator creator_1_0_0 = new DelegateCreator()
+    {
+
+        public AmqpProtocolVersion getVersion()
+        {
+            return AmqpProtocolVersion.v1_0_0;
+        }
+
+
+        public byte[] getHeaderIdentifier()
+        {
+            return AMQP_1_0_0_HEADER;
+        }
+
+        public ServerProtocolEngine getProtocolEngine()
+        {
+            return new ProtocolEngine_1_0_0(_appRegistry,_id);
+        }
+    };
+
+    private DelegateCreator creator_1_0_0_SASL = new DelegateCreator()
+    {
+
+        public AmqpProtocolVersion getVersion()
+        {
+            return AmqpProtocolVersion.v1_0_0;
+        }
+
+
+        public byte[] getHeaderIdentifier()
+        {
+            return AMQP_SASL_1_0_0_HEADER;
+        }
+
+        public ServerProtocolEngine getProtocolEngine()
+        {
+            return new ProtocolEngine_1_0_0_SASL(_network, _appRegistry, _id);
+        }
+    };
+
     private final DelegateCreator[] _creators =
-            new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
+            new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10, creator_1_0_0_SASL, creator_1_0_0 };
 
 
     private class ClosedDelegateProtocolEngine implements ServerProtocolEngine

Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri May  4 15:39:19 2012
@@ -0,0 +1,6 @@
+/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:886720-886722,887145,892761,930288
+/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:805429-821809
+/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:787599
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:1061302-1072333
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1333987

Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:r1291265-1333987

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Fri May  4 15:39:19 2012
@@ -23,19 +23,20 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
+import java.util.UUID;
 
 public class AMQPriorityQueue extends OutOfOrderQueue
 {
-    protected AMQPriorityQueue(final String name,
+    protected AMQPriorityQueue(UUID id,
+                                final String name,
                                 final boolean durable,
                                 final String owner,
                                 final boolean autoDelete,
                                 boolean exclusive,
                                 final VirtualHost virtualHost,
-                                Map<String, Object> arguments,
-                                int priorities)
+                                Map<String, Object> arguments, int priorities)
     {
-        super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
+        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
     }
 
     public int getPriorities()

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri May  4 15:39:19 2012
@@ -34,7 +34,6 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.List;
@@ -141,10 +140,9 @@ public interface AMQQueue extends Managa
     public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
 
 
-    void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName,
-                                                        ServerTransaction transaction);
+    void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName);
 
-    void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction transaction);
+    void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName);
 
     void removeMessagesFromQueue(long fromMessageId, long toMessageId);
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Fri May  4 15:39:19 2012
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -30,12 +34,10 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.HashMap;
-import java.util.Map;
-
 public class AMQQueueFactory
 {
     public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
@@ -166,8 +168,13 @@ public class AMQQueueFactory
             }
     };
 
-
-    /** @see #createAMQQueueImpl(String, boolean, String, boolean, boolean, VirtualHost, Map) */
+    /**
+     * Creates a new queue with a random id.
+     *
+     * @see #createAMQQueueImpl(UUID, String, boolean, String, boolean, boolean, VirtualHost, Map)
+     * @deprecated because only called from unit tests
+     * */
+    @Deprecated
     public static AMQQueue createAMQQueueImpl(AMQShortString name,
                                               boolean durable,
                                               AMQShortString owner,
@@ -175,22 +182,28 @@ public class AMQQueueFactory
                                               boolean exclusive,
                                               VirtualHost virtualHost, final FieldTable arguments) throws AMQException
     {
-        return createAMQQueueImpl(name == null ? null : name.toString(),
+        return createAMQQueueImpl(UUIDGenerator.generateUUID(),
+                                  name == null ? null : name.toString(),
                                   durable,
                                   owner == null ? null : owner.toString(),
                                   autoDelete,
-                                  exclusive,
-                                  virtualHost, FieldTable.convertToMap(arguments));
+                                  exclusive, virtualHost, FieldTable.convertToMap(arguments));
     }
 
-
-    public static AMQQueue createAMQQueueImpl(String queueName,
+    /**
+     * @param id the id to use; must not be null (an IllegalArgumentException is thrown otherwise). TODO check correctness of calls that pass a null value.
+     */
+    public static AMQQueue createAMQQueueImpl(UUID id,
+                                              String queueName,
                                               boolean durable,
                                               String owner,
                                               boolean autoDelete,
-                                              boolean exclusive,
-                                              VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
+                                              boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
     {
+        if (id == null)
+        {
+            throw new IllegalArgumentException("Queue id must not be null");
+        }
         if (queueName == null)
         {
             throw new IllegalArgumentException("Queue name must not be null");
@@ -241,19 +254,19 @@ public class AMQQueueFactory
         AMQQueue q;
         if(sortingKey != null)
         {
-            q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+            q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
         }
         else if(conflationKey != null)
         {
-            q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
+            q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
         }
         else if(priorities > 1)
         {
-            q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
+            q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
         }
         else
         {
-            q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments);
+            q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments);
         }
 
         //Register the new queue
@@ -287,12 +300,12 @@ public class AMQQueueFactory
 
                 if(dlExchange == null)
                 {
-                    dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+                    dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
 
                     exchangeRegistry.registerExchange(dlExchange);
 
                     //enter the dle in the persistent store
-                    virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+                    virtualHost.getMessageStore().createExchange(dlExchange);
                 }
             }
 
@@ -309,10 +322,10 @@ public class AMQQueueFactory
                     args.put(X_QPID_DLQ_ENABLED, false);
                     args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
 
-                    dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args);
+                    dlQueue = createAMQQueueImpl(UUIDGenerator.generateUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
 
                     //enter the dlq in the persistent store
-                    virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+                    virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
                 }
             }
 
@@ -364,7 +377,10 @@ public class AMQQueueFactory
             arguments.put(X_QPID_DLQ_ENABLED, true);
         }
 
-        AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments);
+        // we need queues that are defined in config to have deterministic ids.
+        UUID id = UUIDGenerator.generateUUID(queueName, host.getName());
+
+        AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments);
         q.configure(config);
         return q;
     }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Fri May  4 15:39:19 2012
@@ -36,8 +36,6 @@ import org.apache.qpid.server.message.AM
 import org.apache.qpid.server.message.AMQMessageHeader;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.txn.LocalTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.transport.MessageProperties;
 
 import javax.management.JMException;
@@ -613,9 +611,7 @@ public class AMQQueueMBean extends AMQMa
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-        _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
-        txn.commit();
+        _queue.moveMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName);
     }
 
     /**
@@ -648,11 +644,7 @@ public class AMQQueueMBean extends AMQMa
             throw new OperationsException("\"From MessageId\" should be greater than 0 and less than \"To MessageId\"");
         }
 
-        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-
-        _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName, txn);
-
-        txn.commit();
+        _queue.copyMessagesToAnotherQueue(fromMessageId, toMessageId, toQueueName);
     }
 
     /**

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Fri May  4 15:39:19 2012
@@ -21,22 +21,23 @@
 
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
 import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class ConflationQueue extends SimpleAMQQueue
 {
-    protected ConflationQueue(String name,
+    protected ConflationQueue(UUID id,
+                              String name,
                               boolean durable,
                               String owner,
                               boolean autoDelete,
                               boolean exclusive,
                               VirtualHost virtualHost,
-                              Map<String, Object> args,
-                              String conflationKey)
+                              Map<String, Object> args, String conflationKey)
     {
-        super(name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args);
+        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args);
     }
 
     public String getConflationKey()

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Fri May  4 15:39:19 2012
@@ -20,15 +20,21 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 public class DefaultQueueRegistry implements QueueRegistry
 {
+    private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
+
     private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
 
     private final VirtualHost _virtualHost;
@@ -72,4 +78,36 @@ public class DefaultQueueRegistry implem
     {
         return getQueue(new AMQShortString(queue));
     }
+
+    @Override
+    public void stopAllAndUnregisterMBeans()
+    {
+        for (final AMQQueue queue : getQueues())
+        {
+            queue.stop();
+            try
+            {
+                queue.getManagedObject().unregister();
+            }
+            catch (AMQException e)
+            {
+                LOGGER.warn("Failed to unregister mbean", e);
+            }
+        }
+        _queueMap.clear();
+    }
+
+    @Override
+    public synchronized AMQQueue getQueue(UUID queueId)
+    {
+        Collection<AMQQueue> queues = _queueMap.values();
+        for (AMQQueue queue : queues)
+        {
+            if (queue.getId().equals(queueId))
+            {
+                return queue;
+            }
+        }
+        return null;
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Fri May  4 15:39:19 2012
@@ -70,8 +70,6 @@ public class IncomingMessage implements 
 
     private Exchange _exchange;
 
-
-    private int _receivedChunkCount = 0;
     private List<ContentChunk> _contentChunks = new ArrayList<ContentChunk>();
 
     // we keep both the original meta data object and the store reference to it just in case the
@@ -80,13 +78,20 @@ public class IncomingMessage implements 
     private MessageMetaData _messageMetaData;
 
     private StoredMessage<MessageMetaData> _storedMessageHandle;
+    private Object _connectionReference;
 
 
     public IncomingMessage(
             final MessagePublishInfo info
     )
     {
+        this(info, null);
+    }
+
+    public IncomingMessage(MessagePublishInfo info, Object reference)
+    {
         _messagePublishInfo = info;
+        _connectionReference = reference;
     }
 
     public void setContentHeaderBody(final ContentHeaderBody contentHeaderBody) throws AMQException
@@ -125,12 +130,6 @@ public class IncomingMessage implements 
 
     }
 
-    public MessageMetaData headersReceived()
-    {
-
-        return headersReceived(System.currentTimeMillis());
-    }
-
     public MessageMetaData headersReceived(long currentTime)
     {
         _messageMetaData = new MessageMetaData(_messagePublishInfo, _contentHeaderBody, 0, currentTime);
@@ -143,16 +142,10 @@ public class IncomingMessage implements 
         return _destinationQueues;
     }
 
-    public int addContentBodyFrame(final ContentChunk contentChunk)
-            throws AMQException
+    public void addContentBodyFrame(final ContentChunk contentChunk) throws AMQException
     {
-        _storedMessageHandle.addContent((int)_bodyLengthReceived, ByteBuffer.wrap(contentChunk.getData()));
         _bodyLengthReceived += contentChunk.getSize();
         _contentChunks.add(contentChunk);
-
-
-
-        return _receivedChunkCount++;
     }
 
     public boolean allContentReceived()
@@ -252,18 +245,12 @@ public class IncomingMessage implements 
         return _expiration;
     }
 
-    public int getReceivedChunkCount()
-    {
-        return _receivedChunkCount;
-    }
-
-
     public int getBodyCount() throws AMQException
     {
         return _contentChunks.size();
     }
 
-    public ContentChunk getContentChunk(int index) throws IllegalArgumentException, AMQException
+    public ContentChunk getContentChunk(int index)
     {
         return _contentChunks.get(index);
     }
@@ -318,4 +305,14 @@ public class IncomingMessage implements 
     {
         return _storedMessageHandle;
     }
+
+    public Object getConnectionReference()
+    {
+        return _connectionReference;
+    }
+
+    public MessageMetaData getMessageMetaData()
+    {
+        return _messageMetaData;
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Fri May  4 15:39:19 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.subscription.Subscription;
@@ -5,15 +25,16 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
+import java.util.UUID;
 
 public abstract class OutOfOrderQueue extends SimpleAMQQueue
 {
 
-    protected OutOfOrderQueue(String name, boolean durable, String owner,
-                              boolean autoDelete, boolean exclusive, VirtualHost virtualHost,
-                              QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+    protected OutOfOrderQueue(UUID id, String name, boolean durable,
+                              String owner, boolean autoDelete, boolean exclusive,
+                              VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
     {
-        super(name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
+        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
     }
 
     @Override

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Fri May  4 15:39:19 2012
@@ -25,19 +25,19 @@ import org.apache.qpid.server.message.Se
 public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
 {
     private final AMQQueue _queue;
-    private final SimpleQueueEntryList[] _priorityLists;
+    private final PriorityQueueEntrySubList[] _priorityLists;
     private final int _priorities;
     private final int _priorityOffset;
 
     public PriorityQueueList(AMQQueue queue, int priorities)
     {
         _queue = queue;
-        _priorityLists = new SimpleQueueEntryList[priorities];
+        _priorityLists = new PriorityQueueEntrySubList[priorities];
         _priorities = priorities;
         _priorityOffset = 5-((priorities + 1)/2);
         for(int i = 0; i < priorities; i++)
         {
-            _priorityLists[i] = new SimpleQueueEntryList(queue);
+            _priorityLists[i] = new PriorityQueueEntrySubList(queue);
         }
     }
 
@@ -161,4 +161,48 @@ public class PriorityQueueList implement
             return new PriorityQueueList(queue, _priorities);
         }
     }
+
+    private static class PriorityQueueEntrySubList extends SimpleQueueEntryList
+    {
+        public PriorityQueueEntrySubList(AMQQueue queue)
+        {
+            super(queue);
+        }
+
+        @Override
+        protected PriorityQueueEntryImpl createQueueEntry(ServerMessage<?> message)
+        {
+            return new PriorityQueueEntryImpl(this, message);
+        }
+    }
+
+    private static class PriorityQueueEntryImpl extends SimpleQueueEntryImpl
+    {
+        public PriorityQueueEntryImpl(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message)
+        {
+            super(queueEntryList, message);
+        }
+
+        @Override
+        public int compareTo(final QueueEntry o)
+        {
+            byte thisPriority = getMessageHeader().getPriority();
+            byte otherPriority = o.getMessageHeader().getPriority();
+
+            if(thisPriority != otherPriority)
+            {
+                /*
+                 * Different priorities, so answer can only be greater than or less than
+                 *
+                 * A message with higher priority (e.g. 5) is conceptually 'earlier' in the
+                 * priority queue than one with a lower priority (e.g. 4).
+                 */
+                return thisPriority > otherPriority ? -1 : 1;
+            }
+            else
+            {
+                return super.compareTo(o);
+            }
+        }
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java Fri May  4 15:39:19 2012
@@ -52,4 +52,13 @@ final class QueueContext implements AMQQ
     {
         return _releasedEntry;
     }
+
+    @Override
+    public String toString()
+    {
+        return "QueueContext{" +
+               "_lastSeenEntry=" + _lastSeenEntry +
+               ", _releasedEntry=" + _releasedEntry +
+               '}';
+    }
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Fri May  4 15:39:19 2012
@@ -227,9 +227,10 @@ public abstract class QueueEntryImpl imp
     public void release()
     {
         EntryState state = _state;
-        
+
         if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
         {
+
             if(state instanceof SubscriptionAcquiredState)
             {
                 getQueue().decrementUnackedMsgCount();
@@ -254,6 +255,7 @@ public abstract class QueueEntryImpl imp
                 routeToAlternate();
             }
         }
+
     }
 
     public boolean releaseButRetain()
@@ -267,7 +269,6 @@ public abstract class QueueEntryImpl imp
             Subscription sub = ((SubscriptionAcquiredState) state).getSubscription();
             if(_stateUpdater.compareAndSet(this, state, sub.getAssignedState()))
             {
-                System.err.println("Message released (and retained)" + getMessage().getMessageNumber());
                 getQueue().requeue(this);
                 if(_stateChangeListeners != null)
                 {
@@ -417,11 +418,19 @@ public abstract class QueueEntryImpl imp
 
         if (alternateExchange != null)
         {
-            final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+            InboundMessageAdapter inboundMessageAdapter = new InboundMessageAdapter(this);
+            List<? extends BaseQueue> queues = alternateExchange.route(inboundMessageAdapter);
             final ServerMessage message = getMessage();
-            if (rerouteQueues != null && rerouteQueues.size() != 0)
+            if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
             {
+                queues = alternateExchange.getAlternateExchange().route(inboundMessageAdapter);
+            }
 
+
+
+            if (queues != null && queues.size() != 0)
+            {
+                final List<? extends BaseQueue> rerouteQueues = queues;
                 ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
 
                 txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Fri May  4 15:39:19 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
+import java.util.UUID;
 
 public interface QueueRegistry
 {
@@ -40,4 +41,8 @@ public interface QueueRegistry
     Collection<AMQQueue> getQueues();
 
     AMQQueue getQueue(String queue);
+
+    void stopAllAndUnregisterMBeans();
+
+    AMQQueue getQueue(UUID queueId);
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Fri May  4 15:39:19 2012
@@ -191,29 +191,29 @@ public class SimpleAMQQueue implements A
     private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
     private final MessageGroupManager _messageGroupManager;
 
-    protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
+    protected SimpleAMQQueue(UUID id, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
     {
-        this(name, durable, owner, autoDelete, exclusive, virtualHost,new SimpleQueueEntryList.Factory(), arguments);
+        this(id, name, durable, owner, autoDelete, exclusive,virtualHost, new SimpleQueueEntryList.Factory(), arguments);
     }
 
-    public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
+    public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
     {
-        this(queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
+        this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
     }
 
-    public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+    public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
     {
-        this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments);
+        this(id, queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments);
     }
 
-    protected SimpleAMQQueue(AMQShortString name,
+    protected SimpleAMQQueue(UUID id,
+                             AMQShortString name,
                              boolean durable,
                              AMQShortString owner,
                              boolean autoDelete,
                              boolean exclusive,
                              VirtualHost virtualHost,
-                             QueueEntryListFactory entryListFactory,
-                             Map<String,Object> arguments)
+                             QueueEntryListFactory entryListFactory, Map<String,Object> arguments)
     {
 
         if (name == null)
@@ -236,7 +236,7 @@ public class SimpleAMQQueue implements A
         _entries = entryListFactory.createQueueEntryList(this);
         _arguments = arguments;
 
-        _id = virtualHost.getConfigStore().createId();
+        _id = id;
 
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
 
@@ -346,7 +346,7 @@ public class SimpleAMQQueue implements A
 
         if(isDurable())
         {
-            getVirtualHost().getDurableConfigurationStore().updateQueue(this);
+            getVirtualHost().getMessageStore().updateQueue(this);
         }
     }
 
@@ -862,7 +862,6 @@ public class SimpleAMQQueue implements A
 
     public void requeue(QueueEntry entry)
     {
-
         SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
         // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
         while (subscriberIter.advance() && entry.isAvailable())
@@ -1198,19 +1197,10 @@ public class SimpleAMQQueue implements A
 
     public void moveMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,
-                                           String queueName,
-                                           ServerTransaction txn) throws IllegalArgumentException
+                                           String destinationQueueName) throws IllegalArgumentException
     {
 
-        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        if (toQueue == null)
-        {
-            throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
-        }
-        else if (toQueue == this)
-        {
-            throw new IllegalArgumentException("The destination queue cant be the same as the source queue");
-        }
+        final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
 
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
@@ -1230,65 +1220,68 @@ public class SimpleAMQQueue implements A
         });
 
 
-
-        // Move the messages in on the message store.
-        for (final QueueEntry entry : entries)
+        final ServerTransaction txn = new LocalTransaction(getVirtualHost().getMessageStore());
+        boolean shouldRollback = true;
+        try
         {
-            final ServerMessage message = entry.getMessage();
-            txn.enqueue(toQueue, message,
-                        new ServerTransaction.Action()
-                        {
-
-                            public void postCommit()
+            // Move the messages in the message store.
+            for (final QueueEntry entry : entries)
+            {
+                final ServerMessage message = entry.getMessage();
+                txn.enqueue(toQueue, message,
+                            new ServerTransaction.Action()
                             {
-                                try
+
+                                public void postCommit()
                                 {
-                                    toQueue.enqueue(message);
+                                    try
+                                    {
+                                        toQueue.enqueue(message);
+                                    }
+                                    catch (AMQException e)
+                                    {
+                                        throw new RuntimeException(e);
+                                    }
                                 }
-                                catch (AMQException e)
+
+                                public void onRollback()
                                 {
-                                    throw new RuntimeException(e);
+                                    entry.release();
                                 }
-                            }
-
-                            public void onRollback()
-                            {
-                                entry.release();
-                            }
-                        });
-            txn.dequeue(this, message,
-                        new ServerTransaction.Action()
-                        {
-
-                            public void postCommit()
+                            });
+                txn.dequeue(this, message,
+                            new ServerTransaction.Action()
                             {
-                                entry.discard();
-                            }
 
-                            public void onRollback()
-                            {
+                                public void postCommit()
+                                {
+                                    entry.discard();
+                                }
 
-                            }
-                        });
+                                public void onRollback()
+                                {
 
+                                }
+                            });
+            }
+            txn.commit();
+            shouldRollback = false;
+        }
+        finally
+        {
+            if (shouldRollback)
+            {
+                txn.rollback();
+            }
         }
 
     }
 
     public void copyMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,
-                                           String queueName,
-                                           final ServerTransaction txn) throws IllegalArgumentException
+                                           String destinationQueueName) throws IllegalArgumentException
     {
-        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        if (toQueue == null)
-        {
-            throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
-        }
-        else if (toQueue == this)
-        {
-            throw new IllegalArgumentException("The destination queue cant be the same as the source queue");
-        }
+        final AMQQueue toQueue = getValidatedDestinationQueue(destinationQueueName);
 
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
@@ -1306,36 +1299,63 @@ public class SimpleAMQQueue implements A
             }
         });
 
-
-        // Move the messages in on the message store.
-        for (QueueEntry entry : entries)
+        final ServerTransaction txn = new LocalTransaction(_virtualHost.getMessageStore());
+        boolean shouldRollback = true;
+        try
         {
-            final ServerMessage message = entry.getMessage();
-
-            txn.enqueue(toQueue, message, new ServerTransaction.Action()
+            // Copy the messages in the message store.
+            for (QueueEntry entry : entries)
             {
-                public void postCommit()
+                final ServerMessage message = entry.getMessage();
+
+                txn.enqueue(toQueue, message, new ServerTransaction.Action()
                 {
-                    try
+                    public void postCommit()
                     {
-                        toQueue.enqueue(message);
+                        try
+                        {
+                            toQueue.enqueue(message);
+                        }
+                        catch (AMQException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
                     }
-                    catch (AMQException e)
+
+                    public void onRollback()
                     {
-                        throw new RuntimeException(e);
                     }
-                }
+                });
 
-                public void onRollback()
-                {
-
-                }
-            });
+            }
 
+            txn.commit();
+            shouldRollback = false;
+        }
+        finally
+        {
+            if (shouldRollback)
+            {
+                txn.rollback();
+            }
         }
 
     }
 
+    private AMQQueue getValidatedDestinationQueue(String queueName)
+    {
+        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+        if (toQueue == null)
+        {
+            throw new IllegalArgumentException("Queue '" + queueName + "' is not registered with the virtualhost.");
+        }
+        else if (toQueue == this)
+        {
+            throw new IllegalArgumentException("The destination queue can't be the same as the source queue");
+        }
+        return toQueue;
+    }
+
     public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
     {
 
@@ -1515,10 +1535,16 @@ public class SimpleAMQQueue implements A
                 for(final QueueEntry entry : entries)
                 {
                     adapter.setEntry(entry);
-                    final List<? extends BaseQueue> rerouteQueues = _alternateExchange.route(adapter);
+                    List<? extends BaseQueue> queues = _alternateExchange.route(adapter);
+                    if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
+                    {
+                        queues = _alternateExchange.getAlternateExchange().route(adapter);
+                    }
+
                     final ServerMessage message = entry.getMessage();
-                    if(rerouteQueues != null && rerouteQueues.size() != 0)
+                    if(queues != null && queues.size() != 0)
                     {
+                        final List<? extends BaseQueue> rerouteQueues = queues;
                         txn.enqueue(rerouteQueues, entry.getMessage(),
                                     new ServerTransaction.Action()
                                     {
@@ -1716,6 +1742,7 @@ public class SimpleAMQQueue implements A
     {
         boolean atTail = false;
         final boolean keepSendLockHeld = iterations <=  SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+        boolean queueEmpty = false;
 
         try
         {
@@ -1733,12 +1760,9 @@ public class SimpleAMQQueue implements A
                     }
 
                     atTail = attemptDelivery(sub, true);
-                    if (atTail && !sub.isSuspended() && sub.isAutoClose())
+                    if (atTail && getNextAvailableEntry(sub) == null)
                     {
-                        unregisterSubscription(sub);
-
-                        sub.confirmAutoClose();
-
+                        queueEmpty = true;
                     }
                     else if (!atTail)
                     {
@@ -1760,6 +1784,11 @@ public class SimpleAMQQueue implements A
             {
                 sub.releaseSendLock();
             }
+            if(queueEmpty)
+            {
+                sub.queueEmpty();
+            }
+
             sub.flushBatched();
 
         }
@@ -1895,7 +1924,7 @@ public class SimpleAMQQueue implements A
         if(context != null)
         {
             QueueEntry releasedNode = context.getReleasedEntry();
-            return releasedNode == null || releasedNode.compareTo(entry) < 0;
+            return releasedNode != null && releasedNode.compareTo(entry) < 0;
         }
         else
         {
@@ -1982,13 +2011,9 @@ public class SimpleAMQQueue implements A
                             if (subscriptionDone)
                             {
                                 sub.flushBatched();
-                                //close autoClose subscriptions if we are not currently intent on continuing
-                                if (lastLoop && !sub.isSuspended() && sub.isAutoClose())
+                                if (lastLoop && !sub.isSuspended())
                                 {
-
-                                    unregisterSubscription(sub);
-
-                                    sub.confirmAutoClose();
+                                    sub.queueEmpty();
                                 }
                                 break;
                             }
@@ -2064,9 +2089,13 @@ public class SimpleAMQQueue implements A
             // Only process nodes that are not currently deleted and not dequeued
             if (!node.isDispensed())
             {
-                // If the node has exired then aquire it
+                // If the node has expired then acquire it
                 if (node.expired() && node.acquire())
                 {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Dequeuing expired node " + node);
+                    }
                     // Then dequeue it.
                     dequeueEntry(node);
                 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Fri May  4 15:39:19 2012
@@ -104,7 +104,7 @@ public class SimpleQueueEntryList implem
         }
     }
 
-    protected SimpleQueueEntryImpl createQueueEntry(ServerMessage message)
+    protected SimpleQueueEntryImpl createQueueEntry(ServerMessage<?> message)
     {
         return new SimpleQueueEntryImpl(this, message);
     }
@@ -116,7 +116,6 @@ public class SimpleQueueEntryList implem
 
     public static class QueueEntryIteratorImpl implements QueueEntryIterator<SimpleQueueEntryImpl>
     {
-
         private SimpleQueueEntryImpl _lastNode;
 
         QueueEntryIteratorImpl(SimpleQueueEntryImpl startNode)
@@ -124,10 +123,9 @@ public class SimpleQueueEntryList implem
             _lastNode = startNode;
         }
 
-
         public boolean atTail()
         {
-            return _lastNode.getNextNode() == null;
+            return _lastNode.getNextValidEntry() == null;
         }
 
         public SimpleQueueEntryImpl getNode()
@@ -137,28 +135,17 @@ public class SimpleQueueEntryList implem
 
         public boolean advance()
         {
+            SimpleQueueEntryImpl nextValidNode = _lastNode.getNextValidEntry();
 
-            if(!atTail())
+            if(nextValidNode != null)
             {
-                SimpleQueueEntryImpl nextNode = _lastNode.getNextNode();
-                while(nextNode.isDispensed() && nextNode.getNextNode() != null)
-                {
-                    nextNode = nextNode.getNextNode();
-                }
-                _lastNode = nextNode;
-                return true;
-
-            }
-            else
-            {
-                return false;
+                _lastNode = nextValidNode;
             }
 
+            return nextValidNode != null;
         }
-
     }
 
-
     public QueueEntryIteratorImpl iterator()
     {
         return new QueueEntryIteratorImpl(_head);

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Fri May  4 15:39:19 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
+import java.util.UUID;
 
 public class SortedQueue extends OutOfOrderQueue
 {
@@ -33,12 +34,12 @@ public class SortedQueue extends OutOfOr
     private final Object _sortedQueueLock = new Object();
     private final String _sortedPropertyName;
 
-    protected SortedQueue(final String name, final boolean durable,
-                            final String owner, final boolean autoDelete, final boolean exclusive,
-                            final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
+    protected SortedQueue(UUID id, final String name,
+                            final boolean durable, final String owner, final boolean autoDelete,
+                            final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
     {
-        super(name, durable, owner, autoDelete, exclusive, virtualHost,
-                new SortedQueueEntryListFactory(sortedPropertyName), arguments);
+        super(id, name, durable, owner, autoDelete, exclusive,
+                virtualHost, new SortedQueueEntryListFactory(sortedPropertyName), arguments);
         this._sortedPropertyName = sortedPropertyName;
     }
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Fri May  4 15:39:19 2012
@@ -186,19 +186,6 @@ public abstract class ApplicationRegistr
         _qmfService = qmfService;
     }
 
-    static
-    {
-        Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownService()));
-    }
-
-    private static class ShutdownService implements Runnable
-    {
-        public void run()
-        {
-            remove();
-        }
-    }
-
     public static void initialise(IApplicationRegistry instance) throws Exception
     {
         if(instance == null)
@@ -273,7 +260,6 @@ public abstract class ApplicationRegistr
                     _logger.info("Shutting down ApplicationRegistry(" + instance + ")");
                 }
                 instance.close();
-                instance.getBroker().getSystem().removeBroker(instance.getBroker());
             }
         }
         catch (Exception e)
@@ -536,35 +522,49 @@ public abstract class ApplicationRegistr
         }
     }
 
-
     public void close()
     {
         if (_logger.isInfoEnabled())
         {
             _logger.info("Shutting down ApplicationRegistry:" + this);
         }
-        
-        //Stop Statistics Reporting
-        if (_reportingTimer != null)
+
+        //Set the Actor for Broker Shutdown
+        CurrentActor.set(new BrokerActor(getRootMessageLogger()));
+        try
         {
-            _reportingTimer.cancel();
-        }
+            //Stop Statistics Reporting
+            if (_reportingTimer != null)
+            {
+                _reportingTimer.cancel();
+            }
+
+            //Stop incoming connections
+            unbind();
 
-        //Stop incoming connections
-        unbind();
+            //Shutdown virtualhosts
+            close(_virtualHostRegistry);
 
-        //Shutdown virtualhosts
-        close(_virtualHostRegistry);
+            close(_authenticationManager);
 
-        close(_authenticationManager);
+            close(_qmfService);
 
-        close(_qmfService);
+            close(_pluginManager);
 
-        close(_pluginManager);
+            close(_managedObjectRegistry);
 
-        close(_managedObjectRegistry);
+            BrokerConfig broker = getBroker();
+            if(broker != null)
+            {
+                broker.getSystem().removeBroker(broker);
+            }
 
-        CurrentActor.get().message(BrokerMessages.STOPPED());
+            CurrentActor.get().message(BrokerMessages.STOPPED());
+        }
+        finally
+        {
+            CurrentActor.remove();
+        }
     }
 
     private void unbind()
@@ -664,7 +664,7 @@ public abstract class ApplicationRegistr
 
     public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
     {
-        VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig, null);
+        VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
         _virtualHostRegistry.registerVirtualHost(virtualHost);
         getBroker().addVirtualHost(virtualHost);
         return virtualHost;

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/BrokerConfigAdapter.java Fri May  4 15:39:19 2012
@@ -114,7 +114,6 @@ public class BrokerConfigAdapter impleme
 
     public void addVirtualHost(final VirtualHostConfig virtualHost)
     {
-        virtualHost.setBroker(this);
         _vhosts.put(virtualHost.getId(), virtualHost);
         getConfigStore().addConfiguredObject(virtualHost);
 

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Fri May  4 15:39:19 2012
@@ -25,8 +25,6 @@ import org.osgi.framework.BundleContext;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.management.JMXManagedObjectRegistry;
 import org.apache.qpid.server.management.NoopManagedObjectRegistry;
 
@@ -45,22 +43,6 @@ public class ConfigurationFileApplicatio
     }
 
     @Override
-    public void close()
-    {
-        //Set the Actor for Broker Shutdown
-        CurrentActor.set(new BrokerActor(getRootMessageLogger()));
-        try
-        {
-            super.close();
-        }
-        finally
-        {
-            CurrentActor.remove();
-        }
-    }
-
-
-    @Override
     protected void initialiseManagedObjectRegistry() throws AMQException
     {
         if (getConfiguration().getManagementEnabled())

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Fri May  4 15:39:19 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable;
 import org.apache.qpid.server.plugins.Plugin;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 
+import javax.security.auth.callback.CallbackHandler;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
@@ -88,4 +89,6 @@ public interface AuthenticationManager e
      * @return authentication result
      */
     AuthenticationResult authenticate(String username, String password);
+
+    CallbackHandler getHandler(String mechanism);
 }

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Fri May  4 15:39:19 2012
@@ -300,6 +300,11 @@ public class PrincipalDatabaseAuthentica
         }
     }
 
+    public CallbackHandler getHandler(String mechanism)
+    {
+        return _callbackHandlerMap.get(mechanism);
+    }
+
     /**
      * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
      */

Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java?rev=1334037&r1=1334036&r2=1334037&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java Fri May  4 15:39:19 2012
@@ -58,4 +58,4 @@ public class AnonymousSaslServerFactory 
             return new String[]{AnonymousSaslServer.MECHANISM};
         }
     }
-}
\ No newline at end of file
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message