qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r829675 [6/11] - in /qpid/trunk/qpid/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/ broker/bin/ broker/src/main/java/org/apac...
Date Sun, 25 Oct 2009 22:59:05 GMT
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Sun Oct 25 22:58:57 2009
@@ -3,12 +3,9 @@
 import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.management.JMException;
 
@@ -21,13 +18,12 @@
 import org.apache.qpid.server.configuration.QueueConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.output.ProtocolOutputConverter;
 import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.subscription.Subscription;
 import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.actors.QueueActor;
 import org.apache.qpid.server.logging.subjects.QueueLogSubject;
@@ -35,6 +31,9 @@
 import org.apache.qpid.server.logging.LogActor;
 import org.apache.qpid.server.logging.messages.QueueMessages;
 import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.LocalTransaction;
 
 /*
 *
@@ -60,24 +59,40 @@
 {
     private static final Logger _logger = Logger.getLogger(SimpleAMQQueue.class);
 
+
+    private final VirtualHost _virtualHost;
+
     private final AMQShortString _name;
+    private final String _resourceName;
 
     /** null means shared */
     private final AMQShortString _owner;
 
+    private PrincipalHolder _prinicpalHolder;
+
+    private Object _exclusiveOwner;
+
+
     private final boolean _durable;
 
     /** If true, this queue is deleted when the last subscriber is removed */
     private final boolean _autoDelete;
 
-    private final VirtualHost _virtualHost;
+    private Exchange _alternateExchange;
 
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
     private final ExchangeBindings _bindings = new ExchangeBindings(this);
 
-    private final AtomicBoolean _deleted = new AtomicBoolean(false);
 
-    private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+    protected final QueueEntryList _entries;
+
+    protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
+
+    private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
+
+    private volatile Subscription _exclusiveSubscriber;
+
+
 
     private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
 
@@ -85,18 +100,10 @@
 
     private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
 
-    protected final SubscriptionList _subscriptionList = new SubscriptionList(this);
-    private final AtomicReference<SubscriptionList.SubscriptionNode> _lastSubscriptionNode = new AtomicReference<SubscriptionList.SubscriptionNode>(_subscriptionList.getHead());
-
-    private volatile Subscription _exclusiveSubscriber;
+    private final AtomicLong _totalMessagesReceived = new AtomicLong();
 
-    protected final QueueEntryList _entries;
 
-    private final AMQQueueMBean _managedObject;
-    private final Executor _asyncDelivery;
-    private final AtomicLong _totalMessagesReceived = new AtomicLong();
 
-    private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
 
     /** max allowed size(KB) of a single message */
     public long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
@@ -113,24 +120,38 @@
     /** the minimum interval between sending out consecutive alerts of the same type */
     public long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
 
-    private static final int MAX_ASYNC_DELIVERIES = 10;
+    private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+
+    private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
 
     private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
 
+
+    static final int MAX_ASYNC_DELIVERIES = 10;
+
+
     private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
     private AtomicReference _asynchronousRunner = new AtomicReference(null);
+    private final Executor _asyncDelivery;
     private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
+
+    private final ConcurrentMap<AMQChannel, Boolean> _blockedChannels = new ConcurrentHashMap<AMQChannel, Boolean>();
+
+    private final AtomicBoolean _deleted = new AtomicBoolean(false);
+    private final List<Task> _deleteTaskList = new CopyOnWriteArrayList<Task>();
+
+
     private LogSubject _logSubject;
     private LogActor _logActor;
 
+    private AMQQueueMBean _managedObject;
+    private static final String SUB_FLUSH_RUNNER = "SUB_FLUSH_RUNNER";
+    private boolean _nolocal;
 
-    private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
-    private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
     private final AtomicBoolean _overfull = new AtomicBoolean(false);
 
     protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
-            throws AMQException
     {
         this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
     }
@@ -141,7 +162,6 @@
                              boolean autoDelete,
                              VirtualHost virtualHost,
                              QueueEntryListFactory entryListFactory)
-            throws AMQException
     {
 
         if (name == null)
@@ -155,6 +175,7 @@
         }
 
         _name = name;
+        _resourceName = String.valueOf(name);
         _durable = durable;
         _owner = owner;
         _autoDelete = autoDelete;
@@ -193,13 +214,19 @@
         }
         catch (JMException e)
         {
-            throw new AMQException("AMQQueue MBean creation has failed ", e);
+            _logger.error("AMQQueue MBean creation has failed ", e);
         }
 
         resetNotifications();
 
     }
 
