activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rajdav...@apache.org
Subject svn commit: r912656 - in /activemq/branches/activemq-5.3: ./ activemq-core/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/ activemq-core/src/main/java/org/apache/activemq/transport...
Date Mon, 22 Feb 2010 17:03:36 GMT
Author: rajdavies
Date: Mon Feb 22 17:03:36 2010
New Revision: 912656

URL: http://svn.apache.org/viewvc?rev=912656&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2616

Added:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
      - copied unchanged from r912652, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2616Test.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/
      - copied from r912652, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
      - copied unchanged from r912652, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
      - copied unchanged from r912652, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java
Modified:
    activemq/branches/activemq-5.3/   (props changed)
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java   (props changed)
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java   (props changed)

Propchange: activemq/branches/activemq-5.3/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 22 17:03:36 2010
@@ -1 +1 @@
-/activemq/trunk:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764,821090,821103,821115,824807,825009-825084,881278-881313,911812-912087,912496
+/activemq/trunk:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764,821090,821103,821115,824807,825009-825084,881278-881313,911812-912087,912496-912652

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=912656&r1=912655&r2=912656&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Mon Feb 22 17:03:36 2010
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.broker.region;
 
+import java.io.IOException;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
@@ -23,6 +24,8 @@
 import org.apache.activemq.command.ActiveMQTempDestination;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * The Queue is a List of MessageEntry objects that are dispatched to matching
@@ -31,6 +34,7 @@
  * @version $Revision: 1.28 $
  */
 public class TempQueue extends Queue{
+    private static final Log LOG = LogFactory.getLog(TempQueue.class);
     private final ActiveMQTempDestination tempDest;
    
     
@@ -50,6 +54,7 @@
         this.tempDest = (ActiveMQTempDestination) destination;
     }
     
+    @Override
     public void initialize() throws Exception {
         this.messages=new VMPendingMessageCursor();
         this.messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
@@ -58,6 +63,7 @@
         this.taskRunner = taskFactory.createTaskRunner(this, "TempQueue:  " + destination.getPhysicalName());
     }
     
+    @Override
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
         // Only consumers on the same connection can consume from
         // the temporary destination
@@ -74,4 +80,14 @@
         }
         super.addSubscription(context, sub);
     }
