qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject svn commit: r941792 - in /qpid/branches/0.5.x-dev/qpid/java/broker/src: main/java/org/apache/qpid/server/ack/ main/java/org/apache/qpid/server/queue/ main/java/org/apache/qpid/server/txn/ test/java/org/apache/qpid/server/ack/
Date Thu, 06 May 2010 16:20:24 GMT
Author: robbie
Date: Thu May  6 16:20:24 2010
New Revision: 941792

URL: http://svn.apache.org/viewvc?rev=941792&view=rev
Log:
QPID-2568: make some necessary adjustments to support the release of the message object upon
QueueEntry deletion

Modified:
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
    qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java?rev=941792&r1=941791&r2=941792&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
(original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMap.java
Thu May  6 16:20:24 2010
@@ -74,8 +74,6 @@ public interface UnacknowledgedMessageMa
      * @return a set of delivery tags
      */
     Set<Long> getDeliveryTags();
-
-    public long getUnacknowledgeBytes();
 }
 
 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java?rev=941792&r1=941791&r2=941792&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
(original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessageMapImpl.java
Thu May  6 16:20:24 2010
@@ -24,14 +24,10 @@ import org.apache.qpid.server.store.Stor
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.AMQMessage;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.txn.TransactionalContext;
 
@@ -39,8 +35,6 @@ public class UnacknowledgedMessageMapImp
 {
     private final Object _lock = new Object();
 
-    private long _unackedSize;
-
     private Map<Long, QueueEntry> _map;
 
     private long _lastDeliveryTag;
@@ -90,14 +84,9 @@ public class UnacknowledgedMessageMapImp
         synchronized (_lock)
         {
 
-            QueueEntry message = _map.remove(deliveryTag);
-            if(message != null)
-            {
-                _unackedSize -= message.getMessage().getSize();
-
-            }
+            QueueEntry entry = _map.remove(deliveryTag);
 
-            return message;
+            return entry;
         }
     }
 
@@ -119,7 +108,6 @@ public class UnacknowledgedMessageMapImp
         synchronized (_lock)
         {
             _map.put(deliveryTag, message);
-            _unackedSize += message.getMessage().getSize();
             _lastDeliveryTag = deliveryTag;
         }
     }
@@ -130,7 +118,6 @@ public class UnacknowledgedMessageMapImp
         {
             Collection<QueueEntry> currentEntries = _map.values();
             _map = new LinkedHashMap<Long, QueueEntry>(_prefetchLimit);
-            _unackedSize = 0l;
             return currentEntries;
         }
     }
@@ -157,7 +144,6 @@ public class UnacknowledgedMessageMapImp
         synchronized (_lock)
         {
             _map.clear();
-            _unackedSize = 0l;
         }
     }
 
@@ -183,9 +169,6 @@ public class UnacknowledgedMessageMapImp
 
                 it.remove();
 
-                _unackedSize -= unacked.getValue().getMessage().getSize();
-
-
                 if (unacked.getKey() == deliveryTag)
                 {
                     break;
@@ -225,8 +208,4 @@ public class UnacknowledgedMessageMapImp
         }
     }
 