+    public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, VirtualHost virtualHost)
+            throws AMQException
+    {
+        this(new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner),autoDelete,virtualHost);
+    }
+
     public void resetNotifications()
     {
         // This ensure that the notification checks for the configured alerts are created.
@@ -211,16 +238,54 @@
 
     // ------ Getters and Setters
 
+    public void execute(ReadWriteRunnable runnable)
+    {
+        _asyncDelivery.execute(runnable);
+    }
+
     public AMQShortString getName()
     {
         return _name;
     }
 
+    public void setNoLocal(boolean nolocal)
+    {
+        _nolocal = nolocal;
+    }
+
     public boolean isDurable()
     {
         return _durable;
     }
 
+    public boolean isExclusive()
+    {
+        return _exclusiveOwner != null;
+    }
+
+    public Exchange getAlternateExchange()
+    {
+        return _alternateExchange;
+    }
+
+    public void setAlternateExchange(Exchange exchange)
+    {
+        if(_alternateExchange != null)
+        {
+            _alternateExchange.removeReference(this);
+        }
+        if(exchange != null)
+        {
+            exchange.addReference(this);
+        }
+        _alternateExchange = exchange;
+    }
+
+    public Map<String, Object> getArguments()
+    {
+        return null;
+    }
+
     public boolean isAutoDelete()
     {
         return _autoDelete;
@@ -231,6 +296,17 @@
         return _owner;
     }
 
+    public PrincipalHolder getPrincipalHolder()
+    {
+        return _prinicpalHolder;
+    }
+
+    public void setPrincipalHolder(PrincipalHolder prinicpalHolder)
+    {
+        _prinicpalHolder = prinicpalHolder;
+    }
+
+
     public VirtualHost getVirtualHost()
     {
         return _virtualHost;
@@ -238,13 +314,31 @@
 
     // ------ bind and unbind
 
+    public void bind(Exchange exchange, String bindingKey, Map<String, Object> arguments) throws AMQException
+    {
+
+        FieldTable fieldTable = FieldTable.convertToFieldTable(arguments);
+        AMQShortString routingKey = new AMQShortString(bindingKey);
+
+        exchange.registerQueue(routingKey, this, fieldTable);
+
+        if (isDurable() && exchange.isDurable())
+        {
+
+            _virtualHost.getDurableConfigurationStore().bindQueue(exchange, routingKey, this, fieldTable);
+        }
+
+        _bindings.addBinding(routingKey, fieldTable, exchange);
+    }
+
+
     public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
     {
 
         exchange.registerQueue(routingKey, this, arguments);
         if (isDurable() && exchange.isDurable())
         {
-            _virtualHost.getMessageStore().bindQueue(exchange, routingKey, this, arguments);
+            _virtualHost.getDurableConfigurationStore().bindQueue(exchange, routingKey, this, arguments);
         }
 
         _bindings.addBinding(routingKey, arguments, exchange);
@@ -255,7 +349,7 @@
         exchange.deregisterQueue(routingKey, this, arguments);
         if (isDurable() && exchange.isDurable())
         {
-            _virtualHost.getMessageStore().unbindQueue(exchange, routingKey, this, arguments);
+            _virtualHost.getDurableConfigurationStore().unbindQueue(exchange, routingKey, this, arguments);
         }
 
         boolean removed = _bindings.remove(routingKey, arguments, exchange);
@@ -295,11 +389,15 @@
 
         _activeSubscriberCount.incrementAndGet();
         subscription.setStateListener(this);
-        subscription.setLastSeenEntry(null, _entries.getHead());
+        subscription.setQueueContext(new QueueContext(_entries.getHead()));
 
         if (!isDeleted())
         {
             subscription.setQueue(this, exclusive);
+            if(_nolocal)
+            {
+                subscription.setNoLocal(_nolocal);
+            }
             _subscriptionList.add(subscription);
             if (isDeleted())
             {
@@ -329,17 +427,11 @@
             subscription.close();
             // No longer can the queue have an exclusive consumer
             setExclusiveSubscriber(null);
-
-            QueueEntry lastSeen;
-
-            while ((lastSeen = subscription.getLastSeenEntry()) != null)
-            {
-                subscription.setLastSeenEntry(lastSeen, null);
-            }
+            subscription.setQueueContext(null);
 
             // auto-delete queues must be deleted if there are no remaining subscribers
 
-            if (_autoDelete && getConsumerCount() == 0)
+            if (_autoDelete && getConsumerCount() == 0  && !isExclusive())
             {
                 if (_logger.isInfoEnabled())
                 {
@@ -358,7 +450,7 @@
 
     // ------ Enqueue / Dequeue
 
-    public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
+    public QueueEntry enqueue(ServerMessage message) throws AMQException
     {
 
         incrementQueueCount();
@@ -378,14 +470,6 @@
                 entry = _entries.add(message);
 
                 deliverToSubscription(exclusiveSub, entry);
-
-                // where there is more than one producer there's a reasonable chance that even though there is
-                // no "queueing" we do not deliver because we get an interleving of _entries.add and
-                // deliverToSubscription between threads.  Therefore have one more try. 
-                if (!(entry.isAcquired() || entry.isDeleted()))
-                {
-                    deliverToSubscription(exclusiveSub, entry);
-                }
             }
             finally
             {
@@ -445,19 +529,18 @@
             }
         }
 
-        if (entry.immediateAndNotDelivered())
-        {
-            dequeue(storeContext, entry);
-            entry.dispose(storeContext);
-        }
-        else if (!(entry.isAcquired() || entry.isDeleted()))
+
+        if (!(entry.isAcquired() || entry.isDeleted()))
         {
             checkSubscriptionsNotAheadOfDelivery(entry);
 
             deliverAsync();
         }
 
-        _managedObject.checkForNotification(entry.getMessage());
+        if(_managedObject != null)
+        {
+            _managedObject.checkForNotification(entry.getMessage());
+        }
 
         return entry;
     }
@@ -474,17 +557,15 @@
             {
                 if (!sub.wouldSuspend(entry))
                 {
-                    if (!sub.isBrowser() && !entry.acquire(sub))
+                    if (sub.acquires() && !entry.acquire(sub))
                     {
                         // restore credit here that would have been taken away by wouldSuspend since we didn't manage
                         // to acquire the entry for this subscription
-                        sub.restoreCredit(entry);
+                        sub.onDequeue(entry);
                     }
                     else
                     {
-
                         deliverMessage(sub, entry);
-
                     }
                 }
             }
@@ -501,7 +582,7 @@
         // Simple Queues don't :-)
     }
 
-    private void incrementQueueSize(final AMQMessage message)
+    private void incrementQueueSize(final ServerMessage message)
     {
         getAtomicQueueSize().addAndGet(message.getSize());
     }
@@ -515,76 +596,48 @@
             throws AMQException
     {
         _deliveredMessages.incrementAndGet();
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug(sub + ": deliverMessage: " + entry.debugIdentity());
-        }
         sub.send(entry);
+
+        setLastSeenEntry(sub,entry);
     }
 
-    private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry)
+    private boolean subscriptionReadyAndHasInterest(final Subscription sub, final QueueEntry entry) throws AMQException
     {
+        return sub.hasInterest(entry) && (getNextAvailableEntry(sub) == entry);
+    }
 
-        // We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
-        // interest in.
-        QueueEntry node = sub.getLastSeenEntry();
-        while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
-        {
-
-            QueueEntry newNode = _entries.next(node);
-            if (newNode != null)
-            {
-                sub.setLastSeenEntry(node, newNode);
-                node = sub.getLastSeenEntry();
-            }
-            else
-            {
-                node = null;
-                break;
-            }
 
-        }
+    private void setLastSeenEntry(final Subscription sub, final QueueEntry entry)
+    {
+        QueueContext subContext = (QueueContext) sub.getQueueContext();
+        QueueEntry releasedEntry = subContext._releasedEntry;
 
-        if (node == entry)
+        QueueContext._lastSeenUpdater.set(subContext, entry);
+        if(releasedEntry == entry)
         {
-            // If the first entry that subscription can process is the one we are trying to deliver to it, then we are
-            // good
-            return true;
-        }
-        else
-        {
-            // Otherwise we should try to update the subscription's last seen entry to the entry we got to, providing
-            // no-one else has updated it to something furhter on in the list
-            //TODO - check
-            //updateLastSeenEntry(sub, entry);
-            return false;
+           QueueContext._releasedUpdater.compareAndSet(subContext, releasedEntry, null);
         }
-
     }
 
-    private void updateLastSeenEntry(final Subscription sub, final QueueEntry entry)
+    private void updateSubRequeueEntry(final Subscription sub, final QueueEntry entry)
     {
-        QueueEntry node = sub.getLastSeenEntry();
 
-        if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
+        QueueContext subContext = (QueueContext) sub.getQueueContext();
+        if(subContext != null)
         {
-            do
+            QueueEntry oldEntry;
+
+            while((oldEntry  = subContext._releasedEntry) == null || oldEntry.compareTo(entry) > 0)
             {
-                if (sub.setLastSeenEntry(node, entry))
-                {
-                    return;
-                }
-                else
+                if(QueueContext._releasedUpdater.compareAndSet(subContext, oldEntry, entry))
                 {
-                    node = sub.getLastSeenEntry();
+                    break;
                 }
             }
-            while (node != null && entry.compareTo(node) < 0);
         }
-
     }
 
-    public void requeue(StoreContext storeContext, QueueEntry entry) throws AMQException
+    public void requeue(QueueEntry entry)
     {
 
         SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
@@ -594,9 +647,9 @@
             Subscription sub = subscriberIter.getNode().getSubscription();
 
             // we don't make browsers send the same stuff twice
-            if (!sub.isBrowser())
+            if (sub.seesRequeues())
             {
-                updateLastSeenEntry(sub, entry);
+                updateSubRequeueEntry(sub, entry);
             }
         }
 
@@ -604,36 +657,31 @@
 
     }
 
-    public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
+    public void requeue(QueueEntryImpl entry, Subscription subscription)
     {
-        decrementQueueCount();
-        decrementQueueSize(entry);
-        if (entry.acquiredBySubscription())
+        SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
+        // iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
+        while (subscriberIter.advance())
         {
-            _deliveredMessages.decrementAndGet();
-        }
+            Subscription sub = subscriberIter.getNode().getSubscription();
 
-        try
-        {
-            AMQMessage msg = entry.getMessage();
-            if (msg.isPersistent())
+            // we don't make browsers send the same stuff twice
+            if (sub.seesRequeues() && (!sub.acquires() && sub == subscription))
             {
-                _virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
+                updateSubRequeueEntry(sub, entry);
             }
-            //entry.dispose(storeContext);
-
-        }
-        catch (MessageCleanupException e)
-        {
-            // Message was dequeued, but could not then be deleted
-            // though it is no longer referenced. This should be very
-            // rare and can be detected and cleaned up on recovery or
-            // done through some form of manual intervention.
-            _logger.error(e, e);
         }
-        catch (AMQException e)
+
+        deliverAsync();
+    }
+
+    public void dequeue(QueueEntry entry)
+    {
+        decrementQueueCount();
+        decrementQueueSize(entry);
+        if (entry.acquiredBySubscription())
         {
-            throw new FailedDequeueException(_name.toString(), e);
+            _deliveredMessages.decrementAndGet();
         }
 
         checkCapacity();
@@ -811,7 +859,7 @@
 
             public boolean accept(QueueEntry entry)
             {
-                final long messageId = entry.getMessage().getMessageId();
+                final long messageId = entry.getMessage().getMessageNumber();
                 return messageId >= fromMessageId && messageId <= toMessageId;
             }
 
@@ -830,7 +878,7 @@
 
             public boolean accept(QueueEntry entry)
             {
-                _complete = entry.getMessage().getMessageId() == messageId;
+                _complete = entry.getMessage().getMessageNumber() == messageId;
                 return _complete;
             }
 
@@ -872,7 +920,7 @@
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
             private long position = 0;
-            
+
             public boolean accept(QueueEntry entry)
             {
                 position++;
@@ -884,25 +932,25 @@
                 return position >= toPosition;
             }
         });
