qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kgiu...@apache.org
Subject svn commit: r1333027 [10/13] - in /qpid/branches/qpid-3767/qpid: ./ cpp/ cpp/bindings/ cpp/bindings/qmf/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp/bindings/qpid/ruby/lib/qpid/ cpp/docs/...
Date Wed, 02 May 2012 13:10:03 GMT
Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Wed May  2 13:09:18 2012
@@ -109,7 +109,7 @@ public class Subscription_0_10 implement
 
                                                 public void stateChange(Subscription sub, State oldState, State newState)
                                                 {
-                                                    CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));    
+                                                    CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString()));
                                                 }
                                             };
     private AMQQueue _queue;
@@ -199,12 +199,7 @@ public class Subscription_0_10 implement
             CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
                     filterLogString.length() > 0));
         }
- 
-    }
 
-    public AMQShortString getConsumerTag()
-    {
-        return new AMQShortString(_destination);
     }
 
     public boolean isSuspended()
@@ -244,12 +239,6 @@ public class Subscription_0_10 implement
         return (_filters == null) || _filters.allAllow(entry);
     }
 
-    public boolean isAutoClose()
-    {
-        // no such thing in 0-10
-        return false;
-    }
-
     public boolean isClosed()
     {
         return getState() == State.CLOSED;
@@ -302,7 +291,7 @@ public class Subscription_0_10 implement
     {
         return getQueue().getConfigStore();
     }
-    
+
     public Long getDelivered()
     {
         return _deliveredCount.get();
@@ -823,11 +812,6 @@ public class Subscription_0_10 implement
         return getState() == State.ACTIVE;
     }
 
-    public void confirmAutoClose()
-    {
-        //No such thing in 0-10
-    }
-
     public void set(String key, Object value)
     {
         _properties.put(key, value);
@@ -1026,6 +1010,10 @@ public class Subscription_0_10 implement
         return _session.isTransactional();
     }
 
+    public void queueEmpty()
+    {
+    }
+
     public long getCreateTime()
     {
         return _createTime;
@@ -1033,7 +1021,7 @@ public class Subscription_0_10 implement
 
     public String toLogString()
     {
-        String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(), 
+        String queueInfo = MessageFormat.format(QUEUE_FORMAT, _queue.getVirtualHost().getName(),
                   _queue.getNameShortString());
         String result = "[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getSubscriptionID()) + "("
                 // queueString is "vh(/{0})/qu({1}) " so need to trim

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Wed May  2 13:09:18 2012
@@ -33,6 +33,7 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.security.auth.AuthenticationResult;
 import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
 import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.virtualhost.State;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;
 import org.slf4j.Logger;
@@ -177,6 +178,11 @@ public class ServerConnectionDelegate ex
                 sconn.setState(Connection.State.CLOSING);
                 sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Permission denied '"+vhostName+"'"));
             }
+            else if (vhost.getState() != State.ACTIVE)
+            {
+                sconn.setState(Connection.State.CLOSING);
+                sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Virtual host '"+vhostName+"' is not active"));
+            }
             else
             {
                 sconn.setState(Connection.State.OPEN);

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Wed May  2 13:09:18 2012
@@ -72,6 +72,7 @@ import org.apache.qpid.server.queue.Base
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.subscription.Subscription_0_10;
 import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
@@ -133,7 +134,7 @@ public class ServerSession extends Sessi
             new ConcurrentSkipListMap<Integer, MessageDispositionChangeListener>();
 
     private ServerTransaction _transaction;
-    
+
     private final AtomicLong _txnStarts = new AtomicLong(0);
     private final AtomicLong _txnCommits = new AtomicLong(0);
     private final AtomicLong _txnRejects = new AtomicLong(0);
@@ -152,7 +153,7 @@ public class ServerSession extends Sessi
     public ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, ConnectionConfig connConfig)
     {
         super(connection, delegate, name, expiry);
-        _connectionConfig = connConfig;        
+        _connectionConfig = connConfig;
         _transaction = new AsyncAutoCommitTransaction(this.getMessageStore(),this);
         _logSubject = new ChannelLogSubject(this);
         _id = getConfigStore().createId();
@@ -352,7 +353,7 @@ public class ServerSession extends Sessi
         }
     }
 
-    public void removeDispositionListener(Method method)                               
+    public void removeDispositionListener(Method method)
     {
         _messageDispositionListenerMap.remove(method.getId());
     }
@@ -380,7 +381,7 @@ public class ServerSession extends Sessi
         {
             task.doTask(this);
         }
-        
+
         CurrentActor.get().message(getLogSubject(), ChannelMessages.CLOSE());
     }
 
@@ -429,7 +430,7 @@ public class ServerSession extends Sessi
 
     public void unregister(Subscription_0_10 sub)
     {
-        _subscriptions.remove(sub.getConsumerTag().toString());
+        _subscriptions.remove(sub.getName());
         try
         {
             sub.getSendLock();
@@ -558,7 +559,7 @@ public class ServerSession extends Sessi
     public void commit()
     {
         _transaction.commit();
-        
+
         _txnCommits.incrementAndGet();
         _txnStarts.incrementAndGet();
         decrementOutstandingTxnsIfNecessary();
@@ -567,13 +568,13 @@ public class ServerSession extends Sessi
     public void rollback()
     {
         _transaction.rollback();
-        
+
         _txnRejects.incrementAndGet();
         _txnStarts.incrementAndGet();
         decrementOutstandingTxnsIfNecessary();
     }
 
-    
+
     private void incrementOutstandingTxnsIfNecessary()
     {
         if(isTransactional())
@@ -583,7 +584,7 @@ public class ServerSession extends Sessi
             _txnCount.compareAndSet(0,1);
         }
     }
-    
+
     private void decrementOutstandingTxnsIfNecessary()
     {
         if(isTransactional())
@@ -624,7 +625,7 @@ public class ServerSession extends Sessi
     {
         return _txnCount.get();
     }
-    
+
     public Principal getAuthorizedPrincipal()
     {
         return getConnection().getAuthorizedPrincipal();
@@ -975,17 +976,17 @@ public class ServerSession extends Sessi
         return _unfinishedCommandsQueue.isEmpty() ? null : _unfinishedCommandsQueue.getLast();
     }
 
-    public void recordFuture(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+    public void recordFuture(final StoreFuture future, final ServerTransaction.Action action)
     {
         _unfinishedCommandsQueue.add(new AsyncCommand(future, action));
     }
 
     private static class AsyncCommand
     {
-        private final MessageStore.StoreFuture _future;
+        private final StoreFuture _future;
         private ServerTransaction.Action _action;
 
-        public AsyncCommand(final MessageStore.StoreFuture future, final ServerTransaction.Action action)
+        public AsyncCommand(final StoreFuture future, final ServerTransaction.Action action)
         {
             _future = future;
             _action = action;

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Wed May  2 13:09:18 2012
@@ -40,6 +40,7 @@ import org.apache.qpid.server.flow.Windo
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.MessageMetaData_0_10;
 import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.queue.BaseQueue;
@@ -48,6 +49,7 @@ import org.apache.qpid.server.registry.A
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreFuture;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
@@ -126,7 +128,7 @@ public class ServerSessionDelegate exten
         serverSession.accept(method.getTransfers());
         if(!serverSession.isTransactional())
         {
-            serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE,
+            serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE,
                                        new CommandProcessedAction(serverSession, method));
         }
     }
@@ -356,7 +358,7 @@ public class ServerSessionDelegate exten
         }
         else
         {
-            serverSession.recordFuture(MessageStore.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
+            serverSession.recordFuture(StoreFuture.IMMEDIATE_FUTURE, new CommandProcessedAction(serverSession, xfr));
         }
     }
 
@@ -682,13 +684,12 @@ public class ServerSessionDelegate exten
     {
         String exchangeName = method.getExchange();
         VirtualHost virtualHost = getVirtualHost(session);
-        Exchange exchange = getExchange(session, exchangeName);
+        ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
 
         //we must check for any unsupported arguments present and throw not-implemented
         if(method.hasArguments())
         {
             Map<String,Object> args = method.getArguments();
-
             //QPID-3392: currently we don't support any!
             if(!args.isEmpty())
             {
@@ -696,120 +697,113 @@ public class ServerSessionDelegate exten
                 return;
             }
         }
-
-        if(method.getPassive())
+        synchronized(exchangeRegistry)
         {
-            if(exchange == null)
-            {
-                exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '"+exchangeName+"'");
-
-            }
-            else
-            {
-                if(!exchange.getTypeShortString().toString().equals(method.getType()) && (method.getType() != null && method.getType().length() > 0))
-                {
-                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() +".");
-                }
-            }
+            Exchange exchange = getExchange(session, exchangeName);
 
-        }
-        else
-        {
-            if (exchange == null)
+            if(method.getPassive())
             {
-                if(exchangeName.startsWith("amq."))
+                if(exchange == null)
                 {
-                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
-                              "Attempt to declare exchange: " + exchangeName +
-                              " which begins with reserved prefix 'amq.'.");
+                    exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
                 }
-                else if(exchangeName.startsWith("qpid."))
+                else
                 {
-                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
-                              "Attempt to declare exchange: " + exchangeName +
-                              " which begins with reserved prefix 'qpid.'.");
+                    if (!exchange.getTypeShortString().toString().equals(method.getType())
+                            && (method.getType() != null && method.getType().length() > 0))
+                    {
+                        exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
+                                + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + ".");
+                    }
                 }
-                else
+            }
+            else
+            {
+                if (exchange == null)
                 {
-                    ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
-                    ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
-
-
-
-                    try
+                    if (exchangeName.startsWith("amq."))
                     {
-
-                        exchange = exchangeFactory.createExchange(method.getExchange(),
-                                                                  method.getType(),
-                                                                  method.getDurable(),
-                                                                  method.getAutoDelete());
-
-                        String alternateExchangeName = method.getAlternateExchange();
-                        boolean validAlternate;
-                        if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+                        exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+                                + exchangeName + " which begins with reserved prefix 'amq.'.");
+                    }
+                    else if (exchangeName.startsWith("qpid."))
+                    {
+                        exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+                                + exchangeName + " which begins with reserved prefix 'qpid.'.");
+                    }
+                    else
+                    {
+                        ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+                        try
                         {
-                            Exchange alternate = getExchange(session, alternateExchangeName);
-                            if(alternate == null)
+                            exchange = exchangeFactory.createExchange(method.getExchange(),
+                                                                      method.getType(),
+                                                                      method.getDurable(),
+                                                                      method.getAutoDelete());
+                            String alternateExchangeName = method.getAlternateExchange();
+                            boolean validAlternate;
+                            if(alternateExchangeName != null && alternateExchangeName.length() != 0)
                             {
-                                validAlternate = false;
+                                Exchange alternate = getExchange(session, alternateExchangeName);
+                                if(alternate == null)
+                                {
+                                    validAlternate = false;
+                                }
+                                else
+                                {
+                                    exchange.setAlternateExchange(alternate);
+                                    validAlternate = true;
+                                }
                             }
                             else
                             {
-                                exchange.setAlternateExchange(alternate);
                                 validAlternate = true;
                             }
-                        }
-                        else
-                        {
-                            validAlternate = true;
-                        }
-
-                        if(validAlternate)
-                        {
-                            if (exchange.isDurable())
+                            if(validAlternate)
                             {
-                                DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
-                                store.createExchange(exchange);
+                                if (exchange.isDurable())
+                                {
+                                    DurableConfigurationStore store = virtualHost.getMessageStore();
+                                    store.createExchange(exchange);
+                                }
+                                exchangeRegistry.registerExchange(exchange);
                             }
-
-                            exchangeRegistry.registerExchange(exchange);
+                            else
+                            {
+                                exception(session, method, ExecutionErrorCode.NOT_FOUND,
+                                            "Unknown alternate exchange " + alternateExchangeName);
+                            }
+                        }
+                        catch(AMQUnknownExchangeType e)
+                        {
+                            exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
                         }
-                        else
+                        catch (AMQException e)
                         {
-                            exception(session, method, ExecutionErrorCode.NOT_FOUND,
-                                        "Unknown alternate exchange " + alternateExchangeName);
+                            exception(session, method, e, "Cannot declare exchange '" + exchangeName);
                         }
                     }
-                    catch(AMQUnknownExchangeType e)
+                }
+                else
+                {
+                    if(!exchange.getTypeShortString().toString().equals(method.getType()))
                     {
-                        exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
+                        exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+                                "Attempt to redeclare exchange: " + exchangeName
+                                        + " of type " + exchange.getTypeShortString()
+                                        + " to " + method.getType() +".");
                     }
-                    catch (AMQException e)
+                    else if(method.hasAlternateExchange()
+                              && (exchange.getAlternateExchange() == null ||
+                                  !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
                     {
-                        exception(session, method, e, "Cannot declare exchange '" + exchangeName);
+                        exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+                                "Attempt to change alternate exchange of: " + exchangeName
+                                        + " from " + exchange.getAlternateExchange()
+                                        + " to " + method.getAlternateExchange() +".");
                     }
                 }
             }
