qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From c...@apache.org
Subject svn commit: r1294242 [4/7] - in /qpid/branches/QPID-3799-acl: ./ cpp/ cpp/bindings/qmf/tests/ cpp/bindings/qpid/ cpp/bindings/qpid/dotnet/ cpp/docs/api/ cpp/examples/old_api/tradedemo/ cpp/include/qmf/engine/ cpp/include/qpid/framing/ cpp/include/qpid/...
Date Mon, 27 Feb 2012 17:40:49 GMT
Modified: qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Mon Feb 27 17:40:42 2012
@@ -21,6 +21,8 @@
 package org.apache.qpid.server.transport;
 
 import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import org.apache.qpid.server.txn.RollbackOnlyDtxException;
+import org.apache.qpid.server.txn.TimeoutDtxException;
 import static org.apache.qpid.util.Serial.gt;
 
 import java.security.Principal;
@@ -44,6 +46,7 @@ import java.util.concurrent.atomic.Atomi
 import javax.security.auth.Subject;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.configuration.ConfigStore;
@@ -67,24 +70,19 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.AlreadyKnownDtxException;
 import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
+import org.apache.qpid.server.txn.DistributedTransaction;
+import org.apache.qpid.server.txn.DtxNotSelectedException;
+import org.apache.qpid.server.txn.IncorrectDtxStateException;
+import org.apache.qpid.server.txn.JoinAndResumeDtxException;
 import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.NotAssociatedDtxException;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.SuspendAndFailDtxException;
+import org.apache.qpid.server.txn.UnknownDtxBranchException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.transport.Binary;
-import org.apache.qpid.transport.Connection;
-import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageFlow;
-import org.apache.qpid.transport.MessageFlowMode;
-import org.apache.qpid.transport.MessageSetFlowMode;
-import org.apache.qpid.transport.MessageStop;
-import org.apache.qpid.transport.MessageTransfer;
-import org.apache.qpid.transport.Method;
-import org.apache.qpid.transport.Range;
-import org.apache.qpid.transport.RangeSet;
-import org.apache.qpid.transport.RangeSetFactory;
-import org.apache.qpid.transport.Session;
-import org.apache.qpid.transport.SessionDelegate;
+import org.apache.qpid.transport.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,7 +107,6 @@ public class ServerSession extends Sessi
     private ChannelLogSubject _logSubject;
     private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT);
 
-
     public static interface MessageDispositionChangeListener
     {
         public void onAccept();
@@ -359,7 +356,15 @@ public class ServerSession extends Sessi
 
     public void onClose()
     {
-        _transaction.rollback();
+        if(_transaction instanceof LocalTransaction)
+        {
+            _transaction.rollback();
+        }
+        else if(_transaction instanceof DistributedTransaction)
+        {
+            getVirtualHost().getDtxRegistry().endAssociations(this);
+        }
+
         for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
         {
             listener.onRelease(true);
@@ -455,6 +460,95 @@ public class ServerSession extends Sessi
         _txnStarts.incrementAndGet();
     }
 
+    public void selectDtx()
+    {
+        _transaction = new DistributedTransaction(this, getMessageStore(), getVirtualHost());
+
+    }
+
+
+    public void startDtx(Xid xid, boolean join, boolean resume)
+            throws JoinAndResumeDtxException,
+                   UnknownDtxBranchException,
+                   AlreadyKnownDtxException,
+                   DtxNotSelectedException
+    {
+        DistributedTransaction distributedTransaction = assertDtxTransaction();
+        distributedTransaction.start(xid, join, resume);
+    }
+
+
+    public void endDtx(Xid xid, boolean fail, boolean suspend)
+            throws NotAssociatedDtxException,
+            UnknownDtxBranchException,
+            DtxNotSelectedException,
+            SuspendAndFailDtxException, TimeoutDtxException
+    {
+        DistributedTransaction distributedTransaction = assertDtxTransaction();
+        distributedTransaction.end(xid, fail, suspend);
+    }
+
+
+    public long getTimeoutDtx(Xid xid)
+            throws UnknownDtxBranchException
+    {
+        return getVirtualHost().getDtxRegistry().getTimeout(xid);
+    }
+
+
+    public void setTimeoutDtx(Xid xid, long timeout)
+            throws UnknownDtxBranchException
+    {
+        getVirtualHost().getDtxRegistry().setTimeout(xid, timeout);
+    }
+
+
+    public void prepareDtx(Xid xid)
+            throws UnknownDtxBranchException,
+            IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
+    {
+        getVirtualHost().getDtxRegistry().prepare(xid);
+    }
+
+    public void commitDtx(Xid xid, boolean onePhase)
+            throws UnknownDtxBranchException,
+            IncorrectDtxStateException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
+    {
+        getVirtualHost().getDtxRegistry().commit(xid, onePhase);
+    }
+
+
+    public void rollbackDtx(Xid xid)
+            throws UnknownDtxBranchException,
+            IncorrectDtxStateException, AMQStoreException, TimeoutDtxException
+    {
+        getVirtualHost().getDtxRegistry().rollback(xid);
+    }
+
+
+    public void forgetDtx(Xid xid) throws UnknownDtxBranchException, IncorrectDtxStateException
+    {
+        getVirtualHost().getDtxRegistry().forget(xid);
+    }
+
+    public List<Xid> recoverDtx()
+    {
+        return getVirtualHost().getDtxRegistry().recover();
+    }
+
+    private DistributedTransaction assertDtxTransaction() throws DtxNotSelectedException
+    {
+        if(_transaction instanceof DistributedTransaction)
+        {
+            return (DistributedTransaction) _transaction;
+        }
+        else
+        {
+            throw new DtxNotSelectedException();
+        }
+    }
+
+
     public void commit()
     {
         _transaction.commit();
@@ -704,7 +798,7 @@ public class ServerSession extends Sessi
     {
         if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
         {
-            if(_blocking.compareAndSet(true,false))
+            if(_blocking.compareAndSet(true,false) && !isClosing())
             {
 
                 _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
@@ -759,6 +853,16 @@ public class ServerSession extends Sessi
         }
     }
 
+    void stopSubscriptions()
+    {
+        final Collection<Subscription_0_10> subscriptions = getSubscriptions();
+        for (Subscription_0_10 subscription_0_10 : subscriptions)
+        {
+            subscription_0_10.stop();
+        }
+    }
+
+
     public void receivedComplete()
     {
         final Collection<Subscription_0_10> subscriptions = getSubscriptions();
@@ -900,7 +1004,11 @@ public class ServerSession extends Sessi
         }
     }
 
-    @Override
+    protected void setClose(boolean close)
+    {
+        super.setClose(close);
+    }
+
     public int compareTo(AMQSessionModel session)
     {
         return getId().toString().compareTo(session.getID().toString());

Modified: qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Mon Feb 27 17:40:42 2012
@@ -23,6 +23,7 @@ package org.apache.qpid.server.transport
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.AMQUnknownExchangeType;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -50,7 +51,16 @@ import org.apache.qpid.server.store.Mess
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
 import org.apache.qpid.server.subscription.Subscription_0_10;
+import org.apache.qpid.server.txn.AlreadyKnownDtxException;
+import org.apache.qpid.server.txn.DtxNotSelectedException;
+import org.apache.qpid.server.txn.IncorrectDtxStateException;
+import org.apache.qpid.server.txn.JoinAndResumeDtxException;
+import org.apache.qpid.server.txn.NotAssociatedDtxException;
+import org.apache.qpid.server.txn.RollbackOnlyDtxException;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.SuspendAndFailDtxException;
+import org.apache.qpid.server.txn.TimeoutDtxException;
+import org.apache.qpid.server.txn.UnknownDtxBranchException;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.*;
 
@@ -199,6 +209,10 @@ public class ServerSessionDelegate exten
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
+                else if(queue.isExclusive() && queue.getExclusiveOwningSession() != null && queue.getExclusiveOwningSession() != session)
+                {
+                    exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
+                }
                 else
                 {
                     if(queue.isExclusive())
@@ -221,7 +235,6 @@ public class ServerSessionDelegate exten
                                 }
                             });
                         }
-
                     }
 
                     FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