-        
+
         return entries;
     }
 
     public void moveMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,
                                            String queueName,
-                                           StoreContext storeContext)
+                                           ServerTransaction txn)
     {
 
-        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        MessageStore store = getVirtualHost().getMessageStore();
+        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
+
 
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
 
             public boolean accept(QueueEntry entry)
             {
-                final long messageId = entry.getMessage().getMessageId();
+                final long messageId = entry.getMessage().getMessageNumber();
                 return (messageId >= fromMessageId)
                        && (messageId <= toMessageId)
                        && entry.acquire();
@@ -914,61 +962,48 @@
             }
         });
 
-        try
+
+
+        // Move the messages in on the message store.
+        for (final QueueEntry entry : entries)
         {
-            store.beginTran(storeContext);
+            final ServerMessage message = entry.getMessage();
+            txn.enqueue(toQueue, message,
+                        new ServerTransaction.Action()
+                        {
 
-            // Move the messages in on the message store.
-            for (QueueEntry entry : entries)
-            {
-                AMQMessage message = entry.getMessage();
+                            public void postCommit()
+                            {
+                                try
+                                {
+                                    toQueue.enqueue(message);
+                                }
+                                catch (AMQException e)
+                                {
+                                    throw new RuntimeException(e);
+                                }
+                            }
 
-                if (message.isPersistent() && toQueue.isDurable())
-                {
-                    store.enqueueMessage(storeContext, toQueue, message.getMessageId());
-                }
-                // dequeue does not decrement the refence count
-                entry.dequeue(storeContext);
-            }
+                            public void onRollback()
+                            {
+                                entry.release();
+                            }
+                        });
+            txn.dequeue(this, message,
+                        new ServerTransaction.Action()
+                        {
 
-            // Commit and flush the move transcations.
-            try
-            {
-                store.commitTran(storeContext);
-            }
-            catch (AMQException e)
-            {
-                throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
-            }
-        }
-        catch (AMQException e)
-        {
-            try
-            {
-                store.abortTran(storeContext);
-            }
-            catch (AMQException rollbackEx)
-            {
-                _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
-            }
-            throw new RuntimeException(e);
-        }
+                            public void postCommit()
+                            {
+                                entry.discard();
+                            }
+
+                            public void onRollback()
+                            {
+
+                            }
+                        });
 
-        try
-        {
-            for (QueueEntry entry : entries)
-            {
-                toQueue.enqueue(storeContext, entry.getMessage());
-                entry.delete();
-            }
-        }
-        catch (MessageCleanupException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (AMQException e)
-        {
-            throw new RuntimeException(e);
         }
 
     }
@@ -976,27 +1011,18 @@
     public void copyMessagesToAnotherQueue(final long fromMessageId,
                                            final long toMessageId,
                                            String queueName,
-                                           final StoreContext storeContext)
+                                           final ServerTransaction txn)
     {
-        AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
-        MessageStore store = getVirtualHost().getMessageStore();
+        final AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
 
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
         {
 
             public boolean accept(QueueEntry entry)
             {
-                final long messageId = entry.getMessage().getMessageId();
-                if ((messageId >= fromMessageId)
-                    && (messageId <= toMessageId))
-                {
-                    if (!entry.isDeleted())
-                    {
-                        return entry.getMessage().incrementReference();
-                    }
-                }
-
-                return false;
+                final long messageId = entry.getMessage().getMessageNumber();
+                return ((messageId >= fromMessageId)
+                    && (messageId <= toMessageId));
             }
 
             public boolean filterComplete()
@@ -1005,98 +1031,69 @@
             }
         });
 
-        try
+
+        // Move the messages in on the message store.
+        for (QueueEntry entry : entries)
         {
-            store.beginTran(storeContext);
+            final ServerMessage message = entry.getMessage();
 
-            // Move the messages in on the message store.
-            for (QueueEntry entry : entries)
+            if (message.isPersistent() && toQueue.isDurable())
             {
-                AMQMessage message = entry.getMessage();
 
-                if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
-                {
-                    store.enqueueMessage(storeContext, toQueue, message.getMessageId());
-                }
-            }
+                txn.enqueue(toQueue, message, new ServerTransaction.Action()
+                    {
+                        public void postCommit()
+                        {
+                            try
+                            {
+                                toQueue.enqueue(message);
+                            }
+                            catch (AMQException e)
+                            {
+                                throw new RuntimeException(e);
+                            }
+                        }
 
-            // Commit and flush the move transcations.
-            try
-            {
-                store.commitTran(storeContext);
-            }
-            catch (AMQException e)
-            {
-                throw new RuntimeException("Failed to commit transaction whilst moving messages on message store.", e);
-            }
-        }
-        catch (AMQException e)
-        {
-            try
-            {
-                store.abortTran(storeContext);
-            }
-            catch (AMQException rollbackEx)
-            {
-                _logger.error("Failed to rollback transaction when error occured moving messages", rollbackEx);
-            }
-            throw new RuntimeException(e);
-        }
+                        public void onRollback()
+                        {
+
+                        }
+                    });
 
-        try
-        {
-            for (QueueEntry entry : entries)
-            {
-                if (entry.getMessage().isReferenced())
-                {
-                    toQueue.enqueue(storeContext, entry.getMessage());
-                }
             }
         }
-        catch (MessageCleanupException e)
-        {
-            throw new RuntimeException(e);
-        }
-        catch (AMQException e)
-        {
-            throw new RuntimeException(e);
-        }
 
     }
 
-    public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
+    public void removeMessagesFromQueue(long fromMessageId, long toMessageId)
     {
 
-        try
+        QueueEntryIterator queueListIterator = _entries.iterator();
+
+        while (queueListIterator.advance())
         {
-            QueueEntryIterator queueListIterator = _entries.iterator();
+            QueueEntry node = queueListIterator.getNode();
 
-            while (queueListIterator.advance())
+            final ServerMessage message = node.getMessage();
+            if(message != null)
             {
-                QueueEntry node = queueListIterator.getNode();
-
-                final long messageId = node.getMessage().getMessageId();
+                final long messageId = message.getMessageNumber();
 
                 if ((messageId >= fromMessageId)
                     && (messageId <= toMessageId)
                     && !node.isDeleted()
                     && node.acquire())
                 {
-                    node.discard(storeContext);
+                    dequeueEntry(node);
                 }
-
             }
         }
-        catch (AMQException e)
-        {
-            throw new RuntimeException(e);
-        }
 
     }
 
     // ------ Management functions
 