+    
+    @Override
+    public void dispose(ConnectionContext context) throws IOException {
+        try {
+           purge();
+        } catch (Exception e) {
+          LOG.warn("Caught an exception purging Queue: " + destination);
+        }
+        super.dispose(context);
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=912656&r1=912655&r2=912656&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Mon Feb 22 17:03:36 2010
@@ -86,7 +86,12 @@
             clearIterator(true);
             recovered = true;
         } else {
-            LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
+            /*
+             * we should expect to get these - as the message is recorded as it before it goes into
+             * the cache. If subsequently, we pull out that message from the store (before its deleted)
+             * it will be a duplicate - but should be ignored
+             */
+            //LOG.error(regionDestination.getActiveMQDestination().getPhysicalName() + " cursor got duplicate: " + message);
             storeHasMessages = true;
         }
         return recovered;

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?rev=912656&r1=912655&r2=912656&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java Mon Feb 22 17:03:36 2010
@@ -20,7 +20,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -33,38 +32,39 @@
  * @version $Revision$
  */
 public class VMPendingMessageCursor extends AbstractPendingMessageCursor {
-    private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
+    private final LinkedList<MessageReference> list = new LinkedList<MessageReference>();
     private Iterator<MessageReference> iter;
-    public VMPendingMessageCursor(){
-        this.useCache=false;
+    public VMPendingMessageCursor() {
+        this.useCache = false;
     }
 
-    
     @Override
-    public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
-    	List<MessageReference> rc = new ArrayList<MessageReference>();
+    public synchronized List<MessageReference> remove(ConnectionContext context, Destination destination)
+            throws Exception {
+        List<MessageReference> rc = new ArrayList<MessageReference>();
         for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
             MessageReference r = iterator.next();
-            if( r.getRegionDestination()==destination ) {
+            if (r.getRegionDestination() == destination) {
                 r.decrementReferenceCount();
                 rc.add(r);
                 iterator.remove();
             }
         }
-        return rc ;        
+        return rc;
     }
-    
+
     /**
      * @return true if there are no pending messages
      */
+    @Override
     public synchronized boolean isEmpty() {
         if (list.isEmpty()) {
             return true;
         } else {
             for (Iterator<MessageReference> iterator = list.iterator(); iterator.hasNext();) {
                 MessageReference node = iterator.next();
-                if (node== QueueMessageReference.NULL_MESSAGE){
-                	continue;
+                if (node == QueueMessageReference.NULL_MESSAGE) {
+                    continue;
                 }
                 if (!node.isDropped()) {
                     return false;
@@ -79,6 +79,7 @@
     /**
      * reset the cursor
      */
+    @Override
     public synchronized void reset() {
         iter = list.listIterator();
         last = null;
@@ -89,6 +90,7 @@
      * 
      * @param node
      */
+    @Override
     public synchronized void addMessageLast(MessageReference node) {
         node.incrementReferenceCount();
         list.addLast(node);
@@ -100,6 +102,7 @@
      * @param position
      * @param node
      */
+    @Override
     public synchronized void addMessageFirst(MessageReference node) {
         node.incrementReferenceCount();
         list.addFirst(node);
@@ -108,6 +111,7 @@
     /**
      * @return true if there pending messages to dispatch
      */
+    @Override
     public synchronized boolean hasNext() {
         return iter.hasNext();
     }
@@ -115,8 +119,9 @@
     /**
      * @return the next pending message
      */
+    @Override
     public synchronized MessageReference next() {
-        last = (MessageReference)iter.next();
+        last = iter.next();
         if (last != null) {
             last.incrementReferenceCount();
         }
@@ -126,6 +131,7 @@
     /**
      * remove the message at the cursor position
      */
+    @Override
     public synchronized void remove() {
         if (last != null) {
             last.decrementReferenceCount();
@@ -136,6 +142,7 @@
     /**
      * @return the number of pending messages
      */
+    @Override
     public synchronized int size() {
         return list.size();
     }
@@ -143,10 +150,16 @@
     /**
      * clear all pending messages
      */
+    @Override
     public synchronized void clear() {
+        for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
+            MessageReference ref = i.next();
+            ref.decrementReferenceCount();
+        }
         list.clear();
     }
 
+    @Override
     public synchronized void remove(MessageReference node) {
         for (Iterator<MessageReference> i = list.iterator(); i.hasNext();) {
             MessageReference ref = i.next();
@@ -164,11 +177,19 @@
      * @param maxItems
      * @return a list of paged in messages
      */
+    @Override
     public LinkedList<MessageReference> pageInList(int maxItems) {
         return list;
     }
-    
+
+    @Override
     public boolean isTransient() {
         return true;
     }
+
+    @Override
+    public void destroy() throws Exception {
+        super.destroy();
+        clear();
+    }
 }

Propchange: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 22 17:03:36 2010
@@ -1 +1 @@
-/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764,821090,821103,821115,824807,825009-825084,881278-881313,905769,911812-912087,912496
+/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/InactivityMonitor.java:816278-816279,816298,818138,818140-818147,818155,818160-818176,818209,818211,818224-818262,818480,818484,818487,818496,818502,818504-818510,818513-818516,818609,818635,818724-818762,818888,818905,818914,818923,818947-818955,818985,820031,820713-820714,820764,821090,821103,821115,824807,825009-825084,881278-881313,905769,911812-912087,912496-912652

Propchange: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Feb 22 17:03:36 2010
@@ -1 +1 @@
-/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java:502054-818935,818937-819035,820031,820713-820714,820764,821090,821103,821115,825009-825084,881278-881313,911812-912087,912496
+/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java:502054-818935,818937-819035,820031,820713-820714,820764,821090,821103,821115,825009-825084,881278-881313,911812-912087,912496-912652



Mime
View raw message