@@ -426,6 +439,235 @@ public class ServerSessionDelegate exten
         ((ServerSession)session).rollback();
     }
 
+    @Override
+    public void dtxSelect(Session session, DtxSelect method)
+    {
+        // TODO - check current tx mode
+        ((ServerSession)session).selectDtx();
+    }
+
+    @Override
+    public void dtxStart(Session session, DtxStart method)
+    {
+        XaResult result = new XaResult();
+        result.setStatus(DtxXaStatus.XA_OK);
+        try
+        {
+            ((ServerSession)session).startDtx(method.getXid(), method.getJoin(), method.getResume());
+            session.executionResult(method.getId(), result);
+        }
+        catch(JoinAndResumeDtxException e)
+        {
+            exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage());
+        }
+        catch(UnknownDtxBranchException e)
+        {
+            exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Unknown xid " + method.getXid());
+        }
+        catch(AlreadyKnownDtxException e)
+        {
+            exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Xid already started and neither join nor " +
+                                                                       "resume set: " + method.getXid());
+        }
+        catch(DtxNotSelectedException e)
+        {
+            exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage());
+        }
+
+    }
+
+    @Override
+    public void dtxEnd(Session session, DtxEnd method)
+    {
+        XaResult result = new XaResult();
+        result.setStatus(DtxXaStatus.XA_OK);
+        try
+        {
+            try
+            {
+                ((ServerSession)session).endDtx(method.getXid(), method.getFail(), method.getSuspend());
+            }
+            catch (TimeoutDtxException e)
+            {
+                result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
+            }
+            session.executionResult(method.getId(), result);
+        }
+        catch(UnknownDtxBranchException e)
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+        }
+        catch(NotAssociatedDtxException e)
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+        }
+        catch(DtxNotSelectedException e)
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+        }
+        catch(SuspendAndFailDtxException e)
+        {
+            exception(session, method, ExecutionErrorCode.COMMAND_INVALID, e.getMessage());
+        }
+
+    }
+
+    @Override
+    public void dtxCommit(Session session, DtxCommit method)
+    {
+        XaResult result = new XaResult();
+        result.setStatus(DtxXaStatus.XA_OK);
+        try
+        {
+            try
+            {
+                ((ServerSession)session).commitDtx(method.getXid(), method.getOnePhase());
+            }
+            catch (RollbackOnlyDtxException e)
+            {
+                result.setStatus(DtxXaStatus.XA_RBROLLBACK);
+            }
+            catch (TimeoutDtxException e)
+            {
+                result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
+            }
+            session.executionResult(method.getId(), result);
+        }
+        catch(UnknownDtxBranchException e)
+        {
+            exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+        }
+        catch(IncorrectDtxStateException e)
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+        }
+        catch(AMQStoreException e)
+        {
+            exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage());
+        }
+    }
+
+    @Override
+    public void dtxForget(Session session, DtxForget method)
+    {
+        try
+        {
+            ((ServerSession)session).forgetDtx(method.getXid());
+        }
+        catch(UnknownDtxBranchException e)
+        {
+            exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+        }
+        catch(IncorrectDtxStateException e)
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+        }
+
+    }
+
+    @Override
+    public void dtxGetTimeout(Session session, DtxGetTimeout method)
+    {
+        GetTimeoutResult result = new GetTimeoutResult();
+        try
+        {
+            result.setTimeout(((ServerSession) session).getTimeoutDtx(method.getXid()));
+            session.executionResult(method.getId(), result);
+        }
+        catch(UnknownDtxBranchException e)
+        {
+            exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+        }
+    }
+
+    @Override
+    public void dtxPrepare(Session session, DtxPrepare method)
+    {
+        XaResult result = new XaResult();
+        result.setStatus(DtxXaStatus.XA_OK);
+        try
+        {
+            try
+            {
+                ((ServerSession)session).prepareDtx(method.getXid());
+            }
+            catch (RollbackOnlyDtxException e)
+            {
+                result.setStatus(DtxXaStatus.XA_RBROLLBACK);
+            }
+            catch (TimeoutDtxException e)
+            {
+                result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
+            }
+            session.executionResult((int) method.getId(), result);
+        }
+        catch(UnknownDtxBranchException e)
+        {
+            exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+        }
+        catch(IncorrectDtxStateException e)
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+        }
+        catch(AMQStoreException e)
+        {
+            exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage());
+        }
+    }
+
+    @Override
+    public void dtxRecover(Session session, DtxRecover method)
+    {
+        RecoverResult result = new RecoverResult();
+        List inDoubt = ((ServerSession)session).recoverDtx();
+        result.setInDoubt(inDoubt);
+        session.executionResult(method.getId(), result);
+    }
+
+    @Override
+    public void dtxRollback(Session session, DtxRollback method)
+    {
+
+        XaResult result = new XaResult();
+        result.setStatus(DtxXaStatus.XA_OK);
+        try
+        {
+            try
+            {
+                ((ServerSession)session).rollbackDtx(method.getXid());
+            }
+            catch (TimeoutDtxException e)
+            {
+                result.setStatus(DtxXaStatus.XA_RBTIMEOUT);
+            }
+            session.executionResult(method.getId(), result);
+        }
+        catch(UnknownDtxBranchException e)
+        {
+            exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+        }
+        catch(IncorrectDtxStateException e)
+        {
+            exception(session, method, ExecutionErrorCode.ILLEGAL_STATE, e.getMessage());
+        }
+        catch(AMQStoreException e)
+        {
+            exception(session, method, ExecutionErrorCode.INTERNAL_ERROR, e.getMessage());
+        }
+    }
+
+    @Override
+    public void dtxSetTimeout(Session session, DtxSetTimeout method)
+    {
+        try
+        {
+            ((ServerSession)session).setTimeoutDtx(method.getXid(), method.getTimeout());
+        }
+        catch(UnknownDtxBranchException e)
+        {
+            exception(session, method, ExecutionErrorCode.NOT_FOUND, e.getMessage());
+        }
+    }
 
     @Override
     public void executionSync(final Session ssn, final ExecutionSync sync)
