qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rgodf...@apache.org
Subject svn commit: r1769383 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/consumer/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/protocol/ broker-core/src/main/java/org/apac...
Date Sat, 12 Nov 2016 14:45:53 GMT
Author: rgodfrey
Date: Sat Nov 12 14:45:53 2016
New Revision: 1769383

URL: http://svn.apache.org/viewvc?rev=1769383&view=rev
Log:
QPID-7506 : Refactor queue statistics accounting

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java   (with props)
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java   (with props)
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Sat Nov 12 14:45:53 2016
@@ -62,7 +62,7 @@ public interface ConsumerTarget
 
     long getUnacknowledgedMessages();
 
-    AMQSessionModel getSessionModel();
+    AMQSessionModel<?> getSessionModel();
 
     long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Sat Nov 12 14:45:53 2016
@@ -366,12 +366,6 @@ public interface Queue<X extends Queue<X
 
     void requeue(QueueEntry entry);
 
-    void dequeue(QueueEntry entry);
-
-    void decrementUnackedMsgCount(QueueEntry queueEntry);
-
-    void incrementUnackedMsgCount(QueueEntry entry);
-
     boolean resend(QueueEntry entry, QueueConsumer<?> consumer);
 
     List<? extends QueueEntry> getMessagesOnTheQueue();
@@ -413,4 +407,6 @@ public interface Queue<X extends Queue<X
     long getPotentialMemoryFootprint();
 
     boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
+
+    void checkCapacity();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java Sat Nov 12 14:45:53 2016
@@ -22,5 +22,5 @@ package org.apache.qpid.server.protocol;
 
 public interface CapacityChecker
 {
-    void checkCapacity(AMQSessionModel channel);
+    void checkCapacity(AMQSessionModel<?> channel);
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Sat Nov 12 14:45:53 2016
@@ -159,28 +159,11 @@ public abstract class AbstractQueue<X ex
 
 
 
-    private final AtomicInteger _atomicQueueCount = new AtomicInteger(0);
-
-    private final AtomicLong _atomicQueueSize = new AtomicLong(0L);
-
     private final AtomicLong _targetQueueSize = new AtomicLong(INITIAL_TARGET_QUEUE_SIZE);
 
     private final AtomicInteger _activeSubscriberCount = new AtomicInteger();
 
-    private final AtomicLong _totalMessagesReceived = new AtomicLong();
-
-    private final AtomicLong _dequeueCount = new AtomicLong();
-    private final AtomicLong _dequeueSize = new AtomicLong();
-    private final AtomicLong _enqueueCount = new AtomicLong();
-    private final AtomicLong _enqueueSize = new AtomicLong();
-    private final AtomicLong _persistentMessageEnqueueSize = new AtomicLong();
-    private final AtomicLong _persistentMessageDequeueSize = new AtomicLong();
-    private final AtomicLong _persistentMessageEnqueueCount = new AtomicLong();
-    private final AtomicLong _persistentMessageDequeueCount = new AtomicLong();
-    private final AtomicLong _unackedMsgCount = new AtomicLong(0);
-    private final AtomicLong _unackedMsgBytes = new AtomicLong();
-
-    private final AtomicInteger _bindingCountHigh = new AtomicInteger();
+    private final QueueStatistics _queueStatistics = new QueueStatistics();
 
     /** max allowed size(KB) of a single message */
     @ManagedAttributeField( afterSet = "updateAlertChecks" )
@@ -228,7 +211,6 @@ public abstract class AbstractQueue<X ex
 
     private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
 
-    private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
     private final Set<AMQSessionModel> _blockedChannels = new ConcurrentSkipListSet<AMQSessionModel>();
@@ -366,7 +348,7 @@ public abstract class AbstractQueue<X ex
 
         Map<String,Object> attributes = getActualAttributes();
 
-        final LinkedHashMap<String, Object> arguments = new LinkedHashMap<String, Object>(attributes);
+        final LinkedHashMap<String, Object> arguments = new LinkedHashMap<>(attributes);
 
         arguments.put(Queue.EXCLUSIVE, _exclusive);
         arguments.put(Queue.LIFETIME_POLICY, getLifetimePolicy());
@@ -1103,15 +1085,6 @@ public abstract class AbstractQueue<X ex
     public void addBinding(final Binding<?> binding)
     {
         _bindings.add(binding);
-        int bindingCount = _bindings.size();
-        int bindingCountHigh;
-        while(bindingCount > (bindingCountHigh = _bindingCountHigh.get()))
-        {
-            if(_bindingCountHigh.compareAndSet(bindingCountHigh, bindingCount))
-            {
-                break;
-            }
-        }
         childAdded(binding);
     }
 
@@ -1140,10 +1113,6 @@ public abstract class AbstractQueue<X ex
 
     public final void enqueue(ServerMessage message, Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
-        incrementQueueCount();
-        incrementQueueSize(message);
-
-        _totalMessagesReceived.incrementAndGet();
 
         if(_recovering.get() != RECOVERED)
         {
@@ -1176,17 +1145,13 @@ public abstract class AbstractQueue<X ex
             doEnqueue(message, action, enqueueRecord);
         }
 
-        long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
+        long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
         _flowToDiskChecker.flowToDiskAndReportIfNecessary(message.getStoredMessage(), estimatedQueueSize,
                                                           _targetQueueSize.get());
     }
 
     public final void recover(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
     {
-        incrementQueueCount();
-        incrementQueueSize(message);
-
-        _totalMessagesReceived.incrementAndGet();
 
         doEnqueue(message, null, enqueueRecord);
     }
@@ -1275,7 +1240,7 @@ public abstract class AbstractQueue<X ex
             {
                 arrivalTime = System.currentTimeMillis();
             }
-            if(expiration != 0l)
+            if(expiration != 0L)
             {
                 long calculatedExpiration = arrivalTime+_minimumMessageTtl;
                 if(calculatedExpiration > expiration)
@@ -1285,14 +1250,14 @@ public abstract class AbstractQueue<X ex
                 }
             }
         }