-    public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
+    public void deleteMessageFromTop()
     {
         QueueEntryIterator queueListIterator = _entries.iterator();
         boolean noDeletes = true;
@@ -1106,33 +1103,62 @@
             QueueEntry node = queueListIterator.getNode();
             if (!node.isDeleted() && node.acquire())
             {
-                node.discard(storeContext);
+                dequeueEntry(node);
                 noDeletes = false;
             }
 
         }
     }
 
-    public long clearQueue(StoreContext storeContext) throws AMQException
+    public long clearQueue()
     {
 
         QueueEntryIterator queueListIterator = _entries.iterator();
         long count = 0;
 
+        ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+
         while (queueListIterator.advance())
         {
             QueueEntry node = queueListIterator.getNode();
             if (!node.isDeleted() && node.acquire())
             {
-                node.discard(storeContext);
+                dequeueEntry(node, txn);
                 count++;
             }
 
         }
+
+        txn.commit();
+
         return count;
 
     }
 
+    private void dequeueEntry(final QueueEntry node)
+    {
+        ServerTransaction txn = new AutoCommitTransaction(getVirtualHost().getTransactionLog());
+        dequeueEntry(node, txn);
+    }
+
+    private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
+    {
+        txn.dequeue(this, node.getMessage(),
+                    new ServerTransaction.Action()
+                    {
+
+                        public void postCommit()
+                        {
+                            node.discard();
+                        }
+
+                        public void onRollback()
+                        {
+
+                        }
+                    });
+    }
+
     public void addQueueDeleteTask(final Task task)
     {
         _deleteTaskList.add(task);
@@ -1157,7 +1183,111 @@
             _bindings.deregister();
             _virtualHost.getQueueRegistry().unregisterQueue(_name);
 
-            _managedObject.unregister();
+            List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
+            {
+
+                public boolean accept(QueueEntry entry)
+                {
+                    return entry.acquire();
+                }
+
+                public boolean filterComplete()
+                {
+                    return false;
+                }
+            });
+
+            ServerTransaction txn = new LocalTransaction(getVirtualHost().getTransactionLog());
+
+            if(_alternateExchange != null)
+            {
+
+                InboundMessageAdapter adapter = new InboundMessageAdapter();
+                for(final QueueEntry entry : entries)
+                {
+                    adapter.setEntry(entry);
+                    final List<AMQQueue> rerouteQueues = _alternateExchange.route(adapter);
+                    final ServerMessage message = entry.getMessage();
+                    if(rerouteQueues != null & rerouteQueues.size() != 0)
+                    {
+                        txn.enqueue(rerouteQueues, entry.getMessage(),
+                                    new ServerTransaction.Action()
+                                    {
+
+                                        public void postCommit()
+                                        {
+                                            try
+                                            {
+                                                for(AMQQueue queue : rerouteQueues)
+                                                {
+                                                    QueueEntry entry = queue.enqueue(message);
+                                                }
+                                            }
+                                            catch (AMQException e)
+                                            {
+                                                throw new RuntimeException(e);
+                                            }
+
+                                        }
+
+                                        public void onRollback()
+                                        {
+
+                                        }
+                                    });
+                        txn.dequeue(this, entry.getMessage(),
+                                    new ServerTransaction.Action()
+                                    {
+
+                                        public void postCommit()
+                                        {
+                                            entry.discard();
+                                        }
+
+                                        public void onRollback()
+                                        {
+                                        }
+                                    });
+                    }
+
+                }
+
+                _alternateExchange.removeReference(this);
+            }
+            else
+            {
+                // TODO log discard
+
+                for(final QueueEntry entry : entries)
+                {
+                    final ServerMessage message = entry.getMessage();
+                    if(message != null)
+                    {
+                        txn.dequeue(this, message,
+                                    new ServerTransaction.Action()
+                                    {
+
+                                        public void postCommit()
+                                        {
+                                            entry.discard();
+                                        }
+
+                                        public void onRollback()
+                                        {
+                                        }
+                                    });
+                    }
+                }
+            }
+
+            txn.commit();
+
+
+            if(_managedObject!=null)
+            {
+                _managedObject.unregister();
+            }
+
             for (Task task : _deleteTaskList)
             {
                 task.doTask(this);
@@ -1165,7 +1295,7 @@
 
             _deleteTaskList.clear();
             stop();
-            
+
             //Log Queue Deletion
             CurrentActor.get().message(_logSubject, QueueMessages.QUE_1002());
 
@@ -1249,7 +1379,13 @@
 
     public void deliverAsync(Subscription sub)
     {
-        _asyncDelivery.execute(new SubFlushRunner(sub));
+        SubFlushRunner flusher = (SubFlushRunner) sub.get(SUB_FLUSH_RUNNER);
+        if(flusher == null)
+        {
+            flusher = new SubFlushRunner(sub);
+            sub.set(SUB_FLUSH_RUNNER, flusher);
+        }
+        _asyncDelivery.execute(flusher);
     }
 
 
@@ -1298,66 +1434,12 @@
         }
     }
 
-    private class SubFlushRunner implements ReadWriteRunnable
-    {
-        private final Subscription _sub;
-
-        public SubFlushRunner(Subscription sub)
-        {
-            _sub = sub;
-        }
-
-        public void run()
-        {
-
-            String originalName = Thread.currentThread().getName();
-            try{
-                Thread.currentThread().setName("SubFlushRunner-"+_sub);
-
-                boolean complete = false;
-                try
-                {
-                    CurrentActor.set(_sub.getLogActor());
-                    complete = flushSubscription(_sub, new Long(MAX_ASYNC_DELIVERIES));
-
-                }
-                catch (AMQException e)
-                {
-                    _logger.error(e);
-                }
-                finally
-                {
-                    CurrentActor.remove();
-                }
-                if (!complete && !_sub.isSuspended())
-                {
-                    _asyncDelivery.execute(this);
-                }
-            }
-            finally
-            {
-                Thread.currentThread().setName(originalName);
-            }
-
-        }
-
-        public boolean isRead()
-        {
-            return false;
-        }
-
-        public boolean isWrite()
-        {
-            return true;
-        }
-    }
-
     public void flushSubscription(Subscription sub) throws AMQException
     {
         flushSubscription(sub, Long.MAX_VALUE);
     }
 