@@ -474,48 +716,96 @@ public class ServerSessionDelegate exten
         {
             if (exchange == null)
             {
-                ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
-                ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+                if(exchangeName.startsWith("amq."))
+                {
+                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+                              "Attempt to declare exchange: " + exchangeName +
+                              " which begins with reserved prefix 'amq.'.");
+                }
+                else if(exchangeName.startsWith("qpid."))
+                {
+                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+                              "Attempt to declare exchange: " + exchangeName +
+                              " which begins with reserved prefix 'qpid.'.");
+                }
+                else
+                {
+                    ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
+                    ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
 
 
 
-                try
-                {
+                    try
+                    {
 
-                    exchange = exchangeFactory.createExchange(method.getExchange(),
-                                                              method.getType(),
-                                                              method.getDurable(),
-                                                              method.getAutoDelete());
+                        exchange = exchangeFactory.createExchange(method.getExchange(),
+                                                                  method.getType(),
+                                                                  method.getDurable(),
+                                                                  method.getAutoDelete());
 
-                    String alternateExchangeName = method.getAlternateExchange();
-                    if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+                        String alternateExchangeName = method.getAlternateExchange();
+                        boolean validAlternate;
+                        if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+                        {
+                            Exchange alternate = getExchange(session, alternateExchangeName);
+                            if(alternate == null)
+                            {
+                                validAlternate = false;
+                            }
+                            else
+                            {
+                                exchange.setAlternateExchange(alternate);
+                                validAlternate = true;
+                            }
+                        }
+                        else
+                        {
+                            validAlternate = true;
+                        }
+
+                        if(validAlternate)
+                        {
+                            if (exchange.isDurable())
+                            {
+                                DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+                                store.createExchange(exchange);
+                            }
+
+                            exchangeRegistry.registerExchange(exchange);
+                        }
+                        else
+                        {
+                            exception(session, method, ExecutionErrorCode.NOT_FOUND,
+                                        "Unknown alternate exchange " + alternateExchangeName);
+                        }
+                    }
+                    catch(AMQUnknownExchangeType e)
                     {
-                        Exchange alternate = getExchange(session, alternateExchangeName);
-                        exchange.setAlternateExchange(alternate);
+                        exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
                     }
-
-                    if (exchange.isDurable())
+                    catch (AMQException e)
                     {
-                        DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
-                        store.createExchange(exchange);
+                        exception(session, method, e, "Cannot declare exchange '" + exchangeName);
                     }
-
-                    exchangeRegistry.registerExchange(exchange);
-                }
-                catch(AMQUnknownExchangeType e)
-                {
-                    exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
-                }
-                catch (AMQException e)
-                {
-                    exception(session, method, e, "Cannot declare exchange '" + exchangeName);
                 }
             }
             else
             {
                 if(!exchange.getTypeShortString().toString().equals(method.getType()))
                 {
-                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() +".");
+                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+                            "Attempt to redeclare exchange: " + exchangeName
+                                    + " of type " + exchange.getTypeShortString()
+                                    + " to " + method.getType() +".");
+                }
+                else if(method.hasAlternateExchange()
+                          && (exchange.getAlternateExchange() == null ||
+                              !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
+                {
+                    exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+                            "Attempt to change alternate exchange of: " + exchangeName
+                                    + " from " + exchange.getAlternateExchange()
+                                    + " to " + method.getAlternateExchange() +".");
                 }
             }
 
@@ -732,10 +1022,7 @@ public class ServerSessionDelegate exten
             }
             else
             {
-                AMQShortString routingKey = new AMQShortString(method.getBindingKey());
-                FieldTable fieldTable = FieldTable.convertToFieldTable(method.getArguments());
-
-                if (!exchange.isBound(routingKey, fieldTable, queue))
+                if (!exchange.isBound(method.getBindingKey(), method.getArguments(), queue))
                 {
                     try
                     {
@@ -847,12 +1134,6 @@ public class ServerSessionDelegate exten
                 if(method.hasBindingKey())
                 {
 
-                    if(method.hasArguments())
-                    {
-                        FieldTable args = FieldTable.convertToFieldTable(method.getArguments());
-                        
-                        result.setArgsNotMatched(!exchange.isBound(new AMQShortString(method.getBindingKey()), args, queue));
-                    }
                     if(queueMatched)
                     {
                         result.setKeyNotMatched(!exchange.isBound(method.getBindingKey(), queue));
@@ -861,23 +1142,28 @@ public class ServerSessionDelegate exten
                     {
                         result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
                     }
+
+                    if(method.hasArguments())
+                    {
+                        result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queueMatched ? queue : null));
+                    }
+
                 }
                 else if (method.hasArguments())
                 {
-                    // TODO
-
+                    result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), queueMatched ? queue : null));
                 }
 
-                result.setQueueNotMatched(!exchange.isBound(queue));
-
             }
             else if(exchange != null && method.hasBindingKey())
             {
+                result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+
                 if(method.hasArguments())
                 {
-                    // TODO
+                    result.setArgsNotMatched(!exchange.isBound(result.getKeyNotMatched() ? null : method.getBindingKey(), method.getArguments(), queue));
                 }
-                result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
+
 
             }
 
@@ -886,11 +1172,15 @@ public class ServerSessionDelegate exten
         {
             if(method.hasArguments())
             {
-                // TODO
+                result.setArgsNotMatched(!exchange.isBound(method.getBindingKey(), method.getArguments(), null));
             }
             result.setKeyNotMatched(!exchange.isBound(method.getBindingKey()));
 
         }
+        else if(exchange != null && method.hasArguments())
+        {
+            result.setArgsNotMatched(!exchange.isBound(null, method.getArguments(), null));
+        }
 
 
         session.executionResult((int) method.getId(), result);
@@ -1134,6 +1424,10 @@ public class ServerSessionDelegate exten
                 {
                     exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
                 }
+                else if(queue.isExclusive() && queue.getExclusiveOwningSession()  != null && queue.getExclusiveOwningSession() != session)
+                {
+                    exception(session,method,ExecutionErrorCode.RESOURCE_LOCKED, "Exclusive Queue: " + queueName + " owned exclusively by another session");
+                }
                 else if (method.getIfEmpty() && !queue.isEmpty())
                 {
                     exception(session, method, ExecutionErrorCode.PRECONDITION_FAILED, "Queue " + queueName + " not empty");
@@ -1280,8 +1574,9 @@ public class ServerSessionDelegate exten
 
         ServerSession serverSession = (ServerSession)session;
 
-        serverSession.unregisterSubscriptions();
+        serverSession.stopSubscriptions();
         serverSession.onClose();
+        serverSession.unregisterSubscriptions();
     }
 
     @Override