-    public long getUnacknowledgeBytes()
-    {
-        return _unackedSize;
-    }
 }

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=941792&r1=941791&r2=941792&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
(original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
Thu May  6 16:20:24 2010
@@ -137,8 +137,6 @@ public interface QueueEntry extends Comp
 
     long getSize();
 
-    boolean getDeliveredToConsumer();
-
     boolean expired() throws AMQException;
 
     boolean isAcquired();

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=941792&r1=941791&r2=941792&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
(original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
Thu May  6 16:20:24 2010
@@ -43,6 +43,8 @@ public class QueueEntryImpl implements Q
     private final SimpleQueueEntryList _queueEntryList;
 
     private AMQMessage _message;
+    
+    private boolean _immediateAndNotDelivered = false;
 
     private Set<Subscription> _rejectedBy = null;
 
@@ -119,17 +121,14 @@ public class QueueEntryImpl implements Q
 
     public long getSize()
     {
-        return getMessage().getSize();
-    }
-
-    public boolean getDeliveredToConsumer()
-    {
-        return getMessage().getDeliveredToConsumer();
+        AMQMessage message = getMessage();
+        return message == null ? 0 : message.getSize();
     }
 
     public boolean expired() throws AMQException
     {
-        return getMessage().expired(getQueue());
+        AMQMessage message = getMessage();
+        return message == null ? false : message.expired(getQueue());
     }
 
     public boolean isAcquired()
@@ -167,13 +166,16 @@ public class QueueEntryImpl implements Q
 
     public boolean acquiredBySubscription()
     {
-
         return (_state instanceof SubscriptionAcquiredState);
     }
 
     public void setDeliveredToSubscription()
     {
-        getMessage().setDeliveredToConsumer();
+        AMQMessage message = getMessage();
+        if (message != null)
+        {
+            message.setDeliveredToConsumer();
+        }
     }
 
     public void release()
@@ -197,12 +199,17 @@ public class QueueEntryImpl implements Q
 
     public boolean immediateAndNotDelivered() 
     {
-        return getMessage().immediateAndNotDelivered();
+        AMQMessage message = getMessage();
+        return message == null ? _immediateAndNotDelivered : message.immediateAndNotDelivered();
     }
 
     public void setRedelivered(boolean b)
     {
-        getMessage().setRedelivered(b);
+        AMQMessage message = getMessage();
+        if(message != null)
+        {
+            message.setRedelivered(b);
+        }
     }
 
     public Subscription getDeliveredSubscription()
@@ -298,10 +305,16 @@ public class QueueEntryImpl implements Q
     {
         if(delete())
         {
-            getMessage().decrementReference(storeContext);
-            
-            //Ensure we can't hang on to the message;
-            _message = null;
+            AMQMessage msg = getMessage();
+            if(msg != null)
+            {
+                getMessage().decrementReference(storeContext);
+
+                _immediateAndNotDelivered = _message.immediateAndNotDelivered();
+
+                //Ensure we can't hang on to the message, release the ref;
+                _message = null;
+            }
         }
     }
 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=941792&r1=941791&r2=941792&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
(original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Thu May  6 16:20:24 2010
@@ -452,7 +452,7 @@ public class SimpleAMQQueue implements A
             deliverAsync();
         }
 
-        _managedObject.checkForNotification(entry.getMessage());
+        _managedObject.checkForNotification(message);
 
         return entry;
     }
@@ -780,7 +780,13 @@ public class SimpleAMQQueue implements A
 
             public boolean accept(QueueEntry entry)
             {
-                final long messageId = entry.getMessage().getMessageId();
+                AMQMessage message = entry.getMessage();
+                if(message == null)
+                {
+                    return false;
+                }
+                
+                final long messageId = message.getMessageId();
                 return messageId >= fromMessageId && messageId <= toMessageId;
             }
 
@@ -799,7 +805,13 @@ public class SimpleAMQQueue implements A
 
             public boolean accept(QueueEntry entry)
             {
-                _complete = entry.getMessage().getMessageId() == messageId;
+                AMQMessage message = entry.getMessage();
+                if(message == null)
+                {
+                    return false;
+                }
+                
+                _complete = message.getMessageId() == messageId;
                 return _complete;
             }
 
@@ -871,7 +883,13 @@ public class SimpleAMQQueue implements A
 
             public boolean accept(QueueEntry entry)
             {
-                final long messageId = entry.getMessage().getMessageId();
+                AMQMessage message = entry.getMessage();
+                if(message == null)
+                {
+                    return false;
+                }
+                
+                final long messageId = message.getMessageId();
                 return (messageId >= fromMessageId)
                        && (messageId <= toMessageId)
                        && entry.acquire();
@@ -955,13 +973,19 @@ public class SimpleAMQQueue implements A
 
             public boolean accept(QueueEntry entry)
             {
-                final long messageId = entry.getMessage().getMessageId();
+                AMQMessage message = entry.getMessage();
+                if(message == null)
+                {
+                    return false;
+                }
+                
+                final long messageId = message.getMessageId();
                 if ((messageId >= fromMessageId)
                     && (messageId <= toMessageId))
                 {
                     if (!entry.isDeleted())
                     {
-                        return entry.getMessage().incrementReference();
+                        return message.incrementReference();
                     }
                 }
 
@@ -982,6 +1006,10 @@ public class SimpleAMQQueue implements A
             for (QueueEntry entry : entries)
             {
                 AMQMessage message = entry.getMessage();
+                if(message == null)
+                {
+                    continue;
+                }
 
                 if (message.isReferenced() && message.isPersistent())
                 {
@@ -1043,6 +1071,11 @@ public class SimpleAMQQueue implements A
             while (queueListIterator.advance())
             {
                 QueueEntry node = queueListIterator.getNode();
+                
+                if(node.isDeleted())
+                {
+                    continue;
+                }
 
                 final long messageId = node.getMessage().getMessageId();
 

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?rev=941792&r1=941791&r2=941792&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
(original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
Thu May  6 16:20:24 2010
@@ -98,7 +98,7 @@ public class NonTransactionalContext imp
         //required by the 'immediate' flag:
         if(entry.immediateAndNotDelivered())
         {
-            _returnMessages.add(new NoConsumersException(entry.getMessage()));
+            _returnMessages.add(new NoConsumersException(message));
         }
 
     }
@@ -180,17 +180,17 @@ public class NonTransactionalContext imp
                 beginTranIfNecessary();
             }
 
-            //Message has been ack so discard it. This will dequeue and decrement the reference.
-            msg.discard(_storeContext);
-
             unacknowledgedMessageMap.remove(deliveryTag);
 
-
             if (debug)
             {
                 _log.debug("Received non-multiple ack for messaging with delivery tag " +
deliveryTag + " msg id " +
                            msg.getMessage().getMessageId());
             }
+
+            //Message has been ack so discard it. This will dequeue and decrement the reference.
+            msg.discard(_storeContext);
+
         }
         if(_inTran)
         {

Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java?rev=941792&r1=941791&r2=941792&view=diff
==============================================================================
--- qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
(original)
+++ qpid/branches/0.5.x-dev/qpid/java/broker/src/test/java/org/apache/qpid/server/ack/TxAckTest.java
Thu May  6 16:20:24 2010
@@ -123,6 +123,7 @@ public class TxAckTest extends TestCase
         private final List<Long> _unacked;
         private StoreContext _storeContext = new StoreContext();
 		private AMQQueue _queue;
+		private Map<QueueEntry, TestMessage> _messages;
 
         Scenario(int messageCount, List<Long> acked, List<Long> unacked) throws
Exception
         {
@@ -131,6 +132,8 @@ public class TxAckTest extends TestCase
                                                                           new LinkedList<RequiredDeliveryException>()
             );
 
+            _messages = new HashMap<QueueEntry, TestMessage>();
+
             VirtualHost virtualHost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts().iterator().next();
 
             _queue = AMQQueueFactory.createAMQQueueImpl(new AMQShortString("test"), false,
null, false,
@@ -171,6 +174,9 @@ public class TxAckTest extends TestCase
 
                 TestMessage message = new TestMessage(deliveryTag, i, info, txnContext.getStoreContext());
                 _map.add(deliveryTag, _queue.enqueue(new StoreContext(), message));
+                QueueEntry entry = _map.get(deliveryTag);
+
+                _messages.put(entry, (TestMessage) entry.getMessage());
             }
             _acked = acked;
             _unacked = unacked;
@@ -187,7 +193,7 @@ public class TxAckTest extends TestCase
             {
                 QueueEntry u = _map.get(tag);
                 assertTrue("Message not found for tag " + tag, u != null);
-                ((TestMessage) u.getMessage()).assertCountEquals(expected);
+                _messages.get(u).assertCountEquals(expected);
             }
         }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message