qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1333027 [9/13] - in /qpid/branches/qpid-3767/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp/bindings/qpid/ruby/lib/qpid/ cpp/docs/a...
Date Wed, 02 May 2012 13:10:03 GMT
Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Wed May  2 13:09:18 2012
@@ -134,7 +134,7 @@ public class QueueBindHandler implements
                     Map<String, Object> oldArgs = oldBinding.getArguments();
                     if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments)))
                     {
-                        virtualHost.getBindingFactory().replaceBinding(bindingKey, queue, exch, arguments);    
+                        virtualHost.getBindingFactory().replaceBinding(oldBinding.getId(), bindingKey, queue, exch, arguments);
                     }
                 }
             }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Wed May  2 13:09:18 2012
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.MethodRegistry;
 import org.apache.qpid.framing.QueueDeclareBody;
 import org.apache.qpid.framing.QueueDeclareOkBody;
@@ -31,6 +32,7 @@ import org.apache.qpid.protocol.AMQConst
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -43,6 +45,7 @@ import org.apache.qpid.server.store.Dura
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 
 public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
@@ -65,7 +68,7 @@ public class QueueDeclareHandler impleme
         VirtualHost virtualHost = protocolConnection.getVirtualHost();
         ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+        DurableConfigurationStore store = virtualHost.getMessageStore();
 
         final AMQShortString queueName;
 