Propchange: qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 27 17:40:42 2012
@@ -1,2 +1,3 @@
 /qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:930288
 /qpid/branches/qpid-2935/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1061302-1072333
+/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:1291399-1294217

Modified: qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Mon Feb 27 17:40:42 2012
@@ -20,6 +20,9 @@
 */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledFuture;
 import org.apache.qpid.common.Closeable;
 import org.apache.qpid.server.binding.BindingFactory;
 import org.apache.qpid.server.configuration.ConfigStore;
@@ -33,13 +36,10 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.stats.StatisticsGatherer;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
-
-import java.util.Map;
-import java.util.UUID;
+import org.apache.qpid.server.txn.DtxRegistry;
 
 public interface VirtualHost extends DurableConfigurationStore.Source, VirtualHostConfig, Closeable, StatisticsGatherer
 {
@@ -59,8 +59,6 @@ public interface VirtualHost extends Dur
 
     DurableConfigurationStore getDurableConfigurationStore();
 
-    AuthenticationManager getAuthenticationManager();
-
     SecurityManager getSecurityManager();
 
     void close();
@@ -96,5 +94,9 @@ public interface VirtualHost extends Dur
 
     ConfigStore getConfigStore();
 
+    DtxRegistry getDtxRegistry();
+
     void removeBrokerConnection(BrokerLink brokerLink);
+
+    ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
 }

Modified: qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Mon Feb 27 17:40:42 2012
@@ -20,9 +20,18 @@
 */
 package org.apache.qpid.server.virtualhost;
 
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
 import org.apache.log4j.Logger;
-
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQStoreException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.binding.BindingFactory;
@@ -32,29 +41,26 @@ import org.apache.qpid.server.logging.ac
 import org.apache.qpid.server.logging.messages.TransactionLogMessages;
 import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
 import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.AbstractServerMessageImpl;
 import org.apache.qpid.server.message.EnqueableMessage;
 import org.apache.qpid.server.message.MessageTransferMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.txn.DtxBranch;
+import org.apache.qpid.server.txn.DtxRegistry;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.transport.Xid;
+import org.apache.qpid.transport.util.Functions;
 import org.apache.qpid.util.ByteBufferInputStream;
 
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.UUID;
-
 public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHandler,
                                                         ConfigurationRecoveryHandler.QueueRecoveryHandler,
                                                         ConfigurationRecoveryHandler.ExchangeRecoveryHandler,
@@ -63,7 +69,8 @@ public class VirtualHostConfigRecoveryHa
                                                         MessageStoreRecoveryHandler,
                                                         MessageStoreRecoveryHandler.StoredMessageRecoveryHandler,
                                                         TransactionLogRecoveryHandler,