-            else
-            {
-                if(!exchange.getTypeShortString().toString().equals(method.getType()))
-                {
-                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
-                            "Attempt to redeclare exchange: " + exchangeName
-                                    + " of type " + exchange.getTypeShortString()
-                                    + " to " + method.getType() +".");
-                }
-                else if(method.hasAlternateExchange()
-                          && (exchange.getAlternateExchange() == null ||
-                              !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
-                {
-                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
-                            "Attempt to change alternate exchange of: " + exchangeName
-                                    + " from " + exchange.getAlternateExchange()
-                                    + " to " + method.getAlternateExchange() +".");
-                }
-            }
-
         }
     }
 
@@ -924,7 +918,7 @@ public class ServerSessionDelegate exten
 
                 if (exchange.isDurable() && !exchange.isAutoDelete())
                 {
-                    DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+                    DurableConfigurationStore store = virtualHost.getMessageStore();
                     store.removeExchange(exchange);
                 }
             }
@@ -1205,7 +1199,7 @@ public class ServerSessionDelegate exten
     {
 
         VirtualHost virtualHost = getVirtualHost(session);
-        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+        DurableConfigurationStore store = virtualHost.getMessageStore();
 
         String queueName = method.getQueue();
         AMQQueue queue;
@@ -1395,8 +1389,8 @@ public class ServerSessionDelegate exten
     {
         String owner = body.getExclusive() ? session.getClientID() : null;
 
-        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
-                                                                  body.getExclusive(), virtualHost, body.getArguments());
+        final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), queueName, body.getDurable(), owner,
+                                                                  body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments());
 
         return queue;
     }
