activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r382919 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region: Destination.java IndirectMessageReference.java LockOwner.java Queue.java QueueSubscription.java Topic.java
Date Fri, 03 Mar 2006 20:07:10 GMT
Author: chirino
Date: Fri Mar  3 12:07:07 2006
New Revision: 382919

URL: http://svn.apache.org/viewcvs?rev=382919&view=rev
Log:
- Improved the delete and purge operations on a queue.
  - Messages now removed from the message list: fixes message count < 0 problem
  - Messages are now locked before they are deleted, if messages are in flight to a consumer,
we don't want to delete them.

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/LockOwner.java
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=382919&r1=382918&r2=382919&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
Fri Mar  3 12:07:07 2006
@@ -38,7 +38,7 @@
     void removeSubscription(ConnectionContext context, Subscription sub) throws Exception;
     
     void send(ConnectionContext context, Message messageSend) throws Exception;
-    boolean lock(MessageReference node, Subscription subscription);
+    boolean lock(MessageReference node, LockOwner lockOwner);
     void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final
MessageReference node) throws IOException;
     
     void gc();

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java?rev=382919&r1=382918&r2=382919&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/IndirectMessageReference.java
Fri Mar  3 12:07:07 2006
@@ -48,7 +48,7 @@
     /** The number of times the message has been delivered.*/
     private short redeliveryCounter = 0;
     /** The subscription that has locked the message */
-    private Subscription lockOwner;
+    private LockOwner lockOwner;
     /** Has the message been dropped? */
     private boolean dropped;
     /** Has the message been acked? */
@@ -148,7 +148,7 @@
         }
     }
 
-    public boolean lock(Subscription subscription) {
+    public boolean lock(LockOwner subscription) {
         if( !regionDestination.lock(this, subscription) )
             return false;        
         synchronized (this) {
@@ -163,7 +163,7 @@
         lockOwner = null;
     }
     
-    synchronized public Subscription getLockOwner() {
+    synchronized public LockOwner getLockOwner() {
         return lockOwner;
     }
 

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/LockOwner.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/LockOwner.java?rev=382919&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/LockOwner.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/LockOwner.java
Fri Mar  3 12:07:07 2006
@@ -0,0 +1,33 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region;
+
+public interface LockOwner {
+
+    public static final LockOwner HIGH_PRIORITY_LOCK_OWNER = new LockOwner() {
+        public int getLockPriority() {
+            return Integer.MAX_VALUE;
+        }
+        public boolean isLockExclusive() {
+            return false;
+        }
+    };
+
+    int getLockPriority();
+    boolean isLockExclusive();
+
+}

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=382919&r1=382918&r2=382919&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Mar  3 12:07:07 2006
@@ -65,7 +65,7 @@
     protected final UsageManager usageManager;
     protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
 
-    private Subscription exclusiveOwner;
+    private LockOwner exclusiveOwner;
     private MessageGroupMap messageGroupOwners;
     private int messageGroupHashBucketCount = 1024;
 
@@ -106,15 +106,15 @@
         }
     }
 
-    public synchronized boolean lock(MessageReference node, Subscription sub) {
-        if (exclusiveOwner == sub)
+    public synchronized boolean lock(MessageReference node, LockOwner lockOwner) {
+        if (exclusiveOwner == lockOwner)
             return true;
         if (exclusiveOwner != null)
             return false;
-        if (sub.getConsumerInfo().getPriority() != highestSubscriptionPriority)
+        if (lockOwner.getLockPriority() < highestSubscriptionPriority)
             return false;
-        if (sub.getConsumerInfo().isExclusive()) {
-            exclusiveOwner = sub;
+        if (lockOwner.isLockExclusive()) {
+            exclusiveOwner = lockOwner;
         }
         return true;
     }
@@ -444,13 +444,18 @@
                 try {
                     IndirectMessageReference r = (IndirectMessageReference) iter.next();
                     if (messageId.equals(r.getMessageId().toString())) {
-                        MessageAck ack = new MessageAck();
-                        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-                        ack.setDestination(destination);
-                        ack.setMessageID(r.getMessageId());
-                        acknowledge(c, null, ack, r);
-                        r.drop();
-                        dropEvent();
+                        
+                        // We should only delete messages that can be locked.
+                        if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) )  {
+                            MessageAck ack = new MessageAck();
+                            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+                            ack.setDestination(destination);
+                            ack.setMessageID(r.getMessageId());
+                            acknowledge(c, null, ack, r);
+                            r.drop();
+                            dropEvent();
+                            iter.remove();
+                        }
                     }
                 } catch (IOException e) {
                 }
@@ -488,13 +493,18 @@
             for (Iterator iter = messages.iterator(); iter.hasNext();) {
                 try {
                     IndirectMessageReference r = (IndirectMessageReference) iter.next();
-                    MessageAck ack = new MessageAck();
-                    ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
-                    ack.setDestination(destination);
-                    ack.setMessageID(r.getMessageId());
-                    acknowledge(c, null, ack, r);
-                    r.drop();
-                    dropEvent();
+                    
+                    // We should only delete messages that can be locked.
+                    if( r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER) )  {
+                        MessageAck ack = new MessageAck();
+                        ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+                        ack.setDestination(destination);
+                        ack.setMessageID(r.getMessageId());
+                        acknowledge(c, null, ack, r);
+                        r.drop();
+                        dropEvent();
+                        iter.remove();
+                    }
                 } catch (IOException e) {
                 }
             }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java?rev=382919&r1=382918&r2=382919&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java
Fri Mar  3 12:07:07 2006
@@ -28,7 +28,7 @@
 
 import java.io.IOException;
 
-public class QueueSubscription extends PrefetchSubscription {
+public class QueueSubscription extends PrefetchSubscription implements LockOwner {
     
     public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info)
throws InvalidSelectorException {
         super(broker,context, info);
@@ -128,6 +128,14 @@
             ", dispatched="+dispatched.size()+
             ", delivered="+this.delivered+
             ", matched="+this.matched.size();
+    }
+
+    public int getLockPriority() {
+        return info.getPriority();
+    }
+
+    public boolean isLockExclusive() {
+        return info.isExclusive();
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=382919&r1=382918&r2=382919&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
Fri Mar  3 12:07:07 2006
@@ -77,7 +77,7 @@
         this.destinationStatistics.setParent(parentStats);
     }
 
-    public boolean lock(MessageReference node, Subscription sub) {
+    public boolean lock(MessageReference node, LockOwner sub) {
         return true;
     }
 



Mime
View raw message