-                                                        TransactionLogRecoveryHandler.QueueEntryRecoveryHandler
+                                                        TransactionLogRecoveryHandler.QueueEntryRecoveryHandler,
+                                                        TransactionLogRecoveryHandler.DtxRecordRecoveryHandler
 {
     private static final Logger _logger = Logger.getLogger(VirtualHostConfigRecoveryHandler.class);
 
@@ -76,7 +83,7 @@ public class VirtualHostConfigRecoveryHa
     private MessageStore _store;
 
     private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
-    private Map<Long, ServerMessage> _recoveredMessages = new HashMap<Long, ServerMessage>();
+    private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
     private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
 
 
@@ -158,7 +165,7 @@ public class VirtualHostConfigRecoveryHa
 
     public void message(StoredMessage message)
     {
-        ServerMessage serverMessage;
+        AbstractServerMessageImpl serverMessage;
         switch(message.getMetaData().getType())
         {
             case META_DATA_0_8:
@@ -193,6 +200,164 @@ public class VirtualHostConfigRecoveryHa
     {
     }
 
+    /**
+     * Recovers one XA (DTX) branch record read back from the transaction log.
+     * The branch for the Xid is looked up in, or registered with, the virtual
+     * host's DtxRegistry; the recorded enqueues and dequeues are replayed into
+     * it and it is then put into the PREPARED state. A record whose queue or
+     * message cannot be found is reported via CurrentActor and skipped.
+     */
+    public void dtxRecord(long format, byte[] globalId, byte[] branchId,
+                          MessageStore.Transaction.Record[] enqueues,
+                          MessageStore.Transaction.Record[] dequeues)
+    {
+        Xid id = new Xid(format, globalId, branchId);
+        DtxRegistry dtxRegistry = _virtualHost.getDtxRegistry();
+        DtxBranch branch = dtxRegistry.getBranch(id);
+        if(branch == null)
+        {
+            branch = new DtxBranch(id, _store, _virtualHost);
+            dtxRegistry.registerBranch(branch);
+        }
+        // Replay the recorded enqueues.
+        for(MessageStore.Transaction.Record record : enqueues)
+        {
+            final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+            if(queue != null)
+            {
+                final long messageId = record.getMessage().getMessageNumber();
+                final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+                _unusedMessages.remove(messageId);
+                if(message != null)
+                {
+                    // Reference taken here is dropped again by the post-transaction action below.
+                    message.incrementReference();
+                    branch.enqueue(queue,message);
+                    branch.addPostTransactionAcion(new ServerTransaction.Action()
+                    {
+                        public void postCommit()
+                        {
+                            try
+                            {
+                                queue.enqueue(message, true, null);
+                                message.decrementReference();
+                            }
+                            catch (AMQException e)
+                            {
+                                _logger.error("Unable to enqueue message " + message.getMessageNumber() + " into " +
+                                              "queue " + queue.getName() + " (from XA transaction)", e);
+                                throw new RuntimeException(e);
+                            }
+                        }
+
+                        public void onRollback()
+                        {
+                            message.decrementReference();
+                        }
+                    });
+                }
+                else
+                {
+                    // message is null on this path, so report the id taken from the record.
+                    StringBuilder xidString = xidAsString(id);
+                    String messageNumberString = String.valueOf(messageId);
+                    CurrentActor.get().message(_logSubject,
+                                               TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+                                                                                            messageNumberString));
+                }
+            }
+            else
+            {
+                StringBuilder xidString = xidAsString(id);
+                CurrentActor.get().message(_logSubject,
+                                           TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+                                                                                      record.getQueue().getResourceName()));
+            }
+        }
+        // Replay the recorded dequeues.
+        for(MessageStore.Transaction.Record record : dequeues)
+        {
+            final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+            if(queue != null)
+            {
+                final long messageId = record.getMessage().getMessageNumber();
+                final AbstractServerMessageImpl message = _recoveredMessages.get(messageId);
+                _unusedMessages.remove(messageId);
+                if(message != null)
+                {
+                    // NOTE(review): entry is dereferenced unchecked - confirm this lookup cannot return null.
+                    final QueueEntry entry = queue.getMessageOnTheQueue(messageId);
+                    // Acquired here; discarded on commit or released on rollback by the action below.
+                    entry.acquire();
+                    branch.dequeue(queue, message);
+                    branch.addPostTransactionAcion(new ServerTransaction.Action()
+                    {
+                        public void postCommit()
+                        {
+                            entry.discard();
+                        }
+
+                        public void onRollback()
+                        {
+                            entry.release();
+                        }
+                    });
+                }
+                else
+                {
+                    // message is null on this path, so report the id taken from the record.
+                    StringBuilder xidString = xidAsString(id);
+                    String messageNumberString = String.valueOf(messageId);
+                    CurrentActor.get().message(_logSubject,
+                                               TransactionLogMessages.XA_INCOMPLETE_MESSAGE(xidString.toString(),
+                                                                                            messageNumberString));
+                }
+            }
+            else
+            {
+                // queue is null on this path, so take the queue name from the record instead.
+                StringBuilder xidString = xidAsString(id);
+                CurrentActor.get().message(_logSubject,
+                                           TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
+                                                                                      record.getQueue().getResourceName()));
+            }
+        }
+
+        // Mark the recovered branch as prepared.
+        try
+        {
+            branch.setState(DtxBranch.State.PREPARED);
+            branch.prePrepareTransaction();
+        }
+        catch (AMQStoreException e)
+        {
+            _logger.error("Unexpected database exception when attempting to prepare a recovered XA transaction " +
+                          xidAsString(id), e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static StringBuilder xidAsString(Xid id)
+    {
+        return new StringBuilder("(")
+                    .append(id.getFormat())
+                    .append(',')
+                    .append(Functions.str(id.getGlobalId()))
+                    .append(',')
+                    .append(Functions.str(id.getBranchId()))
+                    .append(')');
+    }
+
+    public void completeDtxRecordRecovery()
+    {
+        for(StoredMessage m : _unusedMessages.values())
+        {
+            _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
+            m.remove();
+        }
+        CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+    }
+
     private static final class ProcessAction
     {
         private final AMQQueue _queue;
@@ -349,15 +514,9 @@ public class VirtualHostConfigRecoveryHa
 
     }
 
-    public void completeQueueEntryRecovery()
+    public DtxRecordRecoveryHandler completeQueueEntryRecovery()
     {
 
-        for(StoredMessage m : _unusedMessages.values())
-        {
-            _logger.warn("Message id " + m.getMessageNumber() + " in store, but not in any queue - removing....");
-            m.remove();
-        }
-
         for(Map.Entry<String,Integer> entry : _queueRecoveries.entrySet())
         {
             CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERED(entry.getValue(), entry.getKey()));
@@ -365,7 +524,9 @@ public class VirtualHostConfigRecoveryHa
             CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(entry.getKey(), true));
         }
 
-        CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
+
+
+        return this;
     }
 
     private static class DummyMessage implements EnqueableMessage

Modified: qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Mon Feb 27 17:40:42 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.concurrent.ScheduledFuture;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.log4j.Logger;
@@ -61,11 +62,11 @@ import org.apache.qpid.server.queue.Queu
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.registry.IApplicationRegistry;
 import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
 import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
 import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
 
@@ -95,12 +96,12 @@ public class VirtualHostImpl implements 
 
     private MessageStore _messageStore;
 
+    private DtxRegistry _dtxRegistry;
+
     private VirtualHostMBean _virtualHostMBean;
 
     private AMQBrokerManagerMBean _brokerMBean;
 
-    private final AuthenticationManager _authenticationManager;
-
     private SecurityManager _securityManager;
 
     private final ScheduledThreadPoolExecutor _houseKeepingTasks;
@@ -118,6 +119,7 @@ public class VirtualHostImpl implements 
     private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
     private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
 
+
     public IConnectionRegistry getConnectionRegistry()
     {
         return _connectionRegistry;
@@ -188,6 +190,7 @@ public class VirtualHostImpl implements 
         _broker = _appRegistry.getBroker();
         _configuration = hostConfig;
         _name = _configuration.getName();
+        _dtxRegistry = new DtxRegistry();
 
         _id = _appRegistry.getConfigStore().createId();
 
@@ -238,7 +241,6 @@ public class VirtualHostImpl implements 
 			initialiseMessageStore(hostConfig);
         }
 		
-        _authenticationManager = ApplicationRegistry.getInstance().getAuthenticationManager();
 
         _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
         _brokerMBean.register();
@@ -351,6 +353,11 @@ public class VirtualHostImpl implements 
                                                TimeUnit.MILLISECONDS);
     }
 
+    public ScheduledFuture<?> scheduleTask(long delay, Runnable task)
+    {
+        return _houseKeepingTasks.schedule(task, delay, TimeUnit.MILLISECONDS);
+    }
+
     public long getHouseKeepingTaskCount()
     {
         return _houseKeepingTasks.getTaskCount();
@@ -574,11 +581,6 @@ public class VirtualHostImpl implements 
         return _durableConfigurationStore;
     }
 
-    public AuthenticationManager getAuthenticationManager()
-    {
-        return _authenticationManager;
-    }
-
     public SecurityManager getSecurityManager()
     {
         return _securityManager;
@@ -617,6 +619,11 @@ public class VirtualHostImpl implements 
             }
         }
 
+        if(_dtxRegistry != null)
+        {
+            _dtxRegistry.close();
+        }
+
         //Close MessageStore
         if (_messageStore != null)
         {
@@ -784,6 +791,11 @@ public class VirtualHostImpl implements 
         return getApplicationRegistry().getConfigStore();
     }
 