-        if(_maximumMessageTtl != 0l)
+        if(_maximumMessageTtl != 0L)
         {
             if(arrivalTime == 0)
             {
                 arrivalTime = System.currentTimeMillis();
             }
             long calculatedExpiration = arrivalTime+_maximumMessageTtl;
-            if(expiration == 0l || expiration > calculatedExpiration)
+            if(expiration == 0L || expiration > calculatedExpiration)
             {
                 entry.setExpiration(calculatedExpiration);
             }
@@ -1443,19 +1408,6 @@ public abstract class AbstractQueue<X ex
         // Simple Queues don't :-)
     }
 
-    private void incrementQueueSize(final ServerMessage message)
-    {
-        long size = message.getSize();
-        getAtomicQueueSize().addAndGet(size);
-        _enqueueCount.incrementAndGet();
-        _enqueueSize.addAndGet(size);
-        if(message.isPersistent() && isDurable())
-        {
-            _persistentMessageEnqueueSize.addAndGet(size);
-            _persistentMessageEnqueueCount.incrementAndGet();
-        }
-    }
-
     @Override
     public void setTargetSize(final long targetSize)
     {
@@ -1467,17 +1419,12 @@ public abstract class AbstractQueue<X ex
 
     public long getTotalDequeuedMessages()
     {
-        return _dequeueCount.get();
+        return _queueStatistics.getDequeueCount();
     }
 
     public long getTotalEnqueuedMessages()
     {
-        return _enqueueCount.get();
-    }
-
-    private void incrementQueueCount()
-    {
-        getAtomicQueueCount().incrementAndGet();
+        return _queueStatistics.getEnqueueCount();
     }
 
     private void deliverMessage(final QueueConsumer<?> sub,
@@ -1490,8 +1437,6 @@ public abstract class AbstractQueue<X ex
             setLastSeenEntry(sub, entry);
         }
 
-        _deliveredMessages.incrementAndGet();
-
         sub.send(entry, batch);
     }
 
@@ -1548,39 +1493,6 @@ public abstract class AbstractQueue<X ex
 
     }
 