@@ -1448,7 +1442,7 @@ public class ServerSessionDelegate exten
                         queue.delete();
                         if (queue.isDurable() && !queue.isAutoDelete())
                         {
-                            DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+                            DurableConfigurationStore store = virtualHost.getMessageStore();
                             store.removeQueue(queue);
                         }
                     }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Wed May  2 13:09:18 2012
@@ -29,7 +29,8 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStore.StoreFuture;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.Transaction;
 
 import java.util.Collection;
 import java.util.List;
@@ -71,16 +72,16 @@ public class AsyncAutoCommitTransaction 
      */
     public void addPostTransactionAction(final Action immediateAction)
     {
-        addFuture(MessageStore.IMMEDIATE_FUTURE, immediateAction);
+        addFuture(StoreFuture.IMMEDIATE_FUTURE, immediateAction);
 
     }
 
     public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
     {
-        MessageStore.Transaction txn = null;
+        Transaction txn = null;
         try
         {
-            MessageStore.StoreFuture future;
+            StoreFuture future;
             if(message.isPersistent() && queue.isDurable())
             {
                 if (_logger.isDebugEnabled())
@@ -96,7 +97,7 @@ public class AsyncAutoCommitTransaction 
             }
             else
             {
-                future = MessageStore.IMMEDIATE_FUTURE;
+                future = StoreFuture.IMMEDIATE_FUTURE;
             }
             addFuture(future, postTransactionAction);
             postTransactionAction = null;
@@ -113,7 +114,7 @@ public class AsyncAutoCommitTransaction 
 
     }
 
-    private void addFuture(final MessageStore.StoreFuture future, final Action action)
+    private void addFuture(final StoreFuture future, final Action action)
     {
         if(action != null)
         {
@@ -130,7 +131,7 @@ public class AsyncAutoCommitTransaction 
 
     public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
     {
-        MessageStore.Transaction txn = null;
+        Transaction txn = null;
         try
         {
             for(QueueEntry entry : queueEntries)
@@ -154,7 +155,7 @@ public class AsyncAutoCommitTransaction 
                 }
 
             }
-            MessageStore.StoreFuture future;
+            StoreFuture future;
             if(txn != null)
             {
                 future = txn.commitTranAsync();
@@ -162,7 +163,7 @@ public class AsyncAutoCommitTransaction 
             }
             else
             {
-                future = MessageStore.IMMEDIATE_FUTURE;    
+                future = StoreFuture.IMMEDIATE_FUTURE;
             }
             addFuture(future, postTransactionAction);
             postTransactionAction = null;
@@ -182,10 +183,10 @@ public class AsyncAutoCommitTransaction 
 
     public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
     {
-        MessageStore.Transaction txn = null;
+        Transaction txn = null;
         try
         {
-            MessageStore.StoreFuture future;
+            StoreFuture future;
             if(message.isPersistent() && queue.isDurable())
             {
                 if (_logger.isDebugEnabled())
@@ -200,7 +201,7 @@ public class AsyncAutoCommitTransaction 
             }
             else
             {
-                future = MessageStore.IMMEDIATE_FUTURE;
+                future = StoreFuture.IMMEDIATE_FUTURE;
             }
             addFuture(future, postTransactionAction);
             postTransactionAction = null;
@@ -220,7 +221,7 @@ public class AsyncAutoCommitTransaction 
 
     public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
     {
-        MessageStore.Transaction txn = null;
+        Transaction txn = null;
         try
         {
 
@@ -246,7 +247,7 @@ public class AsyncAutoCommitTransaction 
                 }
                 
             }
-            MessageStore.StoreFuture future;
+            StoreFuture future;
             if (txn != null)
             {
                 future = txn.commitTranAsync();
@@ -254,7 +255,7 @@ public class AsyncAutoCommitTransaction 
             }
             else
             {
-                future = MessageStore.IMMEDIATE_FUTURE;
+                future = StoreFuture.IMMEDIATE_FUTURE;
             }
             addFuture(future, postTransactionAction);
             postTransactionAction = null;
@@ -278,7 +279,7 @@ public class AsyncAutoCommitTransaction 
     {
         if(immediatePostTransactionAction != null)
         {
-            addFuture(MessageStore.IMMEDIATE_FUTURE, new Action()
+            addFuture(StoreFuture.IMMEDIATE_FUTURE, new Action()
             {
                 public void postCommit()
                 {
@@ -305,7 +306,7 @@ public class AsyncAutoCommitTransaction 
         return false;
     }
 
-    private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
+    private void rollbackIfNecessary(Action postTransactionAction, Transaction txn)
     {
         if (txn != null)
         {

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java Wed May  2 13:09:18 2012
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
 
 import java.util.Collection;
 import java.util.List;
@@ -67,7 +68,7 @@ public class AutoCommitTransaction imple
 
     public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
     {
-        MessageStore.Transaction txn = null;
+        Transaction txn = null;
         try
         {
             if(message.isPersistent() && queue.isDurable())
@@ -99,7 +100,7 @@ public class AutoCommitTransaction imple
 
     public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
     {
-        MessageStore.Transaction txn = null;
+        Transaction txn = null;
         try
         {
             for(QueueEntry entry : queueEntries)
@@ -146,7 +147,7 @@ public class AutoCommitTransaction imple
 
     public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
     {
-        MessageStore.Transaction txn = null;
+        Transaction txn = null;
         try
         {
             if(message.isPersistent() && queue.isDurable())
@@ -179,7 +180,7 @@ public class AutoCommitTransaction imple
 
     public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
     {
-        MessageStore.Transaction txn = null;
+        Transaction txn = null;
         try
         {
 
@@ -247,7 +248,7 @@ public class AutoCommitTransaction imple
         return false;
     }
 
-    private void rollbackIfNecessary(Action postTransactionAction, MessageStore.Transaction txn)
+    private void rollbackIfNecessary(Action postTransactionAction, Transaction txn)
     {
         if (txn != null)
         {

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java Wed May  2 13:09:18 2012
@@ -26,6 +26,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Xid;
 
@@ -38,7 +39,7 @@ public class DistributedTransaction impl
 
     private final AutoCommitTransaction _autoCommitTransaction;
 
-    private volatile MessageStore.Transaction _transaction;
+    private volatile Transaction _transaction;
 
     private long _txnStartTime = 0L;
 

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Wed May  2 13:09:18 2012
@@ -33,6 +33,7 @@ import org.apache.qpid.server.message.En
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.Xid;
 
@@ -48,7 +49,7 @@ public class DtxBranch
     private final List<Record> _enqueueRecords = new ArrayList<Record>();
     private final List<Record> _dequeueRecords = new ArrayList<Record>();
 
-    private MessageStore.Transaction _transaction;
+    private Transaction _transaction;
     private long _expiration;
     private VirtualHost _vhost;
     private ScheduledFuture<?> _timeoutFuture;
@@ -199,7 +200,7 @@ public class DtxBranch
     public void prepare() throws AMQStoreException
     {
 
-        MessageStore.Transaction txn = _store.newTransaction();
+        Transaction txn = _store.newTransaction();
         txn.recordXid(_xid.getFormat(),
                       _xid.getGlobalId(),
                       _xid.getBranchId(),
@@ -223,7 +224,7 @@ public class DtxBranch
         {
             // prepare has previously been called
 
-            MessageStore.Transaction txn = _store.newTransaction();
+            Transaction txn = _store.newTransaction();
             txn.removeXid(_xid.getFormat(), _xid.getGlobalId(), _xid.getBranchId());
             txn.commitTran();
 
@@ -302,7 +303,7 @@ public class DtxBranch
         _enqueueRecords.add(new Record(queue, message));
     }
 
-    private static final class Record implements MessageStore.Transaction.Record
+    private static final class Record implements Transaction.Record
     {
         private final BaseQueue _queue;
         private final EnqueableMessage _message;

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Wed May  2 13:09:18 2012
@@ -29,6 +29,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -46,7 +47,7 @@ public class LocalTransaction implements
 
     private final List<Action> _postTransactionActions = new ArrayList<Action>();
 
-    private volatile MessageStore.Transaction _transaction;
+    private volatile Transaction _transaction;
     private MessageStore _transactionLog;
     private long _txnStartTime = 0L;
 

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java Wed May  2 13:09:18 2012
@@ -20,13 +20,12 @@
  */
 package org.apache.qpid.server.txn;
 
+import java.util.Collection;
+import java.util.List;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.QueueEntry;
 
-import java.util.Collection;
-import java.util.List;
-
 
 /**
  * The ServerTransaction interface allows a set enqueue/dequeue operations to be

Propchange: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1306564-1332660

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Wed May  2 13:09:18 2012
@@ -33,6 +33,7 @@ import org.apache.qpid.server.exchange.E
 import org.apache.qpid.server.exchange.ExchangeRegistry;
 import org.apache.qpid.server.federation.BrokerLink;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
@@ -57,8 +58,6 @@ public interface VirtualHost extends Dur
 
     MessageStore getMessageStore();
 
-    DurableConfigurationStore getDurableConfigurationStore();
-
     SecurityManager getSecurityManager();
 
     void close();
@@ -98,5 +97,9 @@ public interface VirtualHost extends Dur
 
     void removeBrokerConnection(BrokerLink brokerLink);
 
+    LinkRegistry getLinkRegistry(String remoteContainerId);
+
     ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
+
+    State getState();
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Wed May  2 13:09:18 2012
@@ -23,12 +23,11 @@ package org.apache.qpid.server.virtualho
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
+
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQStoreException;
@@ -52,6 +51,7 @@ import org.apache.qpid.server.store.Conf
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
 import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.Transaction;
 import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.txn.DtxBranch;
@@ -74,11 +74,9 @@ public class VirtualHostConfigRecoveryHa
 {
     private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
 
-
     private final VirtualHost _virtualHost;
 
     private MessageStoreLogSubject _logSubject;
-    private List<ProcessAction> _actions;
 
     private MessageStore _store;
 
@@ -95,14 +93,14 @@ public class VirtualHostConfigRecoveryHa
 
     public VirtualHostConfigRecoveryHandler begin(MessageStore store)
     {
-        _logSubject = new MessageStoreLogSubject(_virtualHost,store);
+        _logSubject = new MessageStoreLogSubject(_virtualHost,store.getClass().getSimpleName());
         _store = store;
         CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(null, false));
 
         return this;
     }
 
-    public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments)
+    public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments)
     {
         try
         {
@@ -110,7 +108,7 @@ public class VirtualHostConfigRecoveryHa
     
             if (q == null)
             {
-                q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost,
+                q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
                                                        FieldTable.convertToMap(arguments));
                 _virtualHost.getQueueRegistry().registerQueue(q);
             }
@@ -132,7 +130,7 @@ public class VirtualHostConfigRecoveryHa
         return this;
     }
 
-    public void exchange(String exchangeName, String type, boolean autoDelete)
+    public void exchange(UUID id, String exchangeName, String type, boolean autoDelete)
     {
         try
         {
@@ -141,7 +139,7 @@ public class VirtualHostConfigRecoveryHa
             exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
             if (exchange == null)
             {
-                exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
+                exchange = _virtualHost.getExchangeFactory().createExchange(id, exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
                 _virtualHost.getExchangeRegistry().registerExchange(exchange);
             }
         }
@@ -201,8 +199,8 @@ public class VirtualHostConfigRecoveryHa
     }
 
     public void dtxRecord(long format, byte[] globalId, byte[] branchId,
-                          MessageStore.Transaction.Record[] enqueues,
-                          MessageStore.Transaction.Record[] dequeues)
+                          Transaction.Record[] enqueues,
+                          Transaction.Record[] dequeues)
     {
         Xid id = new Xid(format, globalId, branchId);
         DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
@@ -212,9 +210,9 @@ public class VirtualHostConfigRecoveryHa
             branch = new DtxBranch(id, _store, _virtualHost);
             dtxRegistry.registerBranch(branch);
         }
-        for(MessageStore.Transaction.Record record : enqueues)
+        for(Transaction.Record record : enqueues)
         {
-            final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+            final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
             if(queue != null)
             {
                 final long messageId = record.getMessage().getMessageNumber();
@@ -255,10 +253,9 @@ public class VirtualHostConfigRecoveryHa
                 else
                 {
                     StringBuilder xidString = xidAsString(id);
-                    String messageNumberString = String.valueOf(message.getMessageNumber());
                     CurrentActor.get().message(_logSubject,
                                                TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
-                                                                                            messageNumberString));
+                                                                                            Long.toString(messageId)));
                     
                 }
 
@@ -268,13 +265,13 @@ public class VirtualHostConfigRecoveryHa
                 StringBuilder xidString = xidAsString(id);
                 CurrentActor.get().message(_logSubject,
                                            TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
-                                                                                      record.getQueue().getResourceName()));
+                                                                                      record.getQueue().getId().toString()));
 
             }
         }
-        for(MessageStore.Transaction.Record record : dequeues)
+        for(Transaction.Record record : dequeues)
         {
-            final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+            final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
             if(queue != null)
             {
                 final long messageId = record.getMessage().getMessageNumber();
@@ -306,10 +303,9 @@ public class VirtualHostConfigRecoveryHa
                 else
                 {
                     StringBuilder xidString = xidAsString(id);
-                    String messageNumberString = String.valueOf(message.getMessageNumber());
                     CurrentActor.get().message(_logSubject,
                                                TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
-                                                                                            messageNumberString));
+                                                                                            Long.toString(messageId)));
 
                 }
 
@@ -319,7 +315,7 @@ public class VirtualHostConfigRecoveryHa
                 StringBuilder xidString = xidAsString(id);
                 CurrentActor.get().message(_logSubject,
                                            TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
-                                                                                      queue.getName()));
+                                                                                      record.getQueue().getId().toString()));
             }
 
         }
@@ -358,47 +354,22 @@ public class VirtualHostConfigRecoveryHa
         CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
     }
 
-    private static final class ProcessAction
+    @Override
+    public void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf)
     {
-        private final AMQQueue _queue;
-        private final AMQMessage _message;
-
-        public ProcessAction(AMQQueue queue, AMQMessage message)
-        {
-            _queue = queue;
-            _message = message;
-        }
-
-        public void process()
-        {
-            try
-            {
-                _queue.enqueue(_message);
-            }
-            catch(AMQException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-
-    }
-
-    public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf)
-    {
-        _actions = new ArrayList<ProcessAction>();
         try
         {
-            Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName);
+            Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeId);
             if (exchange == null)
             {
-                _logger.error("Unknown exchange: " + exchangeName + ", cannot bind queue : " + queueName);
+                _logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId);
                 return;
             }
-            
-            AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+            AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
             if (queue == null)
             {
-                _logger.error("Unknown queue: " + queueName + ", cannot be bound to exchange: " + exchangeName);
+                _logger.error("Unknown queue id " + queueId + ", cannot be bound to exchange: " + exchange.getName());
             }
             else
             {
@@ -422,10 +393,10 @@ public class VirtualHostConfigRecoveryHa
                 if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null)
                 {
 
-                    _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName
+                    _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName()
                         + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
 
-                    bf.restoreBinding(bindingKey, queue, exchange, argumentMap);
+                    bf.restoreBinding(bindingId, bindingKey, queue, exchange, argumentMap);
                 }
             }
         }
@@ -447,16 +418,14 @@ public class VirtualHostConfigRecoveryHa
 
     }
 
-    public void queueEntry(final String queueName, long messageId)
+    public void queueEntry(final UUID queueId, long messageId)
     {
-        AMQShortString queueNameShortString = new AMQShortString(queueName);
-
-        AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
-
+        AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
         try
         {
             if(queue != null)
             {
+                String queueName = queue.getName();
                 ServerMessage message = _recoveredMessages.get(messageId);
                 _unusedMessages.remove(messageId);
 
@@ -466,7 +435,7 @@ public class VirtualHostConfigRecoveryHa
 
                     if (_logger.isDebugEnabled())
                     {
-                        _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString());
+                        _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
                     }
 
                     Integer count = _queueRecoveries.get(queueName);
@@ -481,23 +450,23 @@ public class VirtualHostConfigRecoveryHa
                 }
                 else
                 {
-                    _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
-                    MessageStore.Transaction txn = _store.newTransaction();
+                    _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
+                    Transaction txn = _store.newTransaction();
                     txn.dequeueMessage(queue, new DummyMessage(messageId));
                     txn.commitTranAsync();
                 }
             }
             else
             {
-                _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
-                MessageStore.Transaction txn = _store.newTransaction();
+                _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
+                Transaction txn = _store.newTransaction();
                 TransactionLogResource mockQueue =
                         new TransactionLogResource()
                         {
-
-                            public String getResourceName()
+                            @Override
+                            public UUID getId()
                             {
-                                return queueName;
+                                return queueId;
                             }
                         };
                 txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
@@ -509,9 +478,6 @@ public class VirtualHostConfigRecoveryHa
         {
             throw new RuntimeException(e);
         }
-
-
-
     }
 
     public DtxRecordRecoveryHandler completeQueueEntryRecovery()

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Wed May  2 13:09:18 2012
@@ -20,15 +20,11 @@
  */
 package org.apache.qpid.server.virtualhost;
 
-import java.util.concurrent.ScheduledFuture;
-import org.apache.commons.configuration.Configuration;
+
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.AMQBrokerManagerMBean;
 import org.apache.qpid.server.binding.BindingFactory;
 import org.apache.qpid.server.configuration.BrokerConfig;
@@ -45,14 +41,13 @@ import org.apache.qpid.server.exchange.D
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeFactory;
 import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.federation.Bridge;
 import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.VirtualHostMessages;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.AMQQueue;
@@ -62,22 +57,29 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
-import org.apache.qpid.server.store.DurableConfigurationStore;
+import org.apache.qpid.server.store.Event;
+import org.apache.qpid.server.store.EventListener;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreFactory;
+import org.apache.qpid.server.store.OperationalLoggingListener;
 import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
 import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
 
+import javax.management.JMException;
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+
 public class VirtualHostImpl implements VirtualHost
 {
     private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
@@ -100,97 +102,80 @@ public class VirtualHostImpl implements 
 
     private final BrokerConfig _brokerConfig;
 
-    private final VirtualHostConfiguration _configuration;
+    private final VirtualHostConfiguration _vhostConfig;
 
-    private ConnectionRegistry _connectionRegistry;
+    private final VirtualHostMBean _virtualHostMBean;
 
-    private QueueRegistry _queueRegistry;
+    private final AMQBrokerManagerMBean _brokerMBean;
 
-    private ExchangeRegistry _exchangeRegistry;
+    private final QueueRegistry _queueRegistry;
 
-    private ExchangeFactory _exchangeFactory;
+    private final ExchangeRegistry _exchangeRegistry;
 
-    private MessageStore _messageStore;
+    private final ExchangeFactory _exchangeFactory;
 
-    private DtxRegistry _dtxRegistry;
+    private final ConnectionRegistry _connectionRegistry;
 
-    private VirtualHostMBean _virtualHostMBean;
+    private final BindingFactory _bindingFactory;
 
-    private AMQBrokerManagerMBean _brokerMBean;
+    private final DtxRegistry _dtxRegistry;
 
+    private final MessageStore _messageStore;
 
-    private DurableConfigurationStore _durableConfigurationStore;
-    private BindingFactory _bindingFactory;
+    private State _state = State.INITIALISING;
 
     private boolean _statisticsEnabled = false;
+
     private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
 
+    private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
 
-    public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig, MessageStore store) throws Exception
+    public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
     {
         if (hostConfig == null)
         {
             throw new IllegalArgumentException("HostConfig cannot be null");
         }
 
+        if (hostConfig.getName() == null || hostConfig.getName().length() == 0)
+        {
+            throw new IllegalArgumentException("Illegal name (" + hostConfig.getName() + ") for virtualhost.");
+        }
+
         _appRegistry = appRegistry;
         _brokerConfig = _appRegistry.getBroker();
-        _configuration = hostConfig;
-        _name = _configuration.getName();
+        _vhostConfig = hostConfig;
+        _name = _vhostConfig.getName();
         _dtxRegistry = new DtxRegistry();
 
         _id = _appRegistry.getConfigStore().createId();
 
         CurrentActor.get().message(VirtualHostMessages.CREATED(_name));
 
-        if (_name == null || _name.length() == 0)
-        {
-            throw new IllegalArgumentException("Illegal name (" + _name + ") for virtualhost.");
-        }
-
-        _securityManager = new SecurityManager(_appRegistry.getSecurityManager());
-        _securityManager.configureHostPlugins(_configuration);
-
         _virtualHostMBean = new VirtualHostMBean();
+        _securityManager = new SecurityManager(_appRegistry.getSecurityManager());
+        _securityManager.configureHostPlugins(_vhostConfig);
 
         _connectionRegistry = new ConnectionRegistry();
 
-        _houseKeepingTasks = new ScheduledThreadPoolExecutor(_configuration.getHouseKeepingThreadCount());
+        _houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
 
         _queueRegistry = new DefaultQueueRegistry(this);
 
         _exchangeFactory = new DefaultExchangeFactory(this);
-        _exchangeFactory.initialise(_configuration);
+        _exchangeFactory.initialise(_vhostConfig);
 
         _exchangeRegistry = new DefaultExchangeRegistry(this);
 
-        StartupRoutingTable configFileRT = new StartupRoutingTable();
-
-        _durableConfigurationStore = configFileRT;
-
-        // This needs to be after the RT has been defined as it creates the default durable exchanges.
-        _exchangeRegistry.initialise();
-
         _bindingFactory = new BindingFactory(this);
 
-        initialiseModel(_configuration);
+        _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
 
-        if (store != null)
-        {
-            _messageStore = store;
-            if(store instanceof DurableConfigurationStore)
-            {
-                _durableConfigurationStore = (DurableConfigurationStore) store;
-            }
-        }
-        else
-        {
-            initialiseMessageStore(hostConfig);
-        }
+        _messageStore = initialiseMessageStore(hostConfig.getMessageStoreFactoryClass());
 
-        _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
-        _brokerMBean.register();
-        initialiseHouseKeeping(hostConfig.getHousekeepingCheckPeriod());
+        configureMessageStore(hostConfig);
+
+        activateNonHAMessageStore();
 
         initialiseStatistics();
     }
@@ -202,7 +187,7 @@ public class VirtualHostImpl implements 
 
     public VirtualHostConfiguration getConfiguration()
     {
-        return _configuration;
+        return _vhostConfig;
     }
 
     public UUID getId()
@@ -226,47 +211,16 @@ public class VirtualHostImpl implements 
     }
 
     /**
-     * Virtual host JMX MBean class.
-     *
-     * This has some of the methods implemented from management intrerface for exchanges. Any
-     * implementaion of an Exchange MBean should extend this class.
-     */
-    public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
-    {
-        public VirtualHostMBean() throws NotCompliantMBeanException
-        {
-            super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
-        }
-
-        public String getObjectInstanceName()
-        {
-            return ObjectName.quote(_name);
-        }
-
-        public String getName()
-        {
-            return _name;
-        }
-
-        public VirtualHostImpl getVirtualHost()
-        {
-            return VirtualHostImpl.this;
-        }
-    }
-
-
-    /**
      * Initialise a housekeeping task to iterate over queues cleaning expired messages with no consumers
      * and checking for idle or open transactions that have exceeded the permitted thresholds.
      *
      * @param period
      */
-	private void initialiseHouseKeeping(long period)
+    private void initialiseHouseKeeping(long period)
     {
+
         if (period != 0L)
         {
-
-
             scheduleHouseKeepingTask(period, new VirtualHostHouseKeepingTask());
 
             Map<String, VirtualHostPluginFactory> plugins = _appRegistry.getPluginManager().getVirtualHostPlugins();
@@ -299,50 +253,30 @@ public class VirtualHostImpl implements 
         }
     }
 
-    private class VirtualHostHouseKeepingTask extends HouseKeepingTask
+    private void shutdownHouseKeeping()
     {
-        public VirtualHostHouseKeepingTask()
-        {
-            super(VirtualHostImpl.this);
-        }
+        _houseKeepingTasks.shutdown();
 
-        public void execute()
+           try
+           {
+               if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
+               {
+                   _houseKeepingTasks.shutdownNow();
+               }
+           }
+           catch (InterruptedException e)
+           {
+               _logger.warn("Interrupted during Housekeeping shutdown:", e);
+               Thread.currentThread().interrupt();
+           }
+    }
+
+    private void removeHouseKeepingTasks()
+    {
+        BlockingQueue<Runnable> taskQueue = _houseKeepingTasks.getQueue();
+        for (final Runnable runnable : taskQueue)
         {
-            for (AMQQueue q : _queueRegistry.getQueues())
-            {
-                _logger.debug("Checking message status for queue: "
-                              + q.getName());
-                try
-                {
-                    q.checkMessageStatus();
-                }
-                catch (Exception e)
-                {
-                    _logger.error("Exception in housekeeping for queue: "
-                                  + q.getNameShortString().toString(), e);
-                    //Don't throw exceptions as this will stop the
-                    // house keeping task from running.
-                }
-            }
-            for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
-            {
-                _logger.debug("Checking for long running open transactions on connection " + connection);
-                for (AMQSessionModel session : connection.getSessionModels())
-                {
-                    _logger.debug("Checking for long running open transactions on session " + session);
-                    try
-                    {
-                        session.checkTransactionStatus(_configuration.getTransactionTimeoutOpenWarn(),
-                                                       _configuration.getTransactionTimeoutOpenClose(),
-                                                       _configuration.getTransactionTimeoutIdleWarn(),
-                                                       _configuration.getTransactionTimeoutIdleClose());
-                    }
-                    catch (Exception e)
-                    {
-                        _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
-                    }
-                }
-            }
+            _houseKeepingTasks.remove(runnable);
         }
     }
 
@@ -390,48 +324,44 @@ public class VirtualHostImpl implements 
     }
 
 
-    private void initialiseMessageStore(VirtualHostConfiguration hostConfig) throws Exception
+    private MessageStore initialiseMessageStore(final String messageStoreFactoryClass) throws Exception
     {
-        String messageStoreClass = hostConfig.getMessageStoreClass();
+        final Class<?> clazz = Class.forName(messageStoreFactoryClass);
+        final Object o = clazz.newInstance();
 
-        Class<?> clazz = Class.forName(messageStoreClass);
-        Object o = clazz.newInstance();
-
-        if (!(o instanceof MessageStore))
+        if (!(o instanceof MessageStoreFactory))
         {
-            throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz +
-                                         " does not.");
+            throw new ClassCastException("Message store factory class must implement " + MessageStoreFactory.class +
+                                        ". Class " + clazz + " does not.");
         }
-        MessageStore messageStore = (MessageStore) o;
-        VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
 
-        MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStore);
+        final MessageStoreFactory messageStoreFactory = (MessageStoreFactory) o;
+        final MessageStore messageStore = messageStoreFactory.createMessageStore();
+        final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStoreFactory.getStoreClassName());
+        OperationalLoggingListener.listen(messageStore, storeLogSubject);
 
+        messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
+        messageStore.addEventListener(new AfterActivationListener(), Event.AFTER_ACTIVATE);
+        messageStore.addEventListener(new BeforeCloseListener(), Event.BEFORE_CLOSE);
+        messageStore.addEventListener(new BeforePassivationListener(), Event.BEFORE_PASSIVATE);
 
-        if(messageStore instanceof DurableConfigurationStore)
-        {
-            DurableConfigurationStore durableConfigurationStore = (DurableConfigurationStore) messageStore;
-
-            durableConfigurationStore.configureConfigStore(this.getName(),
-                                              recoveryHandler,
-                                              hostConfig.getStoreConfiguration(),
-                                              storeLogSubject);
+        return messageStore;
+    }
 
-            _durableConfigurationStore = durableConfigurationStore;
-        }
+    private void configureMessageStore(VirtualHostConfiguration hostConfig) throws Exception
+    {
 
-        messageStore.configureMessageStore(this.getName(),
-                                           recoveryHandler,
-                                           hostConfig.getStoreConfiguration(),
-                                           storeLogSubject);
-        messageStore.configureTransactionLog(this.getName(),
-                                           recoveryHandler,
-                                           hostConfig.getStoreConfiguration(),
-                                           storeLogSubject);
+        VirtualHostConfigRecoveryHandler recoveryHandler = new VirtualHostConfigRecoveryHandler(this);
 
-        _messageStore = messageStore;
+        // TODO perhaps pass config on construction??
+        _messageStore.configureConfigStore(getName(), recoveryHandler, hostConfig.getStoreConfiguration());
+        _messageStore.configureMessageStore(getName(), recoveryHandler, recoveryHandler, hostConfig.getStoreConfiguration());
 
+    }
 
+    private void activateNonHAMessageStore() throws Exception
+    {
+        _messageStore.activate();
     }
 
     private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
@@ -445,7 +375,7 @@ public class VirtualHostImpl implements 
             configureExchange(config.getExchangeConfiguration(exchangeName));
         }
 
-    	String[] queueNames = config.getQueueNames();
+        String[] queueNames = config.getQueueNames();
 
         for (Object queueNameObj : queueNames)
         {
@@ -456,45 +386,45 @@ public class VirtualHostImpl implements 
 
     private void configureExchange(ExchangeConfiguration exchangeConfiguration) throws AMQException
     {
-    	AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
+        AMQShortString exchangeName = new AMQShortString(exchangeConfiguration.getName());
 
         Exchange exchange;
         exchange = _exchangeRegistry.getExchange(exchangeName);
         if (exchange == null)
         {
 
-    		AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
-    		boolean durable = exchangeConfiguration.getDurable();
-    		boolean autodelete = exchangeConfiguration.getAutoDelete();
+            AMQShortString type = new AMQShortString(exchangeConfiguration.getType());
+            boolean durable = exchangeConfiguration.getDurable();
+            boolean autodelete = exchangeConfiguration.getAutoDelete();
 
             Exchange newExchange = _exchangeFactory.createExchange(exchangeName, type, durable, autodelete, 0);
             _exchangeRegistry.registerExchange(newExchange);
 
             if (newExchange.isDurable())
             {
-                _durableConfigurationStore.createExchange(newExchange);
+                _messageStore.createExchange(newExchange);
             }
         }
     }
 
     private void configureQueue(QueueConfiguration queueConfiguration) throws AMQException, ConfigurationException
     {
-    	AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueConfiguration, this);
         String queueName = queue.getName();
 
-    	if (queue.isDurable())
-    	{
-    		getDurableConfigurationStore().createQueue(queue);
-    	}
+        if (queue.isDurable())
+        {
+            getMessageStore().createQueue(queue);
+        }
 
         //get the exchange name (returns default exchange name if none was specified)
-    	String exchangeName = queueConfiguration.getExchange();
+        String exchangeName = queueConfiguration.getExchange();
 
         Exchange exchange = _exchangeRegistry.getExchange(exchangeName);
-    	if (exchange == null)
-    	{
+        if (exchange == null)
+        {
             throw new ConfigurationException("Attempt to bind queue '" + queueName + "' to unknown exchange:" + exchangeName);
-    	}
+        }
 
         Exchange defaultExchange = _exchangeRegistry.getDefaultExchange();
 
@@ -573,11 +503,6 @@ public class VirtualHostImpl implements 
         return _messageStore;
     }
 
-    public DurableConfigurationStore getDurableConfigurationStore()
-    {
-        return _durableConfigurationStore;
-    }
-
     public SecurityManager getSecurityManager()
     {
         return _securityManager;
@@ -587,39 +512,8 @@ public class VirtualHostImpl implements 
     {
         //Stop Connections
         _connectionRegistry.close();
-
-        //Stop the Queues processing
-        if (_queueRegistry != null)
-        {
-            for (AMQQueue queue : _queueRegistry.getQueues())
-            {
-                queue.stop();
-            }
-        }
-
-        //Stop Housekeeping
-        if (_houseKeepingTasks != null)
-        {
-            _houseKeepingTasks.shutdown();
-
-            try
-            {
-                if (!_houseKeepingTasks.awaitTermination(HOUSEKEEPING_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS))
-                {
-                    _houseKeepingTasks.shutdownNow();
-                }
-            }
-            catch (InterruptedException e)
-            {
-                _logger.warn("Interrupted during Housekeeping shutdown:" + e.getMessage());
-                // Swallowing InterruptedException ok as we are shutting down.
-            }
-        }
-
-        if(_dtxRegistry != null)
-        {
-            _dtxRegistry.close();
-        }
+        _queueRegistry.stopAllAndUnregisterMBeans();
+        _dtxRegistry.close();
 
         //Close MessageStore
         if (_messageStore != null)
@@ -635,6 +529,8 @@ public class VirtualHostImpl implements 
             }
         }
 
+        _state = State.STOPPED;
+
         CurrentActor.get().message(VirtualHostMessages.CLOSED());
     }
 
@@ -746,7 +642,6 @@ public class VirtualHostImpl implements 
         return blink;
     }
 
-    
     public void createBrokerConnection(final String transport,
                                        final String host,
                                        final int port,
@@ -783,6 +678,17 @@ public class VirtualHostImpl implements 
         }
     }
 
+    public synchronized LinkRegistry getLinkRegistry(String remoteContainerId)
+    {
+        LinkRegistry linkRegistry = _linkRegistry.get(remoteContainerId);
+        if(linkRegistry == null)
+        {
+            linkRegistry = new LinkRegistry();
+            _linkRegistry.put(remoteContainerId, linkRegistry);
+        }
+        return linkRegistry;
+    }
+
     public ConfigStore getConfigStore()
     {
         return getApplicationRegistry().getConfigStore();
@@ -793,74 +699,161 @@ public class VirtualHostImpl implements 
         return _dtxRegistry;
     }
 
+    @Override
+    public String toString()
+    {
+        return _name;
+    }
+
+    @Override
+    public State getState()
+    {
+        return _state;
+    }
+
+
     /**
-     * Temporary Startup RT class to record the creation of persistent queues / exchanges.
-     *
+     * Virtual host JMX MBean class.
      *
-     * This is so we can replay the creation of queues/exchanges in to the real _RT after it has been loaded.
-     * This should be removed after the _RT has been fully split from the the TL
+     * This has some of the methods implemented from management interface for exchanges. Any
+     * Implementation of an Exchange MBean should extend this class.
      */
-    private static class StartupRoutingTable implements DurableConfigurationStore
+    public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
     {
-        public void configureConfigStore(String name,
-                                         ConfigurationRecoveryHandler recoveryHandler,
-                                         Configuration config,
-                                         LogSubject logSubject) throws Exception
+        public VirtualHostMBean() throws NotCompliantMBeanException
         {
+            super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
         }
 
-        public void createExchange(Exchange exchange) throws AMQStoreException
+        public String getObjectInstanceName()
         {
+            return ObjectName.quote(_name);
         }
 
-        public void removeExchange(Exchange exchange) throws AMQStoreException
+        public String getName()
         {
+            return _name;
         }
 
-        public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+        public VirtualHostImpl getVirtualHost()
         {
+            return VirtualHostImpl.this;
         }
+    }
 
-        public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+    private final class BeforeActivationListener implements EventListener
+    {
+        @Override
+        public void event(Event event)
         {
+            try
+            {
+                _exchangeRegistry.initialise();
+                initialiseModel(_vhostConfig);
+            } catch (Exception e)
+            {
+                throw new RuntimeException("Failed to initialise virtual host after state change", e);
+            }
         }
+    }
 
-        public void createQueue(AMQQueue queue) throws AMQStoreException
+    private final class AfterActivationListener implements EventListener
+    {
+        @Override
+        public void event(Event event)
         {
-        }
+            initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+            try
+            {
+                _brokerMBean.register();
+            } catch (JMException e)
+            {
+                throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
+            }
 
-        public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQStoreException
-        {
+            _state = State.ACTIVE;
         }
+    }
 
-        public void removeQueue(AMQQueue queue) throws AMQStoreException
-        {
-        }
+    public class BeforePassivationListener implements EventListener
+    {
 
-        public void updateQueue(AMQQueue queue) throws AMQStoreException
+        @Override
+        public void event(Event event)
         {
-        }
+            _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
+            _brokerMBean.unregister();
+            removeHouseKeepingTasks();
 
-        public void createBrokerLink(final BrokerLink link) throws AMQStoreException
-        {
+            _queueRegistry.stopAllAndUnregisterMBeans();
+            _exchangeRegistry.clearAndUnregisterMbeans();
+            _dtxRegistry.close();
+
+            _state = State.PASSIVE;
         }
+    }
 
-        public void deleteBrokerLink(final BrokerLink link) throws AMQStoreException
+    private final class BeforeCloseListener implements EventListener
+    {
+        @Override
+        public void event(Event event)
         {
+            _brokerMBean.unregister();
+            shutdownHouseKeeping();
         }
+    }
 
-        public void createBridge(final Bridge bridge) throws AMQStoreException
+    private class VirtualHostHouseKeepingTask extends HouseKeepingTask
+    {
+        public VirtualHostHouseKeepingTask()
         {
+            super(VirtualHostImpl.this);
         }
 
-        public void deleteBridge(final Bridge bridge) throws AMQStoreException
+        public void execute()
         {
+            for (AMQQueue q : _queueRegistry.getQueues())
+            {
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Checking message status for queue: "
+                            + q.getName());
+                }
+                try
+                {
+                    q.checkMessageStatus();
+                } catch (Exception e)
+                {
+                    _logger.error("Exception in housekeeping for queue: "
+                            + q.getNameShortString().toString(), e);
+                    //Don't throw exceptions as this will stop the
+                    // house keeping task from running.
+                }
+            }
+            for (AMQConnectionModel connection : getConnectionRegistry().getConnections())
+            {
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Checking for long running open transactions on connection " + connection);
+                }
+                for (AMQSessionModel session : connection.getSessionModels())
+                {
+                    if (_logger.isDebugEnabled())
+                    {
+                        _logger.debug("Checking for long running open transactions on session " + session);
+                    }
+                    try
+                    {
+                        session.checkTransactionStatus(_vhostConfig.getTransactionTimeoutOpenWarn(),
+                                _vhostConfig.getTransactionTimeoutOpenClose(),
+                                _vhostConfig.getTransactionTimeoutIdleWarn(),
+                                _vhostConfig.getTransactionTimeoutIdleClose());
+                    } catch (Exception e)
+                    {
+                        _logger.error("Exception in housekeeping for connection: " + connection.toString(), e);
+                    }
+                }
+            }
         }
     }
-
-    @Override
-    public String toString()
-    {
-        return _name;
-    }
 }

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java Wed May  2 13:09:18 2012
@@ -37,6 +37,7 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.store.TestableMemoryMessageStoreFactory;
 import org.apache.qpid.server.util.TestApplicationRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -165,7 +166,7 @@ public class AMQBrokerManagerMBeanTest e
 
         XMLConfiguration configXml = new XMLConfiguration();
         configXml.addProperty("virtualhosts.virtualhost(-1).name", "test");
-        configXml.addProperty("virtualhosts.virtualhost(-1).test.store.class", TestableMemoryMessageStore.class.getName());
+        configXml.addProperty("virtualhosts.virtualhost(-1).test.store.factoryclass", TestableMemoryMessageStoreFactory.class.getName());
 
         ServerConfiguration configuration = new ServerConfiguration(configXml);
 

Modified: qpid/branches/qpid-3767/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3767/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java?rev=1333027&r1=1333026&r2=1333027&view=diff
==============================================================================
--- qpid/branches/qpid-3767/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java (original)
+++ qpid/branches/qpid-3767/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java Wed May  2 13:09:18 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
 package org.apache.qpid.server.configuration;
 
 import java.util.UUID;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message