+    public DtxRegistry getDtxRegistry()
+    {
+        return _dtxRegistry;
+    }
+
     /**
      * Temporary Startup RT class to record the creation of persistent queues / exchanges.
      *

Modified: qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java Mon Feb 27 17:40:42 2012
@@ -34,7 +34,6 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.AuthorizationHolder;
 import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.List;
@@ -357,12 +356,12 @@ public class MockAMQQueue implements AMQ
         return null;
     }
 
-    public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
+    public void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName)
     {
       
     }
 
-    public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName, ServerTransaction storeContext)
+    public void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName)
     {
       
     }

Modified: qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java Mon Feb 27 17:40:42 2012
@@ -45,7 +45,6 @@ import org.apache.qpid.server.store.Test
 import org.apache.qpid.server.subscription.MockSubscription;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.InternalBrokerBaseCase;
 import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -839,7 +838,7 @@ public class SimpleAMQQueueTest extends 
 
     /**
      * Tests that dequeued message is not copied as part of invocation of
-     * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String, ServerTransaction)}
+     * {@link SimpleAMQQueue#copyMessagesToAnotherQueue(long, long, String)}
      */
     public void testCopyMessagesWithDequeuedEntry()
     {
@@ -856,14 +855,8 @@ public class SimpleAMQQueueTest extends 
         // create another queue
         SimpleAMQQueue queue = createQueue(anotherQueueName);
 
-        // create transaction
-        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-
         // copy messages into another queue
-        _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
-
-        // commit transaction
-        txn.commit();
+        _queue.copyMessagesToAnotherQueue(0, messageNumber, anotherQueueName);
 
         // get messages on another queue
         List<QueueEntry> entries = queue.getMessagesOnTheQueue();
@@ -889,7 +882,7 @@ public class SimpleAMQQueueTest extends 
 
     /**
      * Tests that dequeued message is not moved as part of invocation of
-     * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String, ServerTransaction)}
+     * {@link SimpleAMQQueue#moveMessagesToAnotherQueue(long, long, String)}
      */
     public void testMovedMessagesWithDequeuedEntry()
     {
@@ -906,14 +899,8 @@ public class SimpleAMQQueueTest extends 
         // create another queue
         SimpleAMQQueue queue = createQueue(anotherQueueName);
 
-        // create transaction
-        ServerTransaction txn = new LocalTransaction(_queue.getVirtualHost().getMessageStore());
-
         // move messages into another queue
-        _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName, txn);
-
-        // commit transaction
-        txn.commit();
+        _queue.moveMessagesToAnotherQueue(0, messageNumber, anotherQueueName);
 
         // get messages on another queue
         List<QueueEntry> entries = queue.getMessagesOnTheQueue();

Modified: qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/registry/ApplicationRegistryShutdownTest.java Mon Feb 27 17:40:42 2012
@@ -80,11 +80,10 @@ public class ApplicationRegistryShutdown
             }
         }
 
-        // Not using isEmpty as that is not in Java 5
-        assertTrue("No new SASL mechanisms added by initialisation.", additions.size() != 0 );
+        assertFalse("No new SASL mechanisms added by initialisation.", additions.isEmpty());
 
         //Close the registry which will perform the close the AuthenticationManager
-        getRegistry().close();
+        stopBroker();
 
         //Validate that the SASL plugFins have been removed.
         Provider[] providersAfterClose = Security.getProviders();

Modified: qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java Mon Feb 27 17:40:42 2012
@@ -146,6 +146,14 @@ public class SkeletonMessageStore implem
             {
 
             }
+
+            public void removeXid(long format, byte[] globalId, byte[] branchId)
+            {
+            }
+
+            public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+            {
+            }
         };
     }
 

Modified: qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Mon Feb 27 17:40:42 2012
@@ -100,6 +100,14 @@ public class TestableMemoryMessageStore 
         public void abortTran() throws AMQStoreException
         {
         }
+
+        public void removeXid(long format, byte[] globalId, byte[] branchId)
+        {
+        }
+
+        public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+        {
+        }
     }
 
 

Modified: qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java Mon Feb 27 17:40:42 2012
@@ -112,6 +112,14 @@ class MockStoreTransaction implements Tr
         _state = TransactionState.ABORTED;
     }
 
+    public void removeXid(long format, byte[] globalId, byte[] branchId)
+    {
+    }
+
+    public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+    {
+    }
+
     public static MessageStore createTestTransactionLog(final MockStoreTransaction storeTransaction)
     {
         return new MessageStore()

Modified: qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java (original)
+++ qpid/branches/QPID-3799-acl/java/broker/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java Mon Feb 27 17:40:42 2012
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.virtualhost;
 
+import java.util.concurrent.ScheduledFuture;
 import org.apache.qpid.server.binding.BindingFactory;
 import org.apache.qpid.server.configuration.BrokerConfig;
 import org.apache.qpid.server.configuration.ConfigStore;
@@ -39,6 +40,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.stats.StatisticsCounter;
 import org.apache.qpid.server.store.DurableConfigurationStore;
 import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.DtxRegistry;
 
 import java.util.Map;
 import java.util.UUID;
@@ -94,6 +96,11 @@ public class MockVirtualHost implements 
         return null;
     }
 
+    public DtxRegistry getDtxRegistry()
+    {
+        return null;
+    }
+
     public VirtualHostConfiguration getConfiguration()
     {
         return null;
@@ -170,6 +177,11 @@ public class MockVirtualHost implements 
 
     }
 
+    public ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask)
+    {
+        return null;
+    }
+
     public void scheduleHouseKeepingTask(long period, HouseKeepingTask task)
     {
 

Modified: qpid/branches/QPID-3799-acl/java/build.deps
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/build.deps?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/build.deps (original)
+++ qpid/branches/QPID-3799-acl/java/build.deps Mon Feb 27 17:40:42 2012
@@ -48,10 +48,9 @@ jetty-util=lib/jetty-util-6.1.14.jar
 jetty-servlet-tester=lib/jetty-servlet-tester-6.1.14.jar
 servlet-api=lib/servlet-api.jar
 
-osgi-core=lib/org.osgi.core-1.0.0.jar
-felix-framework=lib/org.apache.felix.framework-2.0.5.jar
+felix-main=lib/org.apache.felix.main-2.0.5.jar
 
-felix.libs=${osgi-core} ${felix-framework}
+felix.libs=${felix-main}
 
 commons-configuration.libs = ${commons-beanutils-core} ${commons-digester} \
   ${commons-codec} ${commons-lang} ${commons-collections} ${commons-configuration}

Modified: qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java (original)
+++ qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQQueueBrowser.java Mon Feb 27 17:40:42 2012
@@ -20,6 +20,12 @@
  */
 package org.apache.qpid.client;
 
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInternalException;
+import org.apache.qpid.client.filter.JMSSelectorFilter;
+import org.apache.qpid.protocol.AMQConstant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,12 +54,50 @@ public class AMQQueueBrowser implements 
         _session = session;
         _queue = queue;
         _messageSelector = ((messageSelector == null) || (messageSelector.trim().length() == 0)) ? null : messageSelector;
-        // Create Consumer to verify message selector.
-        BasicMessageConsumer consumer =
-                (BasicMessageConsumer) _session.createBrowserConsumer(_queue, _messageSelector, false);
-        // Close this consumer as we are not looking to consume only to establish that, at least for now,
-        // the QB can be created
-        consumer.close();
+
+
+        validateQueue((AMQDestination) queue);
+
+        if(_messageSelector != null)
+        {
+            validateSelector(_messageSelector);
+        }
+    }
+
+    private void validateSelector(String messageSelector) throws InvalidSelectorException
+    {
+        try
+        {
+            new JMSSelectorFilter(messageSelector);
+        }
+        catch (AMQInternalException e)
+        {
+            throw new InvalidSelectorException(e.getMessage());
+        }
+    }
+
+    private void validateQueue(AMQDestination queue) throws JMSException
+    {
+        try
+        {
+            // Essentially just test the connection/session is still active
+            _session.sync();
+            // TODO - should really validate queue exists, but we often rely on creating the consumer to create the queue :(
+            // _session.declareQueuePassive( queue );
+        }
+        catch (AMQException e)
+        {
+            if(e.getErrorCode() == AMQConstant.NOT_FOUND)
+            {
+                throw new InvalidDestinationException(e.getMessage());
+            }
+            else
+            {
+                final JMSException jmsException = new JMSException(e.getMessage(), String.valueOf(e.getErrorCode().getCode()));
+                jmsException.setLinkedException(e);
+                throw jmsException;
+            }
+        }
     }
 
     public Queue getQueue() throws JMSException