-    public boolean flushSubscription(Subscription sub, Long iterations) throws AMQException
+    public boolean flushSubscription(Subscription sub, long iterations) throws AMQException
     {
         boolean atTail = false;
 
@@ -1371,8 +1453,8 @@
                 {
                     unregisterSubscription(sub);
 
-                    ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-                    converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
+                    sub.confirmAutoClose();
+
                 }
                 else if (!atTail)
                 {
@@ -1408,41 +1490,26 @@
     private boolean attemptDelivery(Subscription sub) throws AMQException
     {
         boolean atTail = false;
-        boolean advanced = false;
+
         boolean subActive = sub.isActive() && !sub.isSuspended();
         if (subActive)
         {
-            QueueEntry node = moveSubscriptionToNextNode(sub);
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug(sub + ": attempting Delivery: " + node.debugIdentity());
-            }
-            if (!(node.isAcquired() || node.isDeleted()))
+
+            QueueEntry node  = getNextAvailableEntry(sub);
+
+            if (node != null && !(node.isAcquired() || node.isDeleted()))
             {
                 if (sub.hasInterest(node))
                 {
                     if (!sub.wouldSuspend(node))
                     {
-                        if (!sub.isBrowser() && !node.acquire(sub))
+                        if (sub.acquires() && !node.acquire(sub))
                         {
-                            sub.restoreCredit(node);
+                            sub.onDequeue(node);
                         }
                         else
                         {
                             deliverMessage(sub, node);
-
-                            if (sub.isBrowser())
-                            {
-                                QueueEntry newNode = _entries.next(node);
-
-                                if (newNode != null)
-                                {
-                                    advanced = true;
-                                    sub.setLastSeenEntry(node, newNode);
-                                    node = sub.getLastSeenEntry();
-                                }
-                                
-                            }
                         }
 
                     }
@@ -1450,29 +1517,13 @@
                     {
                         //QPID-1187 - Treat the subscription as suspended for this message
                         // and wait for the message to be removed to continue delivery.
-                        
-                        // 2009-09-30 : MR : setting subActive = false only causes, this
-                        // particular delivery attempt to end. This is called from
-                        // flushSubscription and processQueue both of which attempt
-                        // delivery a number of times. Won't a bytes limited
-                        // subscriber with not enough credit for the next message
-                        // create a lot of new QELs? How about a browser that calls
-                        // this method LONG.MAX_LONG times! 
                         subActive = false;
-                        node.addStateChangeListener(new QueueEntryListener(sub, node));
-                    }
-                }
-                else
-                {
-                    // this subscription is not interested in this node so we can skip over it
-                    QueueEntry newNode = _entries.next(node);
-                    if (newNode != null)
-                    {
-                        sub.setLastSeenEntry(node, newNode);
+                        node.addStateChangeListener(new QueueEntryListener(sub));
                     }
                 }
+
             }
-            atTail = (_entries.next(node) == null) && !advanced;
+            atTail = (node == null) || (_entries.next(node) == null);
         }
         return atTail || !subActive;
     }
@@ -1484,46 +1535,58 @@
         {
             SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
             Subscription sub = subNode.getSubscription();
-            moveSubscriptionToNextNode(sub);
+            if(sub.acquires())
+            {
+                getNextAvailableEntry(sub);
+            }
+            else
+            {
+                // TODO
+            }
         }
     }
 
-    private QueueEntry moveSubscriptionToNextNode(final Subscription sub)
+    private QueueEntry getNextAvailableEntry(final Subscription sub)
             throws AMQException
     {
-        QueueEntry node = sub.getLastSeenEntry();
-
-        while (node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
+        QueueContext context = (QueueContext) sub.getQueueContext();
+        if(context != null)
         {
-            if (!node.isAcquired() && !node.isDeleted() && node.expired())
+            QueueEntry lastSeen = context._lastSeenEntry;
+            QueueEntry releasedNode = context._releasedEntry;
+
+            QueueEntry node = (releasedNode != null && lastSeen.compareTo(releasedNode)>=0) ? releasedNode : _entries.next(lastSeen);
+
+            boolean expired = false;
+            while (node != null && (node.isAcquired() || node.isDeleted() || (expired = node.expired()) || !sub.hasInterest(node)))
             {
-                if (node.acquire())
+                if (expired)
                 {
-                    final StoreContext reapingStoreContext = new StoreContext();
-                    node.discard(reapingStoreContext);
+                    expired = false;
+                    if (node.acquire())
+                    {
+                        dequeueEntry(node);
+                    }
                 }
-            }
-            QueueEntry newNode = _entries.next(node);
-            if (newNode != null)
-            {
-                sub.setLastSeenEntry(node, newNode);
-                node = sub.getLastSeenEntry();
-            }
-            else
-            {
-                break;
-            }
 
-        }
+                if(QueueContext._lastSeenUpdater.compareAndSet(context, lastSeen, node))
+                {
+                    QueueContext._releasedUpdater.compareAndSet(context, releasedNode, null);
+                }
 
-        if (_logger.isDebugEnabled())
+                lastSeen = context._lastSeenEntry;
+                releasedNode = context._releasedEntry;
+                node = (releasedNode != null && lastSeen.compareTo(releasedNode)>0) ? releasedNode : _entries.next(lastSeen);
+            }
+            return node;
+        }
+        else
         {
-            _logger.debug(sub + ": nextNode: " + (node == null ? "null" : node.debugIdentity()));
+            return null;
         }
-
-        return node;
     }
 
+
     private void processQueue(Runnable runner) throws AMQException
     {
         long stateChangeCount;
@@ -1563,7 +1626,10 @@
                 sub.getSendLock();
                 try
                 {
-                    done = attemptDelivery(sub);
+                    if (sub != null)
+                    {
+                        done = attemptDelivery(sub);
+                    }
                     if (done)
                     {
                         if (extraLoops == 0)
@@ -1573,8 +1639,7 @@
                             {
                                 unregisterSubscription(sub);
 
-                                ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
-                                converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
+                                sub.confirmAutoClose();
                             }
                         }
                         else
@@ -1611,8 +1676,6 @@
     public void checkMessageStatus() throws AMQException
     {
 
-        final StoreContext storeContext = new StoreContext();
-
         QueueEntryIterator queueListIterator = _entries.iterator();
 
         while (queueListIterator.advance())
@@ -1620,11 +1683,14 @@
             QueueEntry node = queueListIterator.getNode();
             if (!node.isDeleted() && node.expired() && node.acquire())
             {
-                node.discard(storeContext);
+                dequeueEntry(node);
             }
             else
             {
-                _managedObject.checkForNotification(node.getMessage());
+                if(_managedObject!=null)
+                {
+                    _managedObject.checkForNotification(node.getMessage());
+                }
             }
         }
 
@@ -1748,23 +1814,22 @@
 
     private final class QueueEntryListener implements QueueEntry.StateChangeListener
     {
-        private final QueueEntry _entry;
+
         private final Subscription _sub;
 
-        public QueueEntryListener(final Subscription sub, final QueueEntry entry)
+        public QueueEntryListener(final Subscription sub)
         {
-            _entry = entry;
             _sub = sub;
         }
 
         public boolean equals(Object o)
         {
-            return _entry == ((QueueEntryListener) o)._entry && _sub == ((QueueEntryListener) o)._sub;
+            return _sub == ((QueueEntryListener) o)._sub;
         }
 
         public int hashCode()
         {
-            return System.identityHashCode(_entry) ^ System.identityHashCode(_sub);
+            return System.identityHashCode(_sub);
         }
 
         public void stateChanged(QueueEntry entry, QueueEntry.State oldSate, QueueEntry.State newState)
@@ -1791,11 +1856,22 @@
         for (int i = 0; i < num && !it.atTail(); i++)
         {
             it.advance();
-            ids.add(it.getNode().getMessage().getMessageId());
+            ids.add(it.getNode().getMessage().getMessageNumber());
         }
         return ids;
     }
 
+    public Object getExclusiveOwner()
+    {
+        return _exclusiveOwner;
+    }
+
+    public void setExclusiveOwner(Object exclusiveOwner)
+    {
+        _exclusiveOwner = exclusiveOwner;
+    }
+
+
     public void configure(QueueConfiguration config)
     {
         if (config != null)
@@ -1809,4 +1885,16 @@
             _flowResumeCapacity = config.getFlowResumeCapacity();
         }
     }
+
+    public String getResourceName()
+    {
+        return _resourceName;
+    }
+
+
+    @Override
+    public String toString()
+    {
+        return String.valueOf(getName());
+    }
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java Sun Oct 25 22:58:57 2009
@@ -1,6 +1,10 @@
 package org.apache.qpid.server.queue;
 
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.ServerMessage;
+
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import java.util.concurrent.atomic.AtomicLong;
 
 /*
 *
@@ -24,6 +28,7 @@
 */
 public class SimpleQueueEntryList implements QueueEntryList
 {
+
     private final QueueEntryImpl _head;
 
     private volatile QueueEntryImpl _tail;
@@ -40,9 +45,7 @@
                 _nextUpdater =
             AtomicReferenceFieldUpdater.newUpdater
             (QueueEntryImpl.class, QueueEntryImpl.class, "_next");
-
-
-
+    private AtomicLong _deletes = new AtomicLong(0L);
 
 
     public SimpleQueueEntryList(AMQQueue queue)
@@ -52,21 +55,77 @@
         _tail = _head;
     }
 
+    
+
     void advanceHead()
     {
+        _deletes.incrementAndGet();
         QueueEntryImpl head = _head.nextNode();
+        boolean deleted = head.isDeleted();
         while(head._next != null && head.isDeleted())
         {
 
+            deleted = true;
             final QueueEntryImpl newhead = head.nextNode();
             if(newhead != null)
             {
-                _nextUpdater.compareAndSet(_head,head, newhead);
+                if(_nextUpdater.compareAndSet(_head,head, newhead))
+                {
+                    _deletes.decrementAndGet();
+                }
             }
             head = _head.nextNode();
         }
+
+        if(!deleted)
+        {
+            deleted = true;
+        }
+
+        if(_deletes.get() > 1000L)
+        {
+            _deletes.set(0L);
+            scavenge();
+        }
     }
 
+    void scavenge()
+    {
+        QueueEntryImpl root = _head;
+        QueueEntryImpl next = root.nextNode();
+
+        do
+        {
+
+
+            while(next._next != null && next.isDeleted())
+            {
+
+                final QueueEntryImpl newhead = next.nextNode();
+                if(newhead != null)
+                {
+                    _nextUpdater.compareAndSet(root,next, newhead);
+                }
+                next = root.nextNode();
+            }
+            if(next._next != null)
+            {
+                if(!next.isDeleted())
+                {
+                    root = next;
+                    next = root.nextNode();
+                }
+            }
+            else
+            {
+                break;
+            }
+
+        } while (next != null && next._next != null);
+
+    }
+
+
 
     public AMQQueue getQueue()
     {
@@ -74,7 +133,7 @@
     }
 
 
-    public QueueEntry add(AMQMessage message)
+    public QueueEntry add(ServerMessage message)
     {
         QueueEntryImpl node = new QueueEntryImpl(this, message);
         for (;;)

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java Sun Oct 25 22:58:57 2009
@@ -35,8 +35,8 @@
 import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager;
 import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager;
-import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 
 import java.io.File;
 
@@ -51,15 +51,15 @@
 
     public void initialise(int instanceID) throws Exception
     {
-        _rootMessageLogger = new RootMessageLoggerImpl(_configuration, 
+        _rootMessageLogger = new RootMessageLoggerImpl(_configuration,
                                                        new Log4jMessageLogger());
-        
+
         _registryName = String.valueOf(instanceID);
 
         // Set the Actor for current log messages
         CurrentActor.set(new BrokerActor(_registryName, _rootMessageLogger));
 
-        CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion()));        
+        CurrentActor.get().message(BrokerMessages.BRK_1001(QpidProperties.getReleaseVersion(),QpidProperties.getBuildVersion()));
 
         initialiseManagedObjectRegistry();
 
@@ -68,7 +68,7 @@
         _pluginManager = new PluginManager(_configuration.getPluginDirectory());
 
         _accessManager = new ACLManager(_configuration.getSecurityConfiguration(), _pluginManager);
-        
+
         _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration);
 
         _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null);