@@ -219,10 +222,11 @@ public class QueueDeclareHandler impleme
             throws AMQException
     {
         final QueueRegistry registry = virtualHost.getQueueRegistry();
-        AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+        String owner = body.getExclusive() ? AMQShortString.toString(session.getContextKey()) : null;
 
-        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
-                                                                  body.getExclusive(),virtualHost, body.getArguments());
+        Map<String, Object> arguments = FieldTable.convertToMap(body.getArguments());
+        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), AMQShortString.toString(queueName), body.getDurable(), owner, body.getAutoDelete(),
+                                                                  body.getExclusive(),virtualHost, arguments);
 
         if (body.getExclusive() && !body.getDurable())
         {

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Wed May  2 13:09:18 2012
@@ -62,7 +62,7 @@ public class QueueDeleteHandler implemen
         AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
         VirtualHost virtualHost = protocolConnection.getVirtualHost();
         QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
-        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+        DurableConfigurationStore store = virtualHost.getMessageStore();
 
 
         AMQChannel channel = protocolConnection.getChannel(channelId);

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/ConfigStore_logmessages.properties Wed May  2 13:09:18 2012
@@ -18,8 +18,7 @@
 #
 # Default File used for all non-defined locales.
 
-# 0 - name
-CREATED = CFG-1001 : Created : {0}
+CREATED = CFG-1001 : Created
 # 0 - path
 STORE_LOCATION = CFG-1002 : Store location : {0}
 CLOSE = CFG-1003 : Closed

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/MessageStore_logmessages.properties Wed May  2 13:09:18 2012
@@ -18,11 +18,11 @@
 #
 # Default File used for all non-defined locales.
 #
-# 0 - name
-CREATED = MST-1001 : Created : {0}
+CREATED = MST-1001 : Created
 # 0 - path
 STORE_LOCATION = MST-1002 : Store location : {0}
 CLOSED = MST-1003 : Closed
 RECOVERY_START = MST-1004 : Recovery Start
 RECOVERED = MST-1005 : Recovered {0,number} messages
-RECOVERY_COMPLETE = MST-1006 : Recovery Complete
\ No newline at end of file
+RECOVERY_COMPLETE = MST-1006 : Recovery Complete
+PASSIVATE = MST-1007 : Store Passivated

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/TransactionLog_logmessages.properties Wed May  2 13:09:18 2012
@@ -19,8 +19,7 @@
 # Default File used for all non-defined locales.
 #
 #
-# 0 - name
-CREATED = TXN-1001 : Created : {0}
+CREATED = TXN-1001 : Created
 # 0 - path
 STORE_LOCATION = TXN-1002 : Store location : {0}
 CLOSED = TXN-1003 : Closed

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java Wed May  2 13:09:18 2012
@@ -40,7 +40,8 @@ public class BindingLogSubject extends A
     public BindingLogSubject(String routingKey, Exchange exchange,
                              AMQQueue queue)
     {
-        setLogStringWithFormat(BINDING_FORMAT, queue.getVirtualHost().getName(),
+        setLogStringWithFormat(BINDING_FORMAT,
+                               queue.getVirtualHost().getName(),
                                exchange.getTypeShortString(),
                                exchange.getNameShortString(),
                                queue.getNameShortString(),

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/subjects/MessageStoreLogSubject.java Wed May  2 13:09:18 2012
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.server.logging.subjects;
 
-import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.STORE_FORMAT;
@@ -28,10 +27,9 @@ import static org.apache.qpid.server.log
 public class MessageStoreLogSubject extends AbstractLogSubject
 {
 
-    /** Create an ExchangeLogSubject that Logs in the following format. */
-    public MessageStoreLogSubject(VirtualHost vhost, MessageStore store)
+    /** Create an MessageStoreLogSubject that Logs in the following format. */
+    public MessageStoreLogSubject(VirtualHost vhost, String messageStoreName)
     {
-        setLogStringWithFormat(STORE_FORMAT, vhost.getName(),
-                               store.getClass().getSimpleName());
+        setLogStringWithFormat(STORE_FORMAT, vhost.getName(), messageStoreName);
     }
 }

Propchange: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/management/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/management:r1306564-1332660

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/management/AbstractAMQManagedConnectionObject.java Wed May  2 13:09:18 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.management;
 
 import org.apache.qpid.management.common.mbeans.ManagedConnection;

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Wed May  2 13:09:18 2012
@@ -58,6 +58,7 @@ import org.apache.qpid.server.state.AMQS
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.subscription.ClientDeliveryMethod;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionImpl;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
 import org.apache.qpid.transport.Sender;
@@ -1455,7 +1456,7 @@ public class AMQProtocolEngine implement
                 throws AMQException
         {
             registerMessageDelivered(entry.getMessage().getSize());
-            _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, sub.getConsumerTag());
+            _protocolOutputConverter.writeDeliver(entry, _channelId, deliveryTag, ((SubscriptionImpl)sub).getConsumerTag());
             entry.incrementDeliveryCount();
         }
 

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AmqpProtocolVersion.java Wed May  2 13:09:18 2012
@@ -20,4 +20,4 @@
  */
 package org.apache.qpid.server.protocol;
 
-public enum AmqpProtocolVersion { v0_8, v0_9, v0_9_1, v0_10 }
\ No newline at end of file
+public enum AmqpProtocolVersion { v0_8, v0_9, v0_9_1, v0_10, v1_0_0 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Wed May  2 13:09:18 2012
@@ -175,6 +175,28 @@ public class MultiVersionProtocolEngine 
                          (byte) 10
             };
 
+    private static final byte[] AMQP_1_0_0_HEADER =
+            new byte[] { (byte) 'A',
+                         (byte) 'M',
+                         (byte) 'Q',
+                         (byte) 'P',
+                         (byte) 0,
+                         (byte) 1,
+                         (byte) 0,
+                         (byte) 0
+            };
+
+    private static final byte[] AMQP_SASL_1_0_0_HEADER =
+            new byte[] { (byte) 'A',
+                         (byte) 'M',
+                         (byte) 'Q',
+                         (byte) 'P',
+                         (byte) 3,
+                         (byte) 1,
+                         (byte) 0,
+                         (byte) 0
+            };
+
     public void setNetworkConnection(NetworkConnection networkConnection)
     {
         setNetworkConnection(networkConnection, networkConnection.getSender());
@@ -289,8 +311,48 @@ public class MultiVersionProtocolEngine 
         }
     };
 
+    private DelegateCreator creator_1_0_0 = new DelegateCreator()
+    {
+
+        public AmqpProtocolVersion getVersion()
+        {
+            return AmqpProtocolVersion.v1_0_0;
+        }
+
+
+        public byte[] getHeaderIdentifier()
+        {
+            return AMQP_1_0_0_HEADER;
+        }
+
+        public ServerProtocolEngine getProtocolEngine()
+        {
+            return new ProtocolEngine_1_0_0(_appRegistry,_id);
+        }
+    };
+
+    private DelegateCreator creator_1_0_0_SASL = new DelegateCreator()
+    {
+
+        public AmqpProtocolVersion getVersion()
+        {
+            return AmqpProtocolVersion.v1_0_0;
+        }
+
+
+        public byte[] getHeaderIdentifier()
+        {
+            return AMQP_SASL_1_0_0_HEADER;
+        }
+
+        public ServerProtocolEngine getProtocolEngine()
+        {
+            return new ProtocolEngine_1_0_0_SASL(_network, _appRegistry, _id);
+        }
+    };
+
     private final DelegateCreator[] _creators =
-            new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10 };
+            new DelegateCreator[] { creator_0_8, creator_0_9, creator_0_9_1, creator_0_10, creator_1_0_0_SASL, creator_1_0_0 };
 
 
     private class ClosedDelegateProtocolEngine implements ServerProtocolEngine

Propchange: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Wed May  2 13:09:18 2012
@@ -0,0 +1,6 @@
+/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:886720-886722,887145,892761,930288
+/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:795950-829653
+/qpid/branches/java-network-refactor/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:805429-821809
+/qpid/branches/jmx_mc_gsoc09/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:787599
+/qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:1061302-1072333
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:742626,743015,743028-743029,743304,743306,743311,743357,744113,747363,747367,747369-747370,747376,747783,747868-747870,747875,748561,748591,748641,748680,748686,749149,749282,749285,749315,749340,749572,753219-753220,753253,754934,754958,755256,757258,757270,758730,759097,760919,761721,762365,762992,763959,764026,764109,764140,764790,1073294-1299226,1306564-1332660

Propchange: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:r1306564-1332660

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java Wed May  2 13:09:18 2012
@@ -23,19 +23,20 @@ package org.apache.qpid.server.queue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
+import java.util.UUID;
 
 public class AMQPriorityQueue extends OutOfOrderQueue
 {
-    protected AMQPriorityQueue(final String name,
+    protected AMQPriorityQueue(UUID id,
+                                final String name,
                                 final boolean durable,
                                 final String owner,
                                 final boolean autoDelete,
                                 boolean exclusive,
                                 final VirtualHost virtualHost,
-                                Map<String, Object> arguments,
-                                int priorities)
+                                Map<String, Object> arguments, int priorities)
     {
-        super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
+        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
     }
 
     public int getPriorities()

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Wed May  2 13:09:18 2012
@@ -20,6 +20,10 @@
  */
 package org.apache.qpid.server.queue;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.exchange.ExchangeDefaults;
@@ -30,12 +34,10 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import java.util.HashMap;
-import java.util.Map;
-
 public class AMQQueueFactory
 {
     public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
@@ -166,8 +168,13 @@ public class AMQQueueFactory
             }
     };
 
-
-    /** @see #createAMQQueueImpl(String, boolean, String, boolean, boolean, VirtualHost, Map) */
+    /**
+     * Creates a new queue with a random id.
+     *
+     * @see #createAMQQueueImpl(UUID, String, boolean, String, boolean, boolean, VirtualHost, Map)
+     * @deprecated because only called from unit tests
+     * */
+    @Deprecated
     public static AMQQueue createAMQQueueImpl(AMQShortString name,
                                               boolean durable,
                                               AMQShortString owner,
@@ -175,22 +182,28 @@ public class AMQQueueFactory
                                               boolean exclusive,
                                               VirtualHost virtualHost, final FieldTable arguments) throws AMQException
     {
-        return createAMQQueueImpl(name == null ? null : name.toString(),
+        return createAMQQueueImpl(UUIDGenerator.generateUUID(),
+                                  name == null ? null : name.toString(),
                                   durable,
                                   owner == null ? null : owner.toString(),
                                   autoDelete,
-                                  exclusive,
-                                  virtualHost, FieldTable.convertToMap(arguments));
+                                  exclusive, virtualHost, FieldTable.convertToMap(arguments));
     }
 
-
-    public static AMQQueue createAMQQueueImpl(String queueName,
+    /**
+     * @param id the id to use. If default then one is generated from queueName. TODO check correctness of calls that pass a null value.
+     */
+    public static AMQQueue createAMQQueueImpl(UUID id,
+                                              String queueName,
                                               boolean durable,
                                               String owner,
                                               boolean autoDelete,
-                                              boolean exclusive,
-                                              VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
+                                              boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
     {
+        if (id == null)
+        {
+            throw new IllegalArgumentException("Queue id must not be null");
+        }
         if (queueName == null)
         {
             throw new IllegalArgumentException("Queue name must not be null");
@@ -241,19 +254,19 @@ public class AMQQueueFactory
         AMQQueue q;
         if(sortingKey != null)
         {
-            q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+            q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
         }
         else if(conflationKey != null)
         {
-            q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
+            q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
         }
         else if(priorities > 1)
         {
-            q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
+            q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
         }
         else
         {
-            q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments);
+            q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments);
         }
 
         //Register the new queue
@@ -287,12 +300,12 @@ public class AMQQueueFactory
 
                 if(dlExchange == null)
                 {
-                    dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+                    dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
 
                     exchangeRegistry.registerExchange(dlExchange);
 
                     //enter the dle in the persistent store
-                    virtualHost.getDurableConfigurationStore().createExchange(dlExchange);
+                    virtualHost.getMessageStore().createExchange(dlExchange);
                 }
             }
 
@@ -309,10 +322,10 @@ public class AMQQueueFactory
                     args.put(X_QPID_DLQ_ENABLED, false);
                     args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
 
-                    dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args);
+                    dlQueue = createAMQQueueImpl(UUIDGenerator.generateUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
 
                     //enter the dlq in the persistent store
-                    virtualHost.getDurableConfigurationStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
+                    virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
                 }
             }
 