@@ -118,12 +162,12 @@ public class AMQQueueBrowser implements 
                 _consumer = consumer;
                 prefetchMessage();
             }
-            _logger.info("QB:created with first element:" + _nextMessage);
+            _logger.debug("QB:created with first element:" + _nextMessage);
         }
 
         public boolean hasMoreElements()
         {
-            _logger.info("QB:hasMoreElements:" + (_nextMessage != null));
+            _logger.debug("QB:hasMoreElements:" + (_nextMessage != null));
             return (_nextMessage != null);
         }
 
@@ -136,9 +180,9 @@ public class AMQQueueBrowser implements 
             }
             try
             {
-                _logger.info("QB:nextElement about to receive");
+                _logger.debug("QB:nextElement about to receive");
                 prefetchMessage();
-                _logger.info("QB:nextElement received:" + _nextMessage);
+                _logger.debug("QB:nextElement received:" + _nextMessage);
             }
             catch (JMSException e)
             {

Modified: qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Mon Feb 27 17:40:42 2012
@@ -120,18 +120,6 @@ public abstract class AMQSession<C exten
     public static final long DEFAULT_FLOW_CONTROL_WAIT_FAILURE = 120000L;
 
     /**
-     * The default value for immediate flag used by producers created by this session is false. That is, a consumer does
-     * not need to be attached to a queue.
-     */
-    private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
-
-    /**
-     * The default value for mandatory flag used by producers created by this session is true. That is, server will not
-     * silently drop messages where no queue is connected to the exchange for the message.
-     */
-    private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
-
-    /**
      * The period to wait while flow controlled before sending a log message confirming that the session is still
      * waiting on flow control being revoked
      */
@@ -1198,12 +1186,12 @@ public abstract class AMQSession<C exten
 
     public P createProducer(Destination destination) throws JMSException
     {
-        return createProducerImpl(destination, _defaultMandatoryValue, _defaultImmediateValue);
+        return createProducerImpl(destination, null, null);
     }
 
     public P createProducer(Destination destination, boolean immediate) throws JMSException
     {
-        return createProducerImpl(destination, _defaultMandatoryValue, immediate);
+        return createProducerImpl(destination, null, immediate);
     }
 
     public P createProducer(Destination destination, boolean mandatory, boolean immediate)
@@ -1692,7 +1680,7 @@ public abstract class AMQSession<C exten
     {
         AMQProtocolHandler protocolHandler = getProtocolHandler();
         declareExchange(amqd, protocolHandler, false);
-        AMQShortString queueName = declareQueue(amqd, protocolHandler, false);
+        AMQShortString queueName = declareQueue(amqd, false);
         bindQueue(queueName, amqd.getRoutingKey(), new FieldTable(), amqd.getExchangeName(), amqd);
     }
 
@@ -2613,7 +2601,7 @@ public abstract class AMQSession<C exten
     public abstract void sendConsume(C consumer, AMQShortString queueName,
                                      AMQProtocolHandler protocolHandler, boolean nowait, int tag) throws AMQException, FailoverException;
 
-    private P createProducerImpl(final Destination destination, final boolean mandatory, final boolean immediate)
+    private P createProducerImpl(final Destination destination, final Boolean mandatory, final Boolean immediate)
             throws JMSException
     {
         return new FailoverRetrySupport<P, JMSException>(
@@ -2642,8 +2630,8 @@ public abstract class AMQSession<C exten
                 }, _connection).execute();
     }
 
-    public abstract P createMessageProducer(final Destination destination, final boolean mandatory,
-                                                               final boolean immediate, final long producerId) throws JMSException;
+    public abstract P createMessageProducer(final Destination destination, final Boolean mandatory,
+                                            final Boolean immediate, final long producerId) throws JMSException;
 
     private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
     {
@@ -2726,6 +2714,12 @@ public abstract class AMQSession<C exten
     public abstract void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
                                              final boolean nowait) throws AMQException, FailoverException;
 
+
+    void declareQueuePassive(AMQDestination queue) throws AMQException
+    {
+        declareQueue(queue,false,false,true);
+    }
+
     /**
      * Declares a queue for a JMS destination.
      *
@@ -2735,27 +2729,35 @@ public abstract class AMQSession<C exten
      *
      * <p/>Note that this operation automatically retries in the event of fail-over.
      *
-     * @param amqd            The destination to declare as a queue.
-     * @param protocolHandler The protocol handler to communicate through.
      *
+     * @param amqd            The destination to declare as a queue.
      * @return The name of the decalred queue. This is useful where the broker is generating a queue name on behalf of
      *         the client.
      *
+     *
+     *
      * @throws AMQException If the queue cannot be declared for any reason.
      * @todo Verify the destiation is valid or throw an exception.
      * @todo Be aware of possible changes to parameter order as versions change.
      */
-    protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+    protected AMQShortString declareQueue(final AMQDestination amqd,
                                           final boolean noLocal) throws AMQException
     {
-        return declareQueue(amqd, protocolHandler, noLocal, false);
+        return declareQueue(amqd, noLocal, false);
     }
 
-    protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
+    protected AMQShortString declareQueue(final AMQDestination amqd,
                                           final boolean noLocal, final boolean nowait)
+                throws AMQException
+    {
+        return declareQueue(amqd, noLocal, nowait, false);
+    }
+
+    protected AMQShortString declareQueue(final AMQDestination amqd,
+                                          final boolean noLocal, final boolean nowait, final boolean passive)
             throws AMQException
     {
-        /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+        final AMQProtocolHandler protocolHandler = getProtocolHandler();
         return new FailoverNoopSupport<AMQShortString, AMQException>(
                 new FailoverProtectedOperation<AMQShortString, AMQException>()
                 {
@@ -2767,7 +2769,7 @@ public abstract class AMQSession<C exten
                             amqd.setQueueName(protocolHandler.generateQueueName());
                         }
 
-                        sendQueueDeclare(amqd, protocolHandler, nowait);
+                        sendQueueDeclare(amqd, protocolHandler, nowait, passive);
 
                         return amqd.getAMQQueueName();
                     }
@@ -2775,7 +2777,7 @@ public abstract class AMQSession<C exten
     }
 
     public abstract void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                          final boolean nowait) throws AMQException, FailoverException;