-    @Override
-    public void dequeue(QueueEntry entry)
-    {
-        decrementQueueCount();
-        decrementQueueSize(entry);
-        if (entry.acquiredByConsumer())
-        {
-            _deliveredMessages.decrementAndGet();
-        }
-
-        checkCapacity();
-
-    }
-
-    private void decrementQueueSize(final QueueEntry entry)
-    {
-        final ServerMessage message = entry.getMessage();
-        long size = message.getSize();
-        getAtomicQueueSize().addAndGet(-size);
-        _dequeueSize.addAndGet(size);
-        if(message.isPersistent() && isDurable())
-        {
-            _persistentMessageDequeueSize.addAndGet(size);
-            _persistentMessageDequeueCount.incrementAndGet();
-        }
-    }
-
-    void decrementQueueCount()
-    {
-        getAtomicQueueCount().decrementAndGet();
-        _dequeueCount.incrementAndGet();
-    }
-
     public boolean resend(final QueueEntry entry, final QueueConsumer<?> consumer)
     {
         /* TODO : This is wrong as the consumer may be suspended, we should instead change the state of the message
@@ -1630,38 +1542,20 @@ public abstract class AbstractQueue<X ex
     @Override
     public int getQueueDepthMessages()
     {
-        return getAtomicQueueCount().get();
+        return _queueStatistics.getQueueCount();
     }
 
     public long getQueueDepthBytes()
     {
-        return getAtomicQueueSize().get();
-    }
-
-    public int getUndeliveredMessageCount()
-    {
-        int count = getQueueDepthMessages() - _deliveredMessages.get();
-        if (count < 0)
-        {
-            return 0;
-        }
-        else
-        {
-            return count;
-        }
-    }
-
-    public long getReceivedMessageCount()
-    {
-        return _totalMessagesReceived.get();
+        return _queueStatistics.getQueueSize();
     }
 
     @Override
     public long getOldestMessageArrivalTime()
     {
-        long oldestMessageArrivalTime = -1l;
+        long oldestMessageArrivalTime = -1L;
 
-        while(oldestMessageArrivalTime == -1l)
+        while(oldestMessageArrivalTime == -1L)
         {
             QueueEntry entry = getEntries().getOldestEntry();
             if (entry != null)
@@ -1756,16 +1650,6 @@ public abstract class AbstractQueue<X ex
         return getName().compareTo(o.getName());
     }
 
-    public AtomicInteger getAtomicQueueCount()
-    {
-        return _atomicQueueCount;
-    }
-
-    public AtomicLong getAtomicQueueSize()
-    {
-        return _atomicQueueSize;
-    }
-
     private boolean hasExclusiveConsumer()
     {
         return _exclusiveSubscriber != null;
@@ -1784,6 +1668,11 @@ public abstract class AbstractQueue<X ex
     /** Used to track bindings to exchanges so that on deletion they can easily be cancelled. */
     abstract QueueEntryList getEntries();
 
+    final QueueStatistics getQueueStatistics()
+    {
+        return _queueStatistics;
+    }
+
     protected QueueConsumerList getConsumerList()
     {
         return _consumerList;
@@ -1794,33 +1683,14 @@ public abstract class AbstractQueue<X ex
         return _virtualHost.getEventLogger();
     }
 
-    public static interface QueueEntryFilter
+    public interface QueueEntryFilter
     {
-        public boolean accept(QueueEntry entry);
+        boolean accept(QueueEntry entry);
 
-        public boolean filterComplete();
+        boolean filterComplete();
     }
 
 
-
-    public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
-    {
-        return getMessagesOnTheQueue(new QueueEntryFilter()
-        {
-
-            public boolean accept(QueueEntry entry)
-            {
-                final long messageId = entry.getMessage().getMessageNumber();
-                return messageId >= fromMessageId && messageId <= toMessageId;
-            }
-
-            public boolean filterComplete()
-            {
-                return false;
-            }
-        });
-    }
-
     public QueueEntry getMessageOnTheQueue(final long messageId)
     {
         List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
@@ -2108,27 +1978,28 @@ public abstract class AbstractQueue<X ex
         _closing = false;
     }
 
-    public void checkCapacity(AMQSessionModel channel)
+    @Override
+    public void checkCapacity(AMQSessionModel<?> channel)
     {
-        if(_queueFlowControlSizeBytes != 0l)
+        if(_queueFlowControlSizeBytes != 0L)
         {
-            if(_atomicQueueSize.get() > _queueFlowControlSizeBytes)
+            if(_queueStatistics.getQueueSize() > _queueFlowControlSizeBytes)
             {
                 _overfull.set(true);
                 //Overfull log message
-                getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_atomicQueueSize.get(),
+                getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_queueStatistics.getQueueSize(),
                                                                              _queueFlowControlSizeBytes));
 
                 _blockedChannels.add(channel);
 
                 channel.block(this);
 
-                if(_atomicQueueSize.get() <= _queueFlowResumeSizeBytes)
+                if(_queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
                 {
 
                     //Underfull log message
                     getEventLogger().message(_logSubject,
-                                             QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes));
+                                             QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(), _queueFlowResumeSizeBytes));
 
                    channel.unblock(this);
                    _blockedChannels.remove(channel);
@@ -2141,22 +2012,27 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    private void checkCapacity()
+    public void checkCapacity()
     {
-        if(_queueFlowControlSizeBytes != 0L)
+        if(getEntries() != null)
         {
-            if(_overfull.get() && _atomicQueueSize.get() <= _queueFlowResumeSizeBytes)
+            if (_queueFlowControlSizeBytes != 0L)
             {
-                if(_overfull.compareAndSet(true,false))
-                {//Underfull log message
-                    getEventLogger().message(_logSubject,
-                                             QueueMessages.UNDERFULL(_atomicQueueSize.get(), _queueFlowResumeSizeBytes));
-                }
-
-                for(final AMQSessionModel blockedChannel : _blockedChannels)
+                if (_overfull.get() && _queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
                 {
-                    blockedChannel.unblock(this);
-                    _blockedChannels.remove(blockedChannel);
+                    if (_overfull.compareAndSet(true, false))
+                    {
+                        //Underfull log message
+                        getEventLogger().message(_logSubject,
+                                                 QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(),
+                                                                         _queueFlowResumeSizeBytes));
+                    }
+
+                    for (final AMQSessionModel<?> blockedChannel : _blockedChannels)
+                    {
+                        blockedChannel.unblock(this);
+                        _blockedChannels.remove(blockedChannel);
+                    }
                 }
             }
         }
@@ -2588,7 +2464,7 @@ public abstract class AbstractQueue<X ex
     {
         QueueEntryIterator queueListIterator = getEntries().iterator();
 
-        final long estimatedQueueSize = _atomicQueueSize.get() + _atomicQueueCount.get() * _estimatedAverageMessageHeaderSize;
+        final long estimatedQueueSize = _queueStatistics.getQueueSize() + _queueStatistics.getQueueCount() * _estimatedAverageMessageHeaderSize;
         _flowToDiskChecker.reportFlowToDiskStatusIfNecessary(estimatedQueueSize, _targetQueueSize.get());
 
         final Set<NotificationCheck> perMessageChecks = new HashSet<>();
@@ -2904,8 +2780,7 @@ public abstract class AbstractQueue<X ex
 
             try
             {
-                 long foo = ByteStreams.copy(inputStream, outputStream);
-                foo = foo +1 -1;
+                ByteStreams.copy(inputStream, outputStream);
             }
             finally
             {
@@ -2952,32 +2827,32 @@ public abstract class AbstractQueue<X ex
 
     public long getTotalEnqueuedBytes()
     {
-        return _enqueueSize.get();
+        return _queueStatistics.getEnqueueSize();
     }
 
     public long getTotalDequeuedBytes()
     {
-        return _dequeueSize.get();
+        return _queueStatistics.getDequeueSize();
     }
 
     public long getPersistentEnqueuedBytes()
     {
-        return _persistentMessageEnqueueSize.get();
+        return _queueStatistics.getPersistentEnqueueSize();
     }
 
     public long getPersistentDequeuedBytes()
     {
-        return _persistentMessageDequeueSize.get();
+        return _queueStatistics.getPersistentDequeueSize();
     }
 
     public long getPersistentEnqueuedMessages()
     {
-        return _persistentMessageEnqueueCount.get();
+        return _queueStatistics.getPersistentEnqueueCount();
     }
 
     public long getPersistentDequeuedMessages()
     {
-        return _persistentMessageDequeueCount.get();
+        return _queueStatistics.getPersistentDequeueCount();
     }
 
     @Override
@@ -3025,26 +2900,12 @@ public abstract class AbstractQueue<X ex
 
     public long getUnacknowledgedMessages()
     {
-        return _unackedMsgCount.get();
+        return _queueStatistics.getUnackedCount();
     }
 
     public long getUnacknowledgedBytes()
     {
-        return _unackedMsgBytes.get();
-    }
-
-    @Override
-    public void decrementUnackedMsgCount(QueueEntry queueEntry)
-    {
-        _unackedMsgCount.decrementAndGet();
-        _unackedMsgBytes.addAndGet(-queueEntry.getSize());
-    }
-
-    @Override
-    public void incrementUnackedMsgCount(QueueEntry entry)
-    {
-        _unackedMsgCount.incrementAndGet();
-        _unackedMsgBytes.addAndGet(entry.getSize());
+        return _queueStatistics.getUnackedSize();
     }
 
     @Override

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java?rev=1769383&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageDurability;
+
+abstract class AbstractQueueEntryList implements QueueEntryList
+{
+
+    private final boolean _forcePersistent;
+    private final boolean _respectPersistent;
+    private final Queue<?> _queue;
+    private final QueueStatistics _queueStatistics;
+
+    protected AbstractQueueEntryList(final Queue<?> queue, final QueueStatistics queueStatistics)
+    {
+
+        final MessageDurability messageDurability = queue.getMessageDurability();
+        _queue = queue;
+        _queueStatistics = queueStatistics;
+        _forcePersistent = messageDurability == MessageDurability.ALWAYS;
+        _respectPersistent = messageDurability == MessageDurability.DEFAULT;
+    }
+
+
+    void updateStatsOnEnqueue(QueueEntry entry)
+    {
+        final long size = entry.getSize();
+        final QueueStatistics queueStatistics = _queueStatistics;
+        queueStatistics.addToAvailable(size);
+        queueStatistics.addToQueue(size);
+        queueStatistics.addToEnqueued(size);
+        if(_forcePersistent || (_respectPersistent && entry.getMessage().isPersistent()))
+        {
+            queueStatistics.addToPersistentEnqueued(size);
+        }
+    }
+
+    public void updateStatsOnStateChange(QueueEntry entry, QueueEntry.EntryState fromState, QueueEntry.EntryState toState)
+    {
+        final QueueStatistics queueStatistics = _queueStatistics;
+        final long size = entry.getSize();
+
+        final boolean isConsumerAcquired = toState instanceof MessageInstance.ConsumerAcquiredState;
+        final boolean wasConsumerAcquired = fromState instanceof MessageInstance.ConsumerAcquiredState;
+
+        switch(fromState.getState())
+        {
+            case AVAILABLE:
+                queueStatistics.removeFromAvailable(size);
+                break;
+            case ACQUIRED:
+                if(wasConsumerAcquired && !isConsumerAcquired)
+                {
+                    queueStatistics.removeFromUnacknowledged(size);
+                }
+                break;
+        }
+        switch(toState.getState())
+        {
+            case AVAILABLE:
+                queueStatistics.addToAvailable(size);
+                break;
+            case ACQUIRED:
+                if(isConsumerAcquired && !wasConsumerAcquired)
+                {
+                    queueStatistics.addToUnacknowledged(size);
+                }
+                break;
+            case DELETED:
+                queueStatistics.removeFromQueue(size);
+                queueStatistics.addToDequeued(size);
+                if(_forcePersistent || (_respectPersistent && entry.getMessage().isPersistent()))
+                {
+                    queueStatistics.addToPersistentDequeued(size);
+                }
+                _queue.checkCapacity();
+
+        }
+    }
+
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueImpl.java Sat Nov 12 14:45:53 2016
@@ -44,7 +44,7 @@ public class LastValueQueueImpl extends
     protected void onOpen()
     {
         super.onOpen();
-        _entries = new LastValueQueueList(this);
+        _entries = new LastValueQueueList(this, getQueueStatistics());
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Sat Nov 12 14:45:53 2016
@@ -57,9 +57,9 @@ public class LastValueQueueList extends
     private final ConflationQueueEntry _deleteInProgress = new ConflationQueueEntry(this);
     private final ConflationQueueEntry _newerEntryAlreadyBeenAndGone = new ConflationQueueEntry(this);
 
-    public LastValueQueueList(LastValueQueueImpl queue)
+    public LastValueQueueList(LastValueQueue<?> queue, QueueStatistics queueStatistics)
     {
-        super(queue, HEAD_CREATOR);
+        super(queue, queueStatistics, HEAD_CREATOR);
         _conflationKey = queue.getLvqKey();
     }
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -28,7 +28,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 
-public abstract class OrderedQueueEntryList implements QueueEntryList
+public abstract class OrderedQueueEntryList extends AbstractQueueEntryList
 {
 
     private final OrderedQueueEntry _head;
@@ -51,8 +51,11 @@ public abstract class OrderedQueueEntryL
     private final AtomicReference<QueueEntry> _unscavengedHWM = new AtomicReference<QueueEntry>();
 
 
-    public OrderedQueueEntryList(Queue<?> queue, HeadCreator headCreator)
+    public OrderedQueueEntryList(Queue<?> queue,
+                                 final QueueStatistics queueStatistics,
+                                 HeadCreator headCreator)
     {
+        super(queue, queueStatistics);
         _queue = queue;
         _head = headCreator.createHead(this);
         _tail = _head;
@@ -81,7 +84,8 @@ public abstract class OrderedQueueEntryL
 
     public QueueEntry add(ServerMessage message, final MessageEnqueueRecord enqueueRecord)
     {
-        OrderedQueueEntry node = createQueueEntry(message, enqueueRecord);
+        final OrderedQueueEntry node = createQueueEntry(message, enqueueRecord);
+        updateStatsOnEnqueue(node);
         for (;;)
         {
             OrderedQueueEntry tail = _tail;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Sat Nov 12 14:45:53 2016
@@ -35,7 +35,7 @@ abstract public class PriorityQueueList
     public PriorityQueueList(final PriorityQueueImpl queue,
                              final HeadCreator headCreator)
     {
-        super(queue, headCreator);
+        super(queue, queue.getQueueStatistics(), headCreator);
     }
 
     static class PriorityQueueMasterList extends PriorityQueueList

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java Sat Nov 12 14:45:53 2016
@@ -624,7 +624,6 @@ class QueueConsumerImpl
     public void acquisitionRemoved(final QueueEntry node)
     {
         _target.acquisitionRemoved(node);
-        _queue.decrementUnackedMsgCount(node);
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Sat Nov 12 14:45:53 2016
@@ -296,7 +296,7 @@ public abstract class QueueEntryImpl imp
             }
         }
 
-        if(acquired && _stateChangeListeners != null)
+        if(acquired)
         {
             notifyStateChange(AVAILABLE_STATE, state);
         }
@@ -310,7 +310,6 @@ public abstract class QueueEntryImpl imp
         if(acquired)
         {
             _deliveryCountUpdater.compareAndSet(this,-1,0);
-            getQueue().incrementUnackedMsgCount(this);
         }
         return acquired;
     }
@@ -404,7 +403,7 @@ public abstract class QueueEntryImpl imp
     {
         EntryState state = _state;
 
-        if((state.getState() == State.ACQUIRED) &&_stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
+        if((state.getState() == State.ACQUIRED) && _stateUpdater.compareAndSet(this, state, AVAILABLE_STATE))
         {
             postRelease(state);
         }
@@ -422,15 +421,11 @@ public abstract class QueueEntryImpl imp
 
     private void postRelease(final EntryState previousState)
     {
-        if (previousState instanceof ConsumerAcquiredState)
-        {
-            getQueue().decrementUnackedMsgCount(this);
-        }
 
         if(!getQueue().isDeleted())
         {
             getQueue().requeue(this);
-            if (_stateChangeListeners != null && previousState.getState() == State.ACQUIRED)
+            if (previousState.getState() == State.ACQUIRED)
             {
                 notifyStateChange(previousState, AVAILABLE_STATE);
             }
@@ -523,16 +518,7 @@ public abstract class QueueEntryImpl imp
 
         if(state.getState() == State.ACQUIRED)
         {
-            if (state instanceof ConsumerAcquiredState)
-            {
-                getQueue().decrementUnackedMsgCount(this);
-            }
-
-            getQueue().dequeue(this);
-            if(_stateChangeListeners != null)
-            {
-                notifyStateChange(state, DEQUEUED_STATE);
-            }
+            notifyStateChange(state, DEQUEUED_STATE);
             return true;
         }
         else
@@ -544,6 +530,7 @@ public abstract class QueueEntryImpl imp
 
     private void notifyStateChange(final EntryState oldState, final EntryState newState)
     {
+        _queueEntryList.updateStatsOnStateChange(this, oldState, newState);
         StateChangeListenerEntry<? super QueueEntry, EntryState> entry = _listenersUpdater.get(this);
         while(entry != null)
         {
@@ -562,6 +549,7 @@ public abstract class QueueEntryImpl imp
 
         if(state != DELETED_STATE && _stateUpdater.compareAndSet(this,state,DELETED_STATE))
         {
+            notifyStateChange(state, DELETED_STATE);
             _queueEntryList.entryDeleted(this);
             onDelete();
             _message.release();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.store.MessageEnqueueRecord;
 
-public interface QueueEntryList
+interface QueueEntryList
 {
     Queue<?> getQueue();
 
@@ -44,4 +44,6 @@ public interface QueueEntryList
     
     int getPriorities();
 
+    void updateStatsOnStateChange(QueueEntry entry, QueueEntry.EntryState fromState, QueueEntry.EntryState toState);
+
 }

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java?rev=1769383&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java Sat Nov 12 14:45:53 2016
@@ -0,0 +1,181 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+final class QueueStatistics
+{
+    private final AtomicInteger _queueCount = new AtomicInteger();
+    private final AtomicLong _queueSize = new AtomicLong();
+
+    private final AtomicInteger _unackedCount = new AtomicInteger();
+    private final AtomicLong _unackedSize = new AtomicLong();
+
+    private final AtomicInteger _availableCount = new AtomicInteger();
+    private final AtomicLong _availableSize = new AtomicLong();
+
+    private final AtomicLong _dequeueCount = new AtomicLong();
+    private final AtomicLong _dequeueSize = new AtomicLong();
+
+    private final AtomicLong _enqueueCount = new AtomicLong();
+    private final AtomicLong _enqueueSize = new AtomicLong();
+
+    private final AtomicLong _persistentEnqueueCount = new AtomicLong();
+    private final AtomicLong _persistentEnqueueSize = new AtomicLong();
+
+    private final AtomicLong _persistentDequeueCount = new AtomicLong();
+    private final AtomicLong _persistentDequeueSize = new AtomicLong();
+
+    public final int getQueueCount()
+    {
+        return _queueCount.get();
+    }
+
+    public final long getQueueSize()
+    {
+        return _queueSize.get();
+    }
+
+    public final int getUnackedCount()
+    {
+        return _unackedCount.get();
+    }
+
+    public final long getUnackedSize()
+    {
+        return _unackedSize.get();
+    }
+
+    public final int getAvailableCount()
+    {
+        return _availableCount.get();
+    }
+
+    public final long getAvailableSize()
+    {
+        return _availableSize.get();
+    }
+
+    public final long getEnqueueCount()
+    {
+        return _enqueueCount.get();
+    }
+
+    public final long getEnqueueSize()
+    {
+        return _enqueueSize.get();
+    }
+
+    public final long getDequeueCount()
+    {
+        return _dequeueCount.get();
+    }
+
+    public final long getDequeueSize()
+    {
+        return _dequeueSize.get();
+    }
+
+    public final long getPersistentEnqueueCount()
+    {
+        return _persistentEnqueueCount.get();
+    }
+
+    public final long getPersistentEnqueueSize()
+    {
+        return _persistentEnqueueSize.get();
+    }
+
+    public final long getPersistentDequeueCount()
+    {
+        return _persistentDequeueCount.get();
+    }
+
+    public final long getPersistentDequeueSize()
+    {
+        return _persistentDequeueSize.get();
+    }
+
+
+
+    void addToQueue(long size)
+    {
+        _queueCount.incrementAndGet();
+        _queueSize.addAndGet(size);
+    }
+
+    void removeFromQueue(long size)
+    {
+        _queueCount.decrementAndGet();
+        _queueSize.addAndGet(-size);
+    }
+
+    void addToAvailable(long size)
+    {
+        _availableCount.incrementAndGet();
+        _availableSize.addAndGet(size);
+    }
+
+    void removeFromAvailable(long size)
+    {
+        _availableCount.decrementAndGet();
+        _availableSize.addAndGet(-size);
+    }
+
+    void addToUnacknowledged(long size)
+    {
+        _unackedCount.incrementAndGet();
+        _unackedSize.addAndGet(size);
+    }
+
+    void removeFromUnacknowledged(long size)
+    {
+        _unackedCount.decrementAndGet();
+        _unackedSize.addAndGet(-size);
+    }
+
+    void addToEnqueued(long size)
+    {
+        _enqueueCount.incrementAndGet();
+        _enqueueSize.addAndGet(size);
+    }
+
+    void addToDequeued(long size)
+    {
+        _dequeueCount.incrementAndGet();
+        _dequeueSize.addAndGet(size);
+    }
+
+    void addToPersistentEnqueued(long size)
+    {
+        _persistentEnqueueCount.incrementAndGet();
+        _persistentEnqueueSize.addAndGet(size);
+    }
+
+    void addToPersistentDequeued(long size)
+    {
+        _persistentDequeueCount.incrementAndGet();
+        _persistentDequeueSize.addAndGet(size);
+    }
+
+}

Propchange: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueStatistics.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -31,7 +31,7 @@ import org.apache.qpid.server.store.Mess
  * ISBN-13: 978-0262033848
  * see http://en.wikipedia.org/wiki/Red-black_tree
  */
-public class SortedQueueEntryList implements QueueEntryList
+public class SortedQueueEntryList extends AbstractQueueEntryList
 {
     private final SortedQueueEntry _head;
     private SortedQueueEntry _root;
@@ -40,8 +40,9 @@ public class SortedQueueEntryList implem
     private final SortedQueueImpl _queue;
     private final String _propertyName;
 
-    public SortedQueueEntryList(final SortedQueueImpl queue)
+    public SortedQueueEntryList(final SortedQueueImpl queue, final QueueStatistics queueStatistics)
     {
+        super(queue, queueStatistics);
         _queue = queue;
         _head = new SortedQueueEntry(this);
         _propertyName = queue.getSortKey();
@@ -64,6 +65,8 @@ public class SortedQueueEntryList implem
             }
 
             final SortedQueueEntry entry = new SortedQueueEntry(this,message, ++_entryId, enqueueRecord);
+            updateStatsOnEnqueue(entry);
+
             entry.setKey(key);
 
             insert(entry);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueImpl.java Sat Nov 12 14:45:53 2016
@@ -50,7 +50,7 @@ public class SortedQueueImpl extends Out
     protected void onOpen()
     {
         super.onOpen();
-        _entries = new SortedQueueEntryList(this);
+        _entries = new SortedQueueEntryList(this, getQueueStatistics());
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -35,9 +35,9 @@ public class StandardQueueEntryList exte
         }
     };
 
-    public StandardQueueEntryList(final StandardQueueImpl queue)
+    public StandardQueueEntryList(final StandardQueue<?> queue, QueueStatistics queueStatistics)
     {
-        super(queue, HEAD_CREATOR);
+        super(queue, queueStatistics, HEAD_CREATOR);
     }
 
 

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueImpl.java Sat Nov 12 14:45:53 2016
@@ -39,7 +39,7 @@ public class StandardQueueImpl extends A
     protected void onOpen()
     {
         super.onOpen();
-        _entries = new StandardQueueEntryList(this);
+        _entries = new StandardQueueEntryList(this, getQueueStatistics());
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/QueueMessageRecoveryTest.java Sat Nov 12 14:45:53 2016
@@ -193,6 +193,7 @@ public class QueueMessageRecoveryTest ex
     {
 
         private final List<ServerMessage<?>> _messageList;
+        private final QueueEntryList _entries = mock(QueueEntryList.class);
 
         protected TestQueue(final Map<String, Object> attributes,
                             final QueueManagingVirtualHost<?> virtualHost,
@@ -205,7 +206,7 @@ public class QueueMessageRecoveryTest ex
         @Override
         QueueEntryList getEntries()
         {
-            return null;
+            return _entries;
         }
 
         @Override

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java Sat Nov 12 14:45:53 2016
@@ -33,7 +33,7 @@ public class SelfValidatingSortedQueueEn
 {
     public SelfValidatingSortedQueueEntryList(SortedQueueImpl queue)
     {
-        super(queue);
+        super(queue, queue.getQueueStatistics());
     }
 
     @Override

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java Sat Nov 12 14:45:53 2016
@@ -148,7 +148,12 @@ public class StandardQueueEntryListTest
     public ServerMessage getTestMessageToAdd()
     {
         ServerMessage msg = mock(ServerMessage.class);
+        MessageReference ref = mock(MessageReference.class);
+        when(ref.getMessage()).thenReturn(msg);
         when(msg.getMessageNumber()).thenReturn(1l);
+        when(msg.newReference()).thenReturn(ref);
+        when(msg.newReference(any(TransactionLogResource.class))).thenReturn(ref);
+
         return msg;
     }
 
@@ -160,7 +165,7 @@ public class StandardQueueEntryListTest
 
     public void testScavenge() throws Exception
     {
-        OrderedQueueEntryList sqel = new StandardQueueEntryList(mock(StandardQueueImpl.class));
+        OrderedQueueEntryList sqel = new StandardQueueEntryList(mock(StandardQueue.class), new QueueStatistics());
         ConcurrentMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();
 
 

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java Sat Nov 12 14:45:53 2016
@@ -259,7 +259,7 @@ public class StandardQueueTest extends A
     private static class DequeuedQueue extends AbstractQueue
     {
 
-        private QueueEntryList _entries = new DequeuedQueueEntryList(this);
+        private QueueEntryList _entries = new DequeuedQueueEntryList(this, getQueueStatistics());
 
         public DequeuedQueue(QueueManagingVirtualHost<?> virtualHost)
         {
@@ -295,9 +295,9 @@ public class StandardQueueTest extends A
                     }
                 };
 
-        public DequeuedQueueEntryList(final DequeuedQueue queue)
+        public DequeuedQueueEntryList(final DequeuedQueue queue, final QueueStatistics queueStatistics)
         {
-            super(queue, HEAD_CREATOR);
+            super(queue, queueStatistics, HEAD_CREATOR);
         }
 
         /**

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java Sat Nov 12 14:45:53 2016
@@ -359,7 +359,7 @@ public abstract class ConsumerTarget_0_8
         return _targetAddress;
     }
 
-    public AMQSessionModel getSessionModel()
+    public AMQChannel getSessionModel()
     {
         return _channel;
     }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1769383&r1=1769382&r2=1769383&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Sat Nov 12 14:45:53 2016
@@ -27,6 +27,7 @@ import java.util.Collection;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl;
@@ -504,7 +505,7 @@ class ConsumerTarget_1_0 extends Abstrac
     }
 
     @Override
-    public AMQSessionModel getSessionModel()
+    public Session_1_0 getSessionModel()
     {
         return getSession();
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message