@@ -364,7 +377,10 @@ public class AMQQueueFactory
             arguments.put(X_QPID_DLQ_ENABLED, true);
         }
 
-        AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments);
+        // we need queues that are defined in config to have deterministic ids.
+        UUID id = UUIDGenerator.generateUUID(queueName, host.getName());
+
+        AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments);
         q.configure(config);
         return q;
     }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java Wed May  2 13:09:18 2012
@@ -21,22 +21,23 @@
 
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
 import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class ConflationQueue extends SimpleAMQQueue
 {
-    protected ConflationQueue(String name,
+    protected ConflationQueue(UUID id,
+                              String name,
                               boolean durable,
                               String owner,
                               boolean autoDelete,
                               boolean exclusive,
                               VirtualHost virtualHost,
-                              Map<String, Object> args,
-                              String conflationKey)
+                              Map<String, Object> args, String conflationKey)
     {
-        super(name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args);
+        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args);
     }
 
     public String getConflationKey()

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Wed May  2 13:09:18 2012
@@ -20,15 +20,21 @@
  */
 package org.apache.qpid.server.queue;
 
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 public class DefaultQueueRegistry implements QueueRegistry
 {
+    private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
+
     private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
 
     private final VirtualHost _virtualHost;
@@ -72,4 +78,36 @@ public class DefaultQueueRegistry implem
     {
         return getQueue(new AMQShortString(queue));
     }
+
+    @Override
+    public void stopAllAndUnregisterMBeans()
+    {
+        for (final AMQQueue queue : getQueues())
+        {
+            queue.stop();
+            try
+            {
+                queue.getManagedObject().unregister();
+            }
+            catch (AMQException e)
+            {
+                LOGGER.warn("Failed to unregister mbean", e);
+            }
+        }
+        _queueMap.clear();
+    }
+
+    @Override
+    public synchronized AMQQueue getQueue(UUID queueId)
+    {
+        Collection<AMQQueue> queues = _queueMap.values();
+        for (AMQQueue queue : queues)
+        {
+            if (queue.getId().equals(queueId))
+            {
+                return queue;
+            }
+        }
+        return null;
+    }
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Wed May  2 13:09:18 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.server.subscription.Subscription;
@@ -5,15 +25,16 @@ import org.apache.qpid.server.subscripti
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
+import java.util.UUID;
 
 public abstract class OutOfOrderQueue extends SimpleAMQQueue
 {
 
-    protected OutOfOrderQueue(String name, boolean durable, String owner,
-                              boolean autoDelete, boolean exclusive, VirtualHost virtualHost,
-                              QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+    protected OutOfOrderQueue(UUID id, String name, boolean durable,
+                              String owner, boolean autoDelete, boolean exclusive,
+                              VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
     {
-        super(name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
+        super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
     }
 
     @Override

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Wed May  2 13:09:18 2012
@@ -25,19 +25,19 @@ import org.apache.qpid.server.message.Se
 public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
 {
     private final AMQQueue _queue;
-    private final SimpleQueueEntryList[] _priorityLists;
+    private final PriorityQueueEntrySubList[] _priorityLists;
     private final int _priorities;
     private final int _priorityOffset;
 
     public PriorityQueueList(AMQQueue queue, int priorities)
     {
         _queue = queue;
-        _priorityLists = new SimpleQueueEntryList[priorities];
+        _priorityLists = new PriorityQueueEntrySubList[priorities];
         _priorities = priorities;
         _priorityOffset = 5-((priorities + 1)/2);
         for(int i = 0; i < priorities; i++)
         {
-            _priorityLists[i] = new SimpleQueueEntryList(queue);
+            _priorityLists[i] = new PriorityQueueEntrySubList(queue);
         }
     }
 
@@ -161,4 +161,48 @@ public class PriorityQueueList implement
             return new PriorityQueueList(queue, _priorities);
         }
     }
+
+    private static class PriorityQueueEntrySubList extends SimpleQueueEntryList
+    {
+        public PriorityQueueEntrySubList(AMQQueue queue)
+        {
+            super(queue);
+        }
+
+        @Override
+        protected PriorityQueueEntryImpl createQueueEntry(ServerMessage<?> message)
+        {
+            return new PriorityQueueEntryImpl(this, message);
+        }
+    }
+
+    private static class PriorityQueueEntryImpl extends SimpleQueueEntryImpl
+    {
+        public PriorityQueueEntryImpl(PriorityQueueEntrySubList queueEntryList, ServerMessage<?> message)
+        {
+            super(queueEntryList, message);
+        }
+
+        @Override
+        public int compareTo(final QueueEntry o)
+        {
+            byte thisPriority = getMessageHeader().getPriority();
+            byte otherPriority = o.getMessageHeader().getPriority();
+
+            if(thisPriority != otherPriority)
+            {
+                /*
+                 * Different priorities, so answer can only be greater than or less than
+                 *
+                 * A message with higher priority (e.g. 5) is conceptually 'earlier' in the
+                 * priority queue than one with a lower priority (e.g. 4).
+                 */
+                return thisPriority > otherPriority ? -1 : 1;
+            }
+            else
+            {
+                return super.compareTo(o);
+            }
+        }
+    }
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java Wed May  2 13:09:18 2012
@@ -52,4 +52,13 @@ final class QueueContext implements AMQQ
     {
         return _releasedEntry;
     }
+
+    @Override
+    public String toString()
+    {
+        return "QueueContext{" +
+               "_lastSeenEntry=" + _lastSeenEntry +
+               ", _releasedEntry=" + _releasedEntry +
+               '}';
+    }
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Wed May  2 13:09:18 2012
@@ -227,9 +227,10 @@ public abstract class QueueEntryImpl imp
     public void release()
     {
         EntryState state = _state;
-        
+
         if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
         {
+
             if(state instanceof SubscriptionAcquiredState)
             {
                 getQueue().decrementUnackedMsgCount();
@@ -254,6 +255,7 @@ public abstract class QueueEntryImpl imp
                 routeToAlternate();
             }
         }
+
     }
 
     public boolean releaseButRetain()
@@ -267,7 +269,6 @@ public abstract class QueueEntryImpl imp
             Subscription sub = ((SubscriptionAcquiredState) state).getSubscription();
             if(_stateUpdater.compareAndSet(this, state, sub.getAssignedState()))
             {
-                System.err.println("Message released (and retained)" + getMessage().getMessageNumber());
                 getQueue().requeue(this);
                 if(_stateChangeListeners != null)
                 {

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java Wed May  2 13:09:18 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
+import java.util.UUID;
 
 public interface QueueRegistry
 {
@@ -40,4 +41,8 @@ public interface QueueRegistry
     Collection<AMQQueue> getQueues();
 
     AMQQueue getQueue(String queue);
+
+    void stopAllAndUnregisterMBeans();
+
+    AMQQueue getQueue(UUID queueId);
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Wed May  2 13:09:18 2012
@@ -191,29 +191,29 @@ public class SimpleAMQQueue implements A
     private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
     private final MessageGroupManager _messageGroupManager;
 
-    protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
+    protected SimpleAMQQueue(UUID id, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
     {
-        this(name, durable, owner, autoDelete, exclusive, virtualHost,new SimpleQueueEntryList.Factory(), arguments);
+        this(id, name, durable, owner, autoDelete, exclusive,virtualHost, new SimpleQueueEntryList.Factory(), arguments);
     }
 
-    public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
+    public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
     {
-        this(queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
+        this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
     }
 
-    public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+    public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
     {
-        this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments);
+        this(id, queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments);
     }
 
-    protected SimpleAMQQueue(AMQShortString name,
+    protected SimpleAMQQueue(UUID id,
+                             AMQShortString name,
                              boolean durable,
                              AMQShortString owner,
                              boolean autoDelete,
                              boolean exclusive,
                              VirtualHost virtualHost,
-                             QueueEntryListFactory entryListFactory,
-                             Map<String,Object> arguments)
+                             QueueEntryListFactory entryListFactory, Map<String,Object> arguments)
     {
 
         if (name == null)
@@ -236,7 +236,7 @@ public class SimpleAMQQueue implements A
         _entries = entryListFactory.createQueueEntryList(this);
         _arguments = arguments;
 
-        _id = virtualHost.getConfigStore().createId();
+        _id = id;
 
         _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
 
@@ -346,7 +346,7 @@ public class SimpleAMQQueue implements A
 
         if(isDurable())
         {
-            getVirtualHost().getDurableConfigurationStore().updateQueue(this);
+            getVirtualHost().getMessageStore().updateQueue(this);
         }
     }
 
@@ -862,7 +862,6 @@ public class SimpleAMQQueue implements A
 
     public void requeue(QueueEntry entry)
     {
-
         SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
         // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
         while (subscriberIter.advance() && entry.isAvailable())
@@ -1743,6 +1742,7 @@ public class SimpleAMQQueue implements A
     {
         boolean atTail = false;
         final boolean keepSendLockHeld = iterations <=  SimpleAMQQueue.MAX_ASYNC_DELIVERIES;
+        boolean queueEmpty = false;
 
         try
         {
@@ -1760,12 +1760,9 @@ public class SimpleAMQQueue implements A
                     }
 
                     atTail = attemptDelivery(sub, true);
-                    if (atTail && !sub.isSuspended() && sub.isAutoClose())
+                    if (atTail && getNextAvailableEntry(sub) == null)
                     {
-                        unregisterSubscription(sub);
-
-                        sub.confirmAutoClose();
-
+                        queueEmpty = true;
                     }
                     else if (!atTail)
                     {
@@ -1787,6 +1784,11 @@ public class SimpleAMQQueue implements A
             {
                 sub.releaseSendLock();
             }
+            if(queueEmpty)
+            {
+                sub.queueEmpty();
+            }
+
             sub.flushBatched();
 
         }
@@ -2009,13 +2011,9 @@ public class SimpleAMQQueue implements A
                             if (subscriptionDone)
                             {
                                 sub.flushBatched();
-                                //close autoClose subscriptions if we are not currently intent on continuing
-                                if (lastLoop && !sub.isSuspended() && sub.isAutoClose())
+                                if (lastLoop && !sub.isSuspended())
                                 {
-
-                                    unregisterSubscription(sub);
-
-                                    sub.confirmAutoClose();
+                                    sub.queueEmpty();
                                 }
                                 break;
                             }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Wed May  2 13:09:18 2012
@@ -104,7 +104,7 @@ public class SimpleQueueEntryList implem
         }
     }
 
-    protected SimpleQueueEntryImpl createQueueEntry(ServerMessage message)
+    protected SimpleQueueEntryImpl createQueueEntry(ServerMessage<?> message)
     {
         return new SimpleQueueEntryImpl(this, message);
     }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java Wed May  2 13:09:18 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
+import java.util.UUID;
 
 public class SortedQueue extends OutOfOrderQueue
 {
@@ -33,12 +34,12 @@ public class SortedQueue extends OutOfOr
     private final Object _sortedQueueLock = new Object();
     private final String _sortedPropertyName;
 
-    protected SortedQueue(final String name, final boolean durable,
-                            final String owner, final boolean autoDelete, final boolean exclusive,
-                            final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
+    protected SortedQueue(UUID id, final String name,
+                            final boolean durable, final String owner, final boolean autoDelete,
+                            final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
     {
-        super(name, durable, owner, autoDelete, exclusive, virtualHost,
-                new SortedQueueEntryListFactory(sortedPropertyName), arguments);
+        super(id, name, durable, owner, autoDelete, exclusive,
+                virtualHost, new SortedQueueEntryListFactory(sortedPropertyName), arguments);
         this._sortedPropertyName = sortedPropertyName;
     }
 

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Wed May  2 13:09:18 2012
@@ -664,7 +664,7 @@ public abstract class ApplicationRegistr
 
     public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
     {
-        VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig, null);
+        VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
         _virtualHostRegistry.registerVirtualHost(virtualHost);
         getBroker().addVirtualHost(virtualHost);
         return virtualHost;

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/AuthenticationManager.java Wed May  2 13:09:18 2012
@@ -24,6 +24,7 @@ import org.apache.qpid.common.Closeable;
 import org.apache.qpid.server.plugins.Plugin;
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 
+import javax.security.auth.callback.CallbackHandler;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 
@@ -88,4 +89,6 @@ public interface AuthenticationManager e
      * @return authentication result
      */
     AuthenticationResult authenticate(String username, String password);
+
+    CallbackHandler getHandler(String mechanism);
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java Wed May  2 13:09:18 2012
@@ -300,6 +300,11 @@ public class PrincipalDatabaseAuthentica
         }
     }
 
+    public CallbackHandler getHandler(String mechanism)
+    {
+        return _callbackHandlerMap.get(mechanism);
+    }
+
     /**
      * @see org.apache.qpid.server.security.auth.manager.AuthenticationManager#authenticate(String, String)
      */

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/sasl/anonymous/AnonymousSaslServerFactory.java Wed May  2 13:09:18 2012
@@ -58,4 +58,4 @@ public class AnonymousSaslServerFactory 
             return new String[]{AnonymousSaslServer.MECHANISM};
         }
     }
-}
\ No newline at end of file
+}

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Wed May  2 13:09:18 2012
@@ -32,19 +32,19 @@ public interface ConfigurationRecoveryHa
 
     public static interface QueueRecoveryHandler
     {
-        void queue(String queueName, String owner, boolean exclusive, FieldTable arguments);
+        void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments);
         ExchangeRecoveryHandler completeQueueRecovery();
     }
 
     public static interface ExchangeRecoveryHandler
     {
-        void exchange(String exchangeName, String type, boolean autoDelete);
+        void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
         BindingRecoveryHandler completeExchangeRecovery();
     }
 
     public static interface BindingRecoveryHandler
     {
-        void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf);
+        void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);
         BrokerLinkRecoveryHandler completeBindingRecovery();
     }
     
@@ -60,13 +60,4 @@ public interface ConfigurationRecoveryHa
         void completeBridgeRecoveryForLink();
     }
 
-    public static interface QueueEntryRecoveryHandler
-    {
-        void complete();
-
-        void queueEntry(String queueName, long messageId);
-    }
-
-
-
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java Wed May  2 13:09:18 2012
@@ -23,12 +23,11 @@ package org.apache.qpid.server.store;
 import org.apache.commons.configuration.Configuration;
 
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.federation.Bridge;
 import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.queue.AMQQueue;
 
 public interface DurableConfigurationStore
@@ -36,23 +35,21 @@ public interface DurableConfigurationSto
 
     public static interface Source
     {
-        DurableConfigurationStore getDurableConfigurationStore();
+        DurableConfigurationStore getMessageStore();
     }
 
     /**
      * Called after instantiation in order to configure the message store. A particular implementation can define
      * whatever parameters it wants.
      *
-     * @param name             The name to be used by this storem
+     * @param name             The name to be used by this store
      * @param recoveryHandler  Handler to be called as the store recovers on start up
      * @param config           The apache commons configuration object.
-     *
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
     void configureConfigStore(String name,
                               ConfigurationRecoveryHandler recoveryHandler,
-                              Configuration config,
-                              LogSubject logSubject) throws Exception;
+                              Configuration config) throws Exception;
     /**
      * Makes the specified exchange persistent.
      *
@@ -72,28 +69,22 @@ public interface DurableConfigurationSto
     void removeExchange(Exchange exchange) throws AMQStoreException;
 
     /**
-     * Binds the specified queue to an exchange with a routing key.
+     * Store the queue binding.
      *
-     * @param exchange   The exchange to bind to.
-     * @param routingKey The routing key to bind by.
-     * @param queue      The queue to bind.
-     * @param args       Additional parameters.
+     * @param binding queue binding
      *
      * @throws AMQStoreException if the operation fails for any reason.
      */
-    void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException;
+    void bindQueue(Binding binding) throws AMQStoreException;
 
     /**
-     * Unbinds the specified from an exchange under a particular routing key.
+     * Removes queue binding
      *
-     * @param exchange   The exchange to unbind from.
-     * @param routingKey The routing key to unbind.
-     * @param queue      The queue to unbind.
-     * @param args       Additonal parameters.
+     * @param binding queue binding to remove
      *
      * @throws AMQStoreException If the operation fails for any reason.
      */
-    void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException;
+    void unbindQueue(Binding binding) throws AMQStoreException;
 
     /**
      * Makes the specified queue persistent.

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Wed May  2 13:09:18 2012
@@ -21,102 +21,86 @@
 package org.apache.qpid.server.store;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.federation.Bridge;
-import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
-import org.apache.qpid.server.logging.messages.MessageStoreMessages;
 import org.apache.qpid.server.message.EnqueableMessage;
-import org.apache.qpid.server.queue.AMQQueue;
 
-import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-/** A simple message store that stores the messages in a threadsafe structure in memory. */
-public class MemoryMessageStore implements MessageStore, DurableConfigurationStore
+/** A simple message store that stores the messages in a thread-safe structure in memory. */
+public class MemoryMessageStore extends NullMessageStore
 {
-    private static final Logger _log = Logger.getLogger(MemoryMessageStore.class);
-
-    private static final int DEFAULT_HASHTABLE_CAPACITY = 50000;
-
-    private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
-
-
     private final AtomicLong _messageId = new AtomicLong(1);
-    private AtomicBoolean _closed = new AtomicBoolean(false);
-    private LogSubject _logSubject;
+    private final AtomicBoolean _closed = new AtomicBoolean(false);
 
     private static final Transaction IN_MEMORY_TRANSACTION = new Transaction()
     {
-        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
+        @Override
+        public StoreFuture commitTranAsync() throws AMQStoreException
         {
+            return StoreFuture.IMMEDIATE_FUTURE;
         }
 
-        public void dequeueMessage(TransactionLogResource  queue, EnqueableMessage message) throws AMQStoreException
+        @Override
+        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
         {
         }
 
-        public void commitTran() throws AMQStoreException
+        @Override
+        public void dequeueMessage(TransactionLogResource  queue, EnqueableMessage message) throws AMQStoreException
         {
         }
 
-        public StoreFuture commitTranAsync() throws AMQStoreException
+        @Override
+        public void commitTran() throws AMQStoreException
         {
-            return IMMEDIATE_FUTURE;
         }
 
+        @Override
         public void abortTran() throws AMQStoreException
         {
         }
 
+        @Override
         public void removeXid(long format, byte[] globalId, byte[] branchId)
         {
         }
 
+        @Override
         public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
         {
         }
-
     };
 
-    public void configureConfigStore(String name, ConfigurationRecoveryHandler handler, Configuration configuration, LogSubject logSubject) throws Exception
-    {
-        _logSubject = logSubject;
-        CurrentActor.get().message(_logSubject, ConfigStoreMessages.CREATED(this.getClass().getName()));
-
+    private final StateManager _stateManager;
+    private final EventManager _eventManager = new EventManager();
 
+    public MemoryMessageStore()
+    {
+        _stateManager = new StateManager(_eventManager);
     }
 
-    public void configureMessageStore(String name,
-                                      MessageStoreRecoveryHandler recoveryHandler,
-                                      Configuration config,
-                                      LogSubject logSubject) throws Exception
+    @Override
+    public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
     {
-        if(_logSubject == null)
-        {
-            _logSubject = logSubject;
-        }
-        int hashtableCapacity = config.getInt(name + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
-        _log.info("Using capacity " + hashtableCapacity + " for hash tables");
-        CurrentActor.get().message(_logSubject, MessageStoreMessages.CREATED(this.getClass().getName()));
+        _stateManager.attainState(State.CONFIGURING);
     }
 
-    public void close() throws Exception
+    @Override
+    public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
     {
-        _closed.getAndSet(true);
-        CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+        _stateManager.attainState(State.CONFIGURED);
+    }
 
+    @Override
+    public void activate() throws Exception
+    {
+        _stateManager.attainState(State.RECOVERING);
+        
+        _stateManager.attainState(State.ACTIVE);
     }
 
+    @Override
     public StoredMessage addMessage(StorableMessageMetaData metaData)
     {
         final long id = _messageId.getAndIncrement();
@@ -125,104 +109,29 @@ public class MemoryMessageStore implemen
         return message;
     }
 
-
-    public void createExchange(Exchange exchange) throws AMQStoreException
-    {
-
-    }
-
-    public void removeExchange(Exchange exchange) throws AMQStoreException
-    {
-
-    }
-
-    public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
-    {
-
-    }
-
-    public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
-    {
-
-    }
-
-
-    public void createQueue(AMQQueue queue) throws AMQStoreException
-    {
-        // Not requred to do anything
-    }
-
-    public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
-    {
-        // Not required to do anything
-    }
-
-    public void removeQueue(final AMQQueue queue) throws AMQStoreException
-    {
-        // Not required to do anything
-    }
-    
-    public void updateQueue(final AMQQueue queue) throws AMQStoreException
-    {
-        // Not required to do anything
-    }
-
-    public void createBrokerLink(final BrokerLink link) throws AMQStoreException
-    {
-
-    }
-
-    public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
-    {
-
-    }
-
-    public void createBridge(final Bridge bridge) throws AMQStoreException
-    {
-
-    }
-
-    public void deleteBridge(final Bridge bridge) throws AMQStoreException
-    {
-
-    }
-
-    public void configureTransactionLog(String name,
-                                        TransactionLogRecoveryHandler recoveryHandler,
-                                        Configuration storeConfiguration,
-                                        LogSubject logSubject) throws Exception
-    {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
+    @Override
     public Transaction newTransaction()
     {
         return IN_MEMORY_TRANSACTION;
     }
 
-
-    public List<AMQQueue> createQueues() throws AMQException
+    @Override
+    public boolean isPersistent()
     {
-        return null;
+        return false;
     }
 
-    public Long getNewMessageId()
+    @Override
+    public void close() throws Exception
     {
-        return _messageId.getAndIncrement();
+        _stateManager.attainState(State.CLOSING);
+        _closed.getAndSet(true);
+        _stateManager.attainState(State.CLOSED);
     }
 
-    public boolean isPersistent()
+    @Override
+    public void addEventListener(EventListener eventListener, Event... events)
     {
-        return false;
-    }
-
-    private void checkNotClosed() throws MessageStoreClosedException
-     {
-        if (_closed.get())
-        {
-            throw new MessageStoreClosedException();
-        }
+        _eventManager.addEventListener(eventListener, events);
     }
-
-
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java Wed May  2 13:09:18 2012
@@ -22,13 +22,16 @@ package org.apache.qpid.server.store;
 
 import org.apache.qpid.server.message.MessageMetaData;
 import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.message.MessageMetaData_1_0;
 
 import java.nio.ByteBuffer;
 
 public enum MessageMetaDataType
 {
     META_DATA_0_8  {   public Factory<MessageMetaData> getFactory() { return MessageMetaData.FACTORY; } },
-    META_DATA_0_10 {   public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } };
+    META_DATA_0_10 {   public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } },
+    META_DATA_1_0 {   public Factory<MessageMetaData_1_0> getFactory() { return MessageMetaData_1_0.FACTORY; } };
+
 
 
     public static interface Factory<M extends StorableMessageMetaData>

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Wed May  2 13:09:18 2012
@@ -21,52 +21,29 @@
 package org.apache.qpid.server.store;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.message.EnqueableMessage;
 
 /**
  * MessageStore defines the interface to a storage area, which can be used to preserve the state of messages.
  *
  */
-public interface MessageStore
+public interface MessageStore extends DurableConfigurationStore
 {
-    StoreFuture IMMEDIATE_FUTURE = new StoreFuture()
-        {
-            public boolean isComplete()
-            {
-                return true;
-            }
-
-            public void waitForCompletion()
-            {
-
-            }
-        };
-
-
     /**
      * Called after instantiation in order to configure the message store. A particular implementation can define
      * whatever parameters it wants.
      *
-     * @param name             The name to be used by this storem
-     * @param recoveryHandler  Handler to be called as the store recovers on start up
+     * @param name             The name to be used by this store
+     * @param messageRecoveryHandler  Handler to be called as the store recovers on start up
+     * @param tlogRecoveryHandler
      * @param config           The apache commons configuration object.
-     *
      * @throws Exception If any error occurs that means the store is unable to configure itself.
      */
     void configureMessageStore(String name,
-                               MessageStoreRecoveryHandler recoveryHandler,
-                               Configuration config,
-                               LogSubject logSubject) throws Exception;
-
-    /**
-     * Called to close and cleanup any resources used by the message store.
-     *
-     * @throws Exception If the close fails.
-     */
-    void close() throws Exception;
+                               MessageStoreRecoveryHandler messageRecoveryHandler,
+                               TransactionLogRecoveryHandler tlogRecoveryHandler,
+                               Configuration config) throws Exception;
 
+    void activate() throws Exception;
 
     public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(T metaData);
 
@@ -78,79 +55,16 @@ public interface MessageStore
      */
     boolean isPersistent();
 
-
-
-    public static interface Transaction
-    {
-        /**
-         * Places a message onto a specified queue, in a given transactional context.
-         *
-         *
-         *
-         * @param queue     The queue to place the message on.
-         * @param message
-         * @throws org.apache.qpid.AMQStoreException If the operation fails for any reason.
-         */
-        void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
-
-        /**
-         * Extracts a message from a specified queue, in a given transactional context.
-         *
-         * @param queue     The queue to place the message on.
-         * @param message The message to dequeue.
-         * @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
-         */
-        void dequeueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException;
-
-
-        /**
-         * Commits all operations performed within a given transactional context.
-         *
-         * @throws AMQStoreException If the operation fails for any reason.
-         */
-        void commitTran() throws AMQStoreException;
-
-        /**
-         * Commits all operations performed within a given transactional context.
-         *
-         * @throws AMQStoreException If the operation fails for any reason.
-         */
-        StoreFuture commitTranAsync() throws AMQStoreException;
-
-        /**
-         * Abandons all operations performed within a given transactional context.
-         *
-         * @throws AMQStoreException If the operation fails for any reason.
-         */
-        void abortTran() throws AMQStoreException;
-
-
-        public static interface Record
-        {
-            TransactionLogResource getQueue();
-            EnqueableMessage getMessage();
-        }
-
-        void removeXid(long format, byte[] globalId, byte[] branchId) throws AMQStoreException;
-
-        void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
-                throws AMQStoreException;
-    }
-
-    public void configureTransactionLog(String name,
-                      TransactionLogRecoveryHandler recoveryHandler,
-                      Configuration storeConfiguration,
-                      LogSubject logSubject) throws Exception;
-
     Transaction newTransaction();
 
+    /**
+     * Called to close and cleanup any resources used by the message store.
+     *
+     * @throws Exception If the close fails.
+     */
+    void close() throws Exception;
 
+    void addEventListener(EventListener eventListener, Event... events);
 
-    public static interface StoreFuture
-    {
-        boolean isComplete();
-
-        void waitForCompletion();
-    }
-
+    String getStoreLocation();
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java Wed May  2 13:09:18 2012
@@ -122,9 +122,9 @@ public class StoredMemoryMessage impleme
         return buf;
     }
 
-    public MessageStore.StoreFuture flushToStore()
+    public StoreFuture flushToStore()
     {
-        return MessageStore.IMMEDIATE_FUTURE;
+        return StoreFuture.IMMEDIATE_FUTURE;
     }
 
 

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java Wed May  2 13:09:18 2012
@@ -34,7 +34,7 @@ public interface StoredMessage<M extends
 
     ByteBuffer getContent(int offsetInMessage, int size);
 
-    MessageStore.StoreFuture flushToStore();
+    StoreFuture flushToStore();
 
     void remove();
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java Wed May  2 13:09:18 2012
@@ -20,20 +20,22 @@
 */
 package org.apache.qpid.server.store;
 
+import java.util.UUID;
+
 public interface TransactionLogRecoveryHandler
 {
     QueueEntryRecoveryHandler begin(MessageStore log);
 
     public static interface QueueEntryRecoveryHandler
     {
-        void queueEntry(String queuename, long messageId);
-
         DtxRecordRecoveryHandler completeQueueEntryRecovery();
+
+        void queueEntry(UUID queueId, long messageId);
     }
 
     public static interface DtxRecordRecoveryHandler
     {
-        void dtxRecord(long format, byte[] globalId, byte[] branchId, MessageStore.Transaction.Record[] enqueues, MessageStore.Transaction.Record[] dequeues);
+        void dtxRecord(long format, byte[] globalId, byte[] branchId, Transaction.Record[] enqueues, Transaction.Record[] dequeues);
 
         void completeDtxRecordRecovery();
     }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java Wed May  2 13:09:18 2012
@@ -20,7 +20,9 @@
 */
 package org.apache.qpid.server.store;
 
+import java.util.UUID;
+
 public interface TransactionLogResource
 {
-    public String getResourceName();
+    public UUID getId();
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Wed May  2 13:09:18 2012
@@ -54,16 +54,12 @@ public interface Subscription
 
     void setNoLocal(boolean noLocal);
 
-    AMQShortString getConsumerTag();
-
     long getSubscriptionID();
 
     boolean isSuspended();
 
     boolean hasInterest(QueueEntry msg);
 
-    boolean isAutoClose();
-
     boolean isClosed();
 
     boolean acquires();
@@ -105,11 +101,11 @@ public interface Subscription
 
     boolean isActive();
 
-    void confirmAutoClose();
-
     public void set(String key, Object value);
 
     public Object get(String key);
 
     boolean isSessionTransactional();
+
+    void queueEmpty() throws AMQException;
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Wed May  2 13:09:18 2012
@@ -375,7 +375,7 @@ public abstract class SubscriptionImpl i
     {
         return getQueue().getConfigStore();
     }
-    
+
     public Long getDelivered()
     {
         return _deliveredCount.get();
@@ -810,12 +810,22 @@ public abstract class SubscriptionImpl i
     {
         return _channel.isTransactional();
     }
-    
+
     public long getCreateTime()
     {
         return _createTime;
     }
 
+    public void queueEmpty() throws AMQException
+    {
+        if (isAutoClose())
+        {
+            _queue.unregisterSubscription(this);
+
+            confirmAutoClose();
+        }
+    }
+
     public void flushBatched()
     {
         _channel.getProtocolSession().setDeferFlush(false);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message