+                                          final boolean nowait, boolean passive) throws AMQException, FailoverException;
 
     /**
      * Undeclares the specified queue.
@@ -2916,7 +2918,7 @@ public abstract class AMQSession<C exten
     
             if (_delareQueues || amqd.isNameRequired())
             {
-                declareQueue(amqd, protocolHandler, consumer.isNoLocal(), nowait);
+                declareQueue(amqd, consumer.isNoLocal(), nowait);
             }
             bindQueue(amqd.getAMQQueueName(), amqd.getRoutingKey(), consumer.getArguments(), amqd.getExchangeName(), amqd, nowait);
         }

Modified: qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Mon Feb 27 17:40:42 2012
@@ -17,9 +17,20 @@
  */
 package org.apache.qpid.client;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination.AddressOption;
 import org.apache.qpid.client.AMQDestination.Binding;
@@ -27,7 +38,6 @@ import org.apache.qpid.client.AMQDestina
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.filter.MessageFilter;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.message.FieldTableSupport;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
@@ -42,28 +52,14 @@ import org.apache.qpid.framing.AMQShortS
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.transport.*;
-import org.apache.qpid.util.Serial;
-import org.apache.qpid.util.Strings;
-
 import static org.apache.qpid.transport.Option.BATCH;
 import static org.apache.qpid.transport.Option.NONE;
 import static org.apache.qpid.transport.Option.SYNC;
 import static org.apache.qpid.transport.Option.UNRELIABLE;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import java.lang.ref.WeakReference;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.qpid.util.Serial;
+import org.apache.qpid.util.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a 0.10 Session
@@ -654,8 +650,8 @@ public class AMQSession_0_10 extends AMQ
     /**
      * Create an 0_10 message producer
      */
-    public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final boolean mandatory,
-                                                      final boolean immediate, final long producerId) throws JMSException
+    public BasicMessageProducer_0_10 createMessageProducer(final Destination destination, final Boolean mandatory,
+                                                           final Boolean immediate, final long producerId) throws JMSException
     {
         try
         {
@@ -725,7 +721,7 @@ public class AMQSession_0_10 extends AMQ
      * Declare a queue with the given queueName
      */
     public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                 final boolean nowait)
+                                 final boolean nowait, boolean passive)
             throws AMQException, FailoverException
     {
         // do nothing this is only used by 0_8
@@ -735,7 +731,7 @@ public class AMQSession_0_10 extends AMQ
      * Declare a queue with the given queueName
      */
     public AMQShortString send0_10QueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                               final boolean noLocal, final boolean nowait)
+                                               final boolean noLocal, final boolean nowait, boolean passive)
             throws AMQException
     {
         AMQShortString queueName;
@@ -761,7 +757,8 @@ public class AMQSession_0_10 extends AMQ
             getQpidSession().queueDeclare(queueName.toString(), "" , arguments,
                                           amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
                                           amqd.isDurable() ? Option.DURABLE : Option.NONE,
-                                          amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);   
+                                          amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE,
+                                          passive ? Option.PASSIVE : Option.NONE);
         }
         else
         {
@@ -931,11 +928,12 @@ public class AMQSession_0_10 extends AMQ
         return getCurrentException();
     }
 
-    protected AMQShortString declareQueue(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                          final boolean noLocal, final boolean nowait)
+    protected AMQShortString declareQueue(final AMQDestination amqd,
+                                          final boolean noLocal, final boolean nowait, final boolean passive)
             throws AMQException
     {
-        /*return new FailoverRetrySupport<AMQShortString, AMQException>(*/
+        final AMQProtocolHandler protocolHandler = getProtocolHandler();
+
         return new FailoverNoopSupport<AMQShortString, AMQException>(
                 new FailoverProtectedOperation<AMQShortString, AMQException>()
                 {
@@ -952,7 +950,7 @@ public class AMQSession_0_10 extends AMQ
                             amqd.setQueueName(new AMQShortString( binddingKey + "@"
                                     + amqd.getExchangeName().toString() + "_" + UUID.randomUUID()));
                         }
-                        return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait);
+                        return send0_10QueueDeclare(amqd, protocolHandler, noLocal, nowait, passive);
                     }
                 }, getAMQConnection()).execute();
     }
@@ -1209,7 +1207,7 @@ public class AMQSession_0_10 extends AMQ
                     else if(createNode)
                     {
                         setLegacyFiledsForQueueType(dest);
-                        send0_10QueueDeclare(dest,null,noLocal,noWait);
+                        send0_10QueueDeclare(dest,null,noLocal,noWait, false);
                         sendQueueBind(dest.getAMQQueueName(), dest.getRoutingKey(),
                                       null,dest.getExchangeName(),dest, false);
                         break;
@@ -1315,7 +1313,7 @@ public class AMQSession_0_10 extends AMQ
         }
         node.setExclusive(true);
         node.setAutoDelete(!node.isDurable());
-        send0_10QueueDeclare(dest,null,noLocal,true);
+        send0_10QueueDeclare(dest,null,noLocal,true, false);
         getQpidSession().exchangeBind(dest.getQueueName(), 
         		              dest.getAddressName(), 
         		              dest.getSubject(), 

Modified: qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=1294242&r1=1294241&r2=1294242&view=diff
==============================================================================
--- qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/branches/QPID-3799-acl/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Feb 27 17:40:42 2012
@@ -38,7 +38,6 @@ import org.apache.qpid.client.protocol.A
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
-import org.apache.qpid.client.filter.MessageFilter;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
 import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
@@ -401,9 +400,17 @@ public class AMQSession_0_8 extends AMQS
     }
 
     public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
-                                 final boolean nowait) throws AMQException, FailoverException
+                                 final boolean nowait, boolean passive) throws AMQException, FailoverException
     {
-        QueueDeclareBody body = getMethodRegistry().createQueueDeclareBody(getTicket(),amqd.getAMQQueueName(),false,amqd.isDurable(),amqd.isExclusive(),amqd.isAutoDelete(),false,null);
+        QueueDeclareBody body =
+                getMethodRegistry().createQueueDeclareBody(getTicket(),
+                                                           amqd.getAMQQueueName(),
+                                                           passive,
+                                                           amqd.isDurable(),
+                                                           amqd.isExclusive(),
+                                                           amqd.isAutoDelete(),
+                                                           false,
+                                                           null);
 
         AMQFrame queueDeclare = body.generateFrame(getChannelId());
 
@@ -441,8 +448,8 @@ public class AMQSession_0_8 extends AMQS
     }
 
 
-    public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
-            final boolean immediate, long producerId) throws JMSException
+    public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final Boolean mandatory,
+            final Boolean immediate, long producerId) throws JMSException
     {
        try
        {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message