activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r638385 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: PrefetchSubscription.java Queue.java QueueSubscription.java Subscription.java
Date Tue, 18 Mar 2008 14:32:46 GMT
Author: rajdavies
Date: Tue Mar 18 07:32:41 2008
New Revision: 638385

URL: http://svn.apache.org/viewvc?rev=638385&view=rev
Log:
change reference count boundaries around messages - so they
are around acks

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=638385&r1=638384&r2=638385&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java	Tue Mar 18 07:32:41 2008
@@ -610,15 +610,4 @@
     public void setMaxAuditDepth(int maxAuditDepth) {
         this.maxAuditDepth = maxAuditDepth;
     }
-    
-    
-    public List<MessageReference> getInFlightMessages(){
-        List<MessageReference> result = new ArrayList<MessageReference>();
-        synchronized(pendingLock) {
-            result.addAll(dispatched);
-            result.addAll(pending.pageInList(1000));
-        }
-        return result;
-    }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=638385&r1=638384&r2=638385&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java	Tue Mar 18 07:32:41 2008
@@ -944,6 +944,7 @@
         reference.drop();
         acknowledge(context, sub, ack, reference);
         destinationStatistics.getMessages().decrement();
+        reference.decrementReferenceCount();
         synchronized(pagedInMessages) {
             pagedInMessages.remove(reference.getMessageId());
         }
@@ -1034,6 +1035,7 @@
                         if (dispatchSelector.canSelect(s, node)) {
                             if (!s.isFull()) {
                                 s.add(node);
+                                node.incrementReferenceCount();
                                 target = s;
                                 break;
                             } else {
@@ -1055,6 +1057,7 @@
                         }
                         if (target != null) {
                             target.add(node);
+                            node.incrementReferenceCount();
                         }
                     }
                     if (target != null && !strictOrderDispatch && consumers.size() > 1 &&

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=638385&r1=638384&r2=638385&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java	Tue Mar 18 07:32:41 2008
@@ -108,59 +108,6 @@
     }
 
     /**
-     * Override so that the message ref count is > 0 only when the message is
-     * being dispatched to a client. Keeping it at 0 when it is in the pending
-     * list allows the message to be swapped out to disk.
-     * 
-     * @return true if the message was dispatched.
-     */
-    protected boolean dispatch(MessageReference node) throws IOException {
-        boolean rc = false;
-        // This brings the message into memory if it was swapped out.
-        node.incrementReferenceCount();
-        try {
-            rc = super.dispatch(node);
-        } finally {
-            // If the message was dispatched, it could be getting dispatched
-            // async, so we
-            // can only drop the reference count when that completes @see
-            // onDispatch
-            if (!rc) {
-                node.decrementReferenceCount();
-            }
-        }
-        return rc;
-    }
-
-    /**
-     * OK Message was transmitted, we can now drop the reference count.
-     * 
-     * @see org.apache.activemq.broker.region.PrefetchSubscription#onDispatch(org.apache.activemq.broker.region.MessageReference,
-     *      org.apache.activemq.command.Message)
-     */
-    protected void onDispatch(MessageReference node, Message message) {
-        // Now that the message has been sent over the wire to the client,
-        // we can let it get swapped out.
-        node.decrementReferenceCount();
-        super.onDispatch(node, message);
-    }
-
-    /**
-     * Sending a message to the DQL will require us to increment the ref count
-     * so we can get at the content.
-     */
-    protected void sendToDLQ(ConnectionContext context, MessageReference node) throws IOException, Exception {
-        // This brings the message into memory if it was swapped out.
-        node.incrementReferenceCount();
-        try {
-            super.sendToDLQ(context, node);
-        } finally {
-            // This let's the message be swapped out of needed.
-            node.decrementReferenceCount();
-        }
-    }
-
-    /**
      */
     public void destroy() {
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=638385&r1=638384&r2=638385&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java	(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Subscription.java	Tue Mar 18 07:32:41 2008
@@ -215,11 +215,4 @@
      * @return true if a browser
      */
     boolean isBrowser();
-    
-    /**
-     * Get the list of in flight messages
-     * @return list
-     */
-    List<MessageReference> getInFlightMessages();
-
 }



Mime
View raw message