@@ -99,10 +99,10 @@
     }
 
     private void initialiseVirtualHosts() throws Exception
-    {        
+    {
         for (String name : _configuration.getVirtualHosts())
         {
-            _virtualHostRegistry.registerVirtualHost(new VirtualHost(_configuration.getVirtualHostConfig(name)));
+            _virtualHostRegistry.registerVirtualHost(new VirtualHostImpl(_configuration.getVirtualHostConfig(name)));
         }
         getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost());
     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java Sun Oct 25 22:58:57 2009
@@ -14,9 +14,9 @@
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.    
+ *  under the License.
+ *
  *
- * 
  */
 package org.apache.qpid.server.security.access;
 
@@ -32,14 +32,11 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.configuration.SecurityConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
-import org.apache.qpid.server.security.access.plugins.SimpleXML;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class ACLManager
@@ -78,7 +75,7 @@
     {
         _hostPlugins = configurePlugins(hostConfig);
     }
-    
+
     public Map<String, ACLPlugin> configurePlugins(SecurityConfiguration hostConfig) throws ConfigurationException
     {
         Configuration securityConfig = hostConfig.getConfiguration();
@@ -108,7 +105,7 @@
             }
         }
         return plugins;
-    }    
+    }
 
     public static Logger getLogger()
     {
@@ -131,18 +128,18 @@
             if (result == AuthzResult.DENIED)
             {
                 // Something vetoed the access, we're done
-                return false; 
+                return false;
             }
             else if (result == AuthzResult.ALLOWED)
             {
-                // Remove plugin from global check list since 
+                // Remove plugin from global check list since
                 // host allow overrides global allow
                 remainingPlugins.remove(plugin.getKey());
             }
         }
-        
+
         for (ACLPlugin plugin : remainingPlugins.values())
-        {   
+        {
             result = checker.allowed(plugin);
             if (result == AuthzResult.DENIED)
             {
@@ -152,7 +149,7 @@
         return true;
     }
 
-    public boolean authoriseBind(final AMQProtocolSession session, final Exchange exch, final AMQQueue queue,
+    public boolean authoriseBind(final PrincipalHolder session, final Exchange exch, final AMQQueue queue,
             final AMQShortString routingKey)
     {
         return checkAllPlugins(new AccessCheck()
@@ -167,7 +164,7 @@
         });
     }
 
-    public boolean authoriseConnect(final AMQProtocolSession session, final VirtualHost virtualHost)
+    public boolean authoriseConnect(final PrincipalHolder session, final VirtualHost virtualHost)
     {
         return checkAllPlugins(new AccessCheck()
         {
@@ -181,7 +178,7 @@
         });
     }
 
-    public boolean authoriseConsume(final AMQProtocolSession session, final boolean noAck, final AMQQueue queue)
+    public boolean authoriseConsume(final PrincipalHolder session, final boolean noAck, final AMQQueue queue)
     {
         return checkAllPlugins(new AccessCheck()
         {
@@ -195,7 +192,7 @@
         });
     }
 
-    public boolean authoriseConsume(final AMQProtocolSession session, final boolean exclusive, final boolean noAck,
+    public boolean authoriseConsume(final PrincipalHolder session, final boolean exclusive, final boolean noAck,
             final boolean noLocal, final boolean nowait, final AMQQueue queue)
     {
         return checkAllPlugins(new AccessCheck()
@@ -210,7 +207,7 @@
         });
     }
 
-    public boolean authoriseCreateExchange(final AMQProtocolSession session, final boolean autoDelete,
+    public boolean authoriseCreateExchange(final PrincipalHolder session, final boolean autoDelete,
             final boolean durable, final AMQShortString exchangeName, final boolean internal, final boolean nowait,
             final boolean passive, final AMQShortString exchangeType)
     {
@@ -227,7 +224,7 @@
         });
     }
 
-    public boolean authoriseCreateQueue(final AMQProtocolSession session, final boolean autoDelete,
+    public boolean authoriseCreateQueue(final PrincipalHolder session, final boolean autoDelete,
             final boolean durable, final boolean exclusive, final boolean nowait, final boolean passive,
             final AMQShortString queue)
     {
@@ -243,7 +240,7 @@
         });
     }
 
-    public boolean authoriseDelete(final AMQProtocolSession session, final AMQQueue queue)
+    public boolean authoriseDelete(final PrincipalHolder session, final AMQQueue queue)
     {
         return checkAllPlugins(new AccessCheck()
         {
@@ -257,7 +254,7 @@
         });
     }
 
-    public boolean authoriseDelete(final AMQProtocolSession session, final Exchange exchange)
+    public boolean authoriseDelete(final PrincipalHolder session, final Exchange exchange)
     {
         return checkAllPlugins(new AccessCheck()
         {
@@ -270,8 +267,8 @@
 
         });
     }
-    
-    public boolean authorisePublish(final AMQProtocolSession session, final boolean immediate, final boolean mandatory,
+
+    public boolean authorisePublish(final PrincipalHolder session, final boolean immediate, final boolean mandatory,
             final AMQShortString routingKey, final Exchange e)
     {
         return checkAllPlugins(new AccessCheck()
@@ -286,7 +283,7 @@
         });
     }
 
-    public boolean authorisePurge(final AMQProtocolSession session, final AMQQueue queue)
+    public boolean authorisePurge(final PrincipalHolder session, final AMQQueue queue)
     {
         return checkAllPlugins(new AccessCheck()
         {
@@ -300,7 +297,7 @@
         });
     }
 
-    public boolean authoriseUnbind(final AMQProtocolSession session, final Exchange exch,
+    public boolean authoriseUnbind(final PrincipalHolder session, final Exchange exch,
             final AMQShortString routingKey, final AMQQueue queue)
     {
         return checkAllPlugins(new AccessCheck()

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java Sun Oct 25 22:58:57 2009
@@ -24,9 +24,9 @@
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.security.PrincipalHolder;
 
 public interface ACLPlugin
 {
@@ -34,37 +34,37 @@
     {
         ALLOWED,
         DENIED,
-        ABSTAIN        
+        ABSTAIN
     }
 
     void setConfiguration(Configuration config) throws ConfigurationException;
 
-    // These return true if the plugin thinks the action should be allowed, and false if not. 
-    
-    AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, AMQQueue queue, AMQShortString routingKey);
+    // These return true if the plugin thinks the action should be allowed, and false if not.
 
-    AuthzResult authoriseCreateExchange(AMQProtocolSession session, boolean autoDelete, boolean durable,
+    AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey);
+
+    AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable,
             AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, AMQShortString exchangeType);
 
-    AuthzResult authoriseCreateQueue(AMQProtocolSession session, boolean autoDelete, boolean durable, boolean exclusive,
+    AuthzResult authoriseCreateQueue(PrincipalHolder session, boolean autoDelete, boolean durable, boolean exclusive,
             boolean nowait, boolean passive, AMQShortString queue);
 
-    AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost);
+    AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost);
 
-    AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, AMQQueue queue);
+    AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue);
 
-    AuthzResult authoriseConsume(AMQProtocolSession session, boolean exclusive, boolean noAck, boolean noLocal,
+    AuthzResult authoriseConsume(PrincipalHolder session, boolean exclusive, boolean noAck, boolean noLocal,
             boolean nowait, AMQQueue queue);
 
-    AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue);
+    AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue);
 
-    AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange);
+    AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange);
 
-    AuthzResult authorisePublish(AMQProtocolSession session, boolean immediate, boolean mandatory,
+    AuthzResult authorisePublish(PrincipalHolder session, boolean immediate, boolean mandatory,
             AMQShortString routingKey, Exchange e);
 
-    AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue);
+    AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue);
 
-    AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, AMQShortString routingKey, AMQQueue queue);
+    AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch, AMQShortString routingKey, AMQQueue queue);
 
 }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java Sun Oct 25 22:58:57 2009
@@ -425,8 +425,9 @@
                 // This will allow consumption from any temporary queue including ones not owned by this user.
                 // Of course the exclusivity will not be broken.
                 {
+
                     // if not limited to ownQueuesOnly then ok else check queue Owner.
-                    return (!ownQueuesOnly || queue.getOwner().equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
+                    return (!ownQueuesOnly || new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName()).equals(_user)) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
                 }
             }
             //if this is a temporary queue and the user does not have permissions for temporary queues then deny
@@ -441,7 +442,7 @@
                 // if no queues are listed then ALL are ok othereise it must be specified.
                 if (ownQueuesOnly)
                 {
-                    if (queue.getOwner().equals(_user))
+                    if ( new AMQShortString(queue.getPrincipalHolder().getPrincipal().getName()).equals(_user))
                     {
                         return (queues.size() == 0 || queues.contains(queue.getName())) ? AuthzResult.ALLOWED : AuthzResult.DENIED;
                     }

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java Sun Oct 25 22:58:57 2009
@@ -22,76 +22,76 @@
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.security.PrincipalHolder;
 
 /**
- * This ACLPlugin abstains from all votes. Useful if your plugin only cares about a few operations. 
+ * This ACLPlugin abstains from all votes. Useful if your plugin only cares about a few operations.
  */
 public abstract class AbstractACLPlugin implements ACLPlugin
 {
 
     private static final AuthzResult DEFAULT_ANSWER = AuthzResult.ABSTAIN;
 
-    public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, AMQQueue queue,
+    public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue,
             AMQShortString routingKey)
     {
         return DEFAULT_ANSWER;
     }
 
-    public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
+    public AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost)
     {
         return DEFAULT_ANSWER;
     }
 
-    public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, AMQQueue queue)
+    public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue)
     {
         return DEFAULT_ANSWER;
     }
 
-    public AuthzResult authoriseConsume(AMQProtocolSession session, boolean exclusive, boolean noAck, boolean noLocal,
+    public AuthzResult authoriseConsume(PrincipalHolder session, boolean exclusive, boolean noAck, boolean noLocal,
             boolean nowait, AMQQueue queue)
     {
         return DEFAULT_ANSWER;
     }
 
-    public AuthzResult authoriseCreateExchange(AMQProtocolSession session, boolean autoDelete, boolean durable,
+    public AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable,
             AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, AMQShortString exchangeType)
     {
         // TODO Auto-generated method stub
         return null;
     }
 
-    public AuthzResult authoriseCreateQueue(AMQProtocolSession session, boolean autoDelete, boolean durable,
+    public AuthzResult authoriseCreateQueue(PrincipalHolder session, boolean autoDelete, boolean durable,
             boolean exclusive, boolean nowait, boolean passive, AMQShortString queue)
     {
         return DEFAULT_ANSWER;
     }
 
-    public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
+    public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
     {
         return DEFAULT_ANSWER;
     }
 
-    public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
+    public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
     {
         return DEFAULT_ANSWER;
     }
 
-    public AuthzResult authorisePublish(AMQProtocolSession session, boolean immediate, boolean mandatory,
+    public AuthzResult authorisePublish(PrincipalHolder session, boolean immediate, boolean mandatory,
             AMQShortString routingKey, Exchange e)
     {
         return DEFAULT_ANSWER;
     }
 
-    public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue)
+    public AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue)
     {
         return DEFAULT_ANSWER;
     }
 
-    public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, AMQShortString routingKey,
+    public AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch, AMQShortString routingKey,
             AMQQueue queue)
     {
         return DEFAULT_ANSWER;

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java Sun Oct 25 22:58:57 2009
@@ -21,46 +21,45 @@
 package org.apache.qpid.server.security.access.plugins;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.security.access.ACLPlugin;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public abstract class BasicACLPlugin implements ACLPlugin
 {
 
-    // Returns true or false if the plugin should authorise or deny the request    
+    // Returns true or false if the plugin should authorise or deny the request
     protected abstract AuthzResult getResult();
-    
-    public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch,
+
+    public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch,
             AMQQueue queue, AMQShortString routingKey)
     {
         return getResult();
     }
 
-    public AuthzResult authoriseConnect(AMQProtocolSession session,
+    public AuthzResult authoriseConnect(PrincipalHolder session,
             VirtualHost virtualHost)
     {
         return getResult();
     }
 
-    public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck,
+    public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck,
             AMQQueue queue)
     {
-        return getResult();    
+        return getResult();
     }
 
-    public AuthzResult authoriseConsume(AMQProtocolSession session,
+    public AuthzResult authoriseConsume(PrincipalHolder session,
             boolean exclusive, boolean noAck, boolean noLocal, boolean nowait,
             AMQQueue queue)
     {
         return getResult();
     }
 
-    public AuthzResult authoriseCreateExchange(AMQProtocolSession session,
+    public AuthzResult authoriseCreateExchange(PrincipalHolder session,
             boolean autoDelete, boolean durable, AMQShortString exchangeName,
             boolean internal, boolean nowait, boolean passive,
             AMQShortString exchangeType)
@@ -68,36 +67,36 @@
         return getResult();
     }
 
-    public AuthzResult authoriseCreateQueue(AMQProtocolSession session,
+    public AuthzResult authoriseCreateQueue(PrincipalHolder session,
             boolean autoDelete, boolean durable, boolean exclusive,
             boolean nowait, boolean passive, AMQShortString queue)
     {
         return getResult();
     }
 
-    public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
+    public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
     {
         return getResult();
     }
 
-    public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
+    public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
     {
         return getResult();
     }
 
-    public AuthzResult authorisePublish(AMQProtocolSession session,
+    public AuthzResult authorisePublish(PrincipalHolder session,
             boolean immediate, boolean mandatory, AMQShortString routingKey,
             Exchange e)
     {
         return getResult();
     }
 
-    public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue)
+    public AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue)
     {
         return getResult();
     }
 
-    public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch,
+    public AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch,
             AMQShortString routingKey, AMQQueue queue)
     {
         return getResult();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java Sun Oct 25 22:58:57 2009
@@ -54,7 +54,7 @@
         if (ACLManager.getLogger().isInfoEnabled())
         {
             ACLManager.getLogger().info(
-                    "Denying user:" + session.getAuthorizedID());
+                    "Denying user:" + session.getPrincipal());
         }
         throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
                 "DenyAll Plugin");

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java?rev=829675&r1=829674&r2=829675&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java Sun Oct 25 22:58:57 2009
@@ -22,24 +22,16 @@
 package org.apache.qpid.server.security.access.plugins;
 
 import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicPublishBody;
 
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.security.access.ACLManager;
 import org.apache.qpid.server.security.access.ACLPlugin;
 import org.apache.qpid.server.security.access.ACLPluginFactory;
 import org.apache.qpid.server.security.access.AccessResult;
 import org.apache.qpid.server.security.access.Permission;
 import org.apache.qpid.server.security.access.PrincipalPermissions;
-import org.apache.qpid.server.security.access.ACLPlugin.AuthzResult;
+import org.apache.qpid.server.security.PrincipalHolder;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
@@ -64,7 +56,7 @@
             return plugin;
         }
     };
-    
+
     private Map<String, PrincipalPermissions> _users;
     private final AccessResult GRANTED = new AccessResult(this, AccessResult.AccessStatus.GRANTED);
 
@@ -110,7 +102,7 @@
     
     /**
      * Publish format takes Exchange + Routing Key Pairs
-     * 
+     *
      * @param config
      *            XML Configuration
      */
@@ -349,9 +341,9 @@
         return "Simple";
     }
 
-    public AuthzResult authoriseBind(AMQProtocolSession session, Exchange exch, AMQQueue queue, AMQShortString routingKey)
+    public AuthzResult authoriseBind(PrincipalHolder session, Exchange exch, AMQQueue queue, AMQShortString routingKey)
     {
-        PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+        PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
         if (principalPermissions == null)
         {
             return AuthzResult.DENIED;
@@ -362,9 +354,9 @@
         }
     }
 
-    public AuthzResult authoriseConnect(AMQProtocolSession session, VirtualHost virtualHost)
+    public AuthzResult authoriseConnect(PrincipalHolder session, VirtualHost virtualHost)
     {
-        PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+        PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
         if (principalPermissions == null)
         {
             return AuthzResult.DENIED;
@@ -375,9 +367,9 @@
         }
     }
 
-    public AuthzResult authoriseConsume(AMQProtocolSession session, boolean noAck, AMQQueue queue)
+    public AuthzResult authoriseConsume(PrincipalHolder session, boolean noAck, AMQQueue queue)
     {
-        PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+        PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
         if (principalPermissions == null)
         {
             return AuthzResult.DENIED;
@@ -388,16 +380,16 @@
         }
     }
 
-    public AuthzResult authoriseConsume(AMQProtocolSession session, boolean exclusive, boolean noAck, boolean noLocal,
+    public AuthzResult authoriseConsume(PrincipalHolder session, boolean exclusive, boolean noAck, boolean noLocal,
             boolean nowait, AMQQueue queue)
     {
         return authoriseConsume(session, noAck, queue);
     }
 
-    public AuthzResult authoriseCreateExchange(AMQProtocolSession session, boolean autoDelete, boolean durable,
+    public AuthzResult authoriseCreateExchange(PrincipalHolder session, boolean autoDelete, boolean durable,
             AMQShortString exchangeName, boolean internal, boolean nowait, boolean passive, AMQShortString exchangeType)
     {
-        PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+        PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
         if (principalPermissions == null)
         {
             return AuthzResult.DENIED;
@@ -408,10 +400,10 @@
         }
     }
 
-    public AuthzResult authoriseCreateQueue(AMQProtocolSession session, boolean autoDelete, boolean durable, boolean exclusive,
+    public AuthzResult authoriseCreateQueue(PrincipalHolder session, boolean autoDelete, boolean durable, boolean exclusive,
             boolean nowait, boolean passive, AMQShortString queue)
     {
-        PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+        PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
         if (principalPermissions == null)
         {
             return AuthzResult.DENIED;
@@ -422,9 +414,9 @@
         }
     }
 
-    public AuthzResult authoriseDelete(AMQProtocolSession session, AMQQueue queue)
+    public AuthzResult authoriseDelete(PrincipalHolder session, AMQQueue queue)
     {
-        PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+        PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
         if (principalPermissions == null)
         {
             return AuthzResult.DENIED;
@@ -435,9 +427,9 @@
         }
     }
 
-    public AuthzResult authoriseDelete(AMQProtocolSession session, Exchange exchange)
+    public AuthzResult authoriseDelete(PrincipalHolder session, Exchange exchange)
     {
-        PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+        PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
         if (principalPermissions == null)
         {
             return AuthzResult.DENIED;
@@ -448,10 +440,10 @@
         }
     }
 
-    public AuthzResult authorisePublish(AMQProtocolSession session, boolean immediate, boolean mandatory,
+    public AuthzResult authorisePublish(PrincipalHolder session, boolean immediate, boolean mandatory,
             AMQShortString routingKey, Exchange e)
     {
-        PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+        PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
         if (principalPermissions == null)
         {
             return AuthzResult.DENIED;
@@ -462,9 +454,9 @@
         }
     }
 
-    public AuthzResult authorisePurge(AMQProtocolSession session, AMQQueue queue)
+    public AuthzResult authorisePurge(PrincipalHolder session, AMQQueue queue)
     {
-        PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+        PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
         if (principalPermissions == null)
         {
             return AuthzResult.DENIED;
@@ -475,9 +467,9 @@
         }
     }
 
-    public AuthzResult authoriseUnbind(AMQProtocolSession session, Exchange exch, AMQShortString routingKey, AMQQueue queue)
+    public AuthzResult authoriseUnbind(PrincipalHolder session, Exchange exch, AMQShortString routingKey, AMQQueue queue)
     {
-        PrincipalPermissions principalPermissions = _users.get(session.getAuthorizedID().getName());
+        PrincipalPermissions principalPermissions = _users.get(session.getPrincipal().getName());
         if (principalPermissions == null)
         {
             return AuthzResult.DENIED;



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message