activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r558767 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java test/java/org/apache/activemq/ExclusiveConsumerTest.java
Date Mon, 23 Jul 2007 15:09:29 GMT
Author: chirino
Date: Mon Jul 23 08:09:28 2007
New Revision: 558767

URL: http://svn.apache.org/viewvc?view=rev&rev=558767
Log:
AMQ-1335 - Exclusive consumers are now selected up front when the consumer gets registered.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=558767&r1=558766&r2=558767
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Mon Jul 23 08:09:28 2007
@@ -87,7 +87,6 @@
     private int garbageSizeBeforeCollection = 1000;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private final MessageStore store;
-    private int highestSubscriptionPriority = Integer.MIN_VALUE;
     private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
     private int maximumPagedInMessages = garbageSizeBeforeCollection * 2;
@@ -179,20 +178,12 @@
      */
     public boolean lock(MessageReference node,LockOwner lockOwner){
         synchronized(exclusiveLockMutex){
-            if(exclusiveOwner==lockOwner){
-                return true;
-            }
-            if(exclusiveOwner!=null){
-                return false;
-            }
-            if(lockOwner.getLockPriority()<highestSubscriptionPriority){
-                return false;
-            }
-            if(lockOwner.isLockExclusive()){
-                exclusiveOwner=lockOwner;
-            }
+        	if (exclusiveOwner == lockOwner)
+				return true;
+			if (exclusiveOwner != null)
+				return false;
         }
-        return true;
+     	return true;
     }
 
     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception
{
@@ -204,25 +195,28 @@
         
         MessageEvaluationContext msgContext=context.getMessageEvaluationContext();
         try{
-            synchronized(consumers){
-                if (sub.getConsumerInfo().isExclusive()) {
-                    // Add to front of list to ensure that an exclusive consumer gets all
messages
-                    // before non-exclusive consumers
-                    consumers.add(0, sub);
-                } else {
-                    consumers.add(sub);
-                }
-            }
+            synchronized (consumers) {
+				consumers.add(sub);
+				if (sub.getConsumerInfo().isExclusive()) {
+					LockOwner owner = (LockOwner) sub;
+					if (exclusiveOwner == null) {
+						exclusiveOwner = owner;
+					} else {
+						// switch the owner if the priority is higher.
+						if (owner.getLockPriority() > exclusiveOwner.getLockPriority()) {
+							exclusiveOwner = owner;
+						}
+					}
+				}
+			}
             // page in messages
             doPageIn();
             // synchronize with dispatch method so that no new messages are sent
             // while
-            // setting up a subscription. avoid out of order messages, duplicates
+            // setting up a subscription. avoid out of order messages,
+			// duplicates
             // etc.
             dispatchValve.turnOff();
-            if (sub.getConsumerInfo().getPriority() > highestSubscriptionPriority) {
-                highestSubscriptionPriority = sub.getConsumerInfo().getPriority();
-            }
             msgContext.setDestination(destination);
             synchronized(pagedInMessages){
                 // Add all the matching messages in the queue to the
@@ -261,14 +255,29 @@
         try {
 
             synchronized (consumers) {
-                consumers.remove(sub);
-                if (consumers.isEmpty()) {
-                    messages.gc();
-                }
-            }
-            sub.remove(context, this);
+				consumers.remove(sub);
+				if (sub.getConsumerInfo().isExclusive()) {
+					LockOwner owner = (LockOwner) sub;
+					// Did we lose the exclusive owner?
+					if (exclusiveOwner == owner) {
+
+						// Find the exclusive consumer with the highest Lock
+						// Priority.
+						exclusiveOwner = null;
+						for (Iterator iter = consumers.iterator(); iter.hasNext();) {
+							Subscription s = (Subscription) iter.next();
+							LockOwner so = (LockOwner) s;
+							if (s.getConsumerInfo().isExclusive() && (exclusiveOwner == null || so.getLockPriority()
> exclusiveOwner.getLockPriority()))
+								exclusiveOwner = so;
+						}
+					}
+				}
+				if (consumers.isEmpty()) {
+					messages.gc();
+				}
 
-            highestSubscriptionPriority = calcHighestSubscriptionPriority();
+			}
+            sub.remove(context, this);
 
             boolean wasExclusiveOwner = false;
             if (exclusiveOwner == sub) {
@@ -627,20 +636,6 @@
         MessageReference result =  new IndirectMessageReference(this, store, message);
         result.decrementReferenceCount();
         return result;
-    }
-
-    
-    private int calcHighestSubscriptionPriority() {
-        int rc = Integer.MIN_VALUE;
-        synchronized (consumers) {
-            for (Iterator iter = consumers.iterator(); iter.hasNext();) {
-                Subscription sub = (Subscription) iter.next();
-                if (sub.getConsumerInfo().getPriority() > rc) {
-                    rc = sub.getConsumerInfo().getPriority();
-                }
-            }
-        }
-        return rc;
     }
 
     public MessageStore getMessageStore() {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java?view=auto&rev=558767
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java
Mon Jul 23 08:09:28 2007
@@ -0,0 +1,348 @@
+/**
+*
+* Copyright 2005-2006 The Apache Software Foundation
+*
+*  Licensed under the Apache License, Version 2.0 (the "License");
+*  you may not use this file except in compliance with the License.
+*  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+*  Unless required by applicable law or agreed to in writing, software
+*  distributed under the License is distributed on an "AS IS" BASIS,
+*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+*  See the License for the specific language governing permissions and
+*  limitations under the License.
+*/
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class ExclusiveConsumerTest extends TestCase {
+
+	private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true";
+	
+	public ExclusiveConsumerTest(String name) {
+		super(name);
+	}
+
+	protected void setUp() throws Exception {
+		super.setUp();
+	}
+
+	protected void tearDown() throws Exception {
+		super.tearDown();
+	}
+
+	private Connection createConnection(final boolean start) throws JMSException {
+		ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL);
+		Connection conn = cf.createConnection();
+		if (start) {
+			conn.start();
+		}
+		return conn;
+	}
+	
+	public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException
{
+		Connection conn = createConnection(true);
+		
+		Session exclusiveSession = null;
+		Session fallbackSession = null;
+		Session senderSession = null;
+
+		try {
+			
+			exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			
+			ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1?consumer.exclusive=true");
+			MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+			ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1");
+			MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+	
+	
+			ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1");
+
+			MessageProducer producer = senderSession.createProducer(senderQueue);
+		
+			Message msg = senderSession.createTextMessage("test");
+			producer.send(msg);
+			//TODO need to send a 2nd message - bug AMQ-1024
+			//producer.send(msg);
+			Thread.sleep(100);
+			
+			//Verify exclusive consumer receives the message.
+			Assert.assertNotNull(exclusiveConsumer.receive(100));
+			Assert.assertNull(fallbackConsumer.receive(100));
+			
+		} finally {
+			fallbackSession.close();
+			senderSession.close();
+			conn.close();
+		}
+		
+	}
+	
+	public void testExclusiveConsumerSelectedCreatedAfter() throws JMSException, InterruptedException
{
+		Connection conn = createConnection(true);
+		
+		Session exclusiveSession = null;
+		Session fallbackSession = null;
+		Session senderSession = null;
+
+		try {
+			
+			exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			
+			ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE5");
+			MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+	
+			ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE5?consumer.exclusive=true");
+			MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+	
+			ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE5");
+
+			MessageProducer producer = senderSession.createProducer(senderQueue);
+		
+			Message msg = senderSession.createTextMessage("test");
+			producer.send(msg);
+			Thread.sleep(100);
+			
+			//Verify exclusive consumer receives the message.
+			Assert.assertNotNull(exclusiveConsumer.receive(100));
+			Assert.assertNull(fallbackConsumer.receive(100));
+			
+		} finally {
+			fallbackSession.close();
+			senderSession.close();
+			conn.close();
+		}
+		
+	}
+	
+	public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException, InterruptedException
{
+		Connection conn = createConnection(true);
+		
+		Session exclusiveSession1 = null;
+		Session exclusiveSession2 = null;
+		Session fallbackSession = null;
+		Session senderSession = null;
+
+		try {
+			
+			exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			
+			// This creates the exclusive consumer first which avoids AMQ-1024 bug.
+			ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2?consumer.exclusive=true");
+			MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
+			MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
+
+			ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2");
+			MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+	
+			ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2");
+
+			MessageProducer producer = senderSession.createProducer(senderQueue);
+		
+			Message msg = senderSession.createTextMessage("test");
+			producer.send(msg);
+			Thread.sleep(100);
+			
+			//Verify exclusive consumer receives the message.
+			Assert.assertNotNull(exclusiveConsumer1.receive(100));
+			Assert.assertNull(exclusiveConsumer2.receive(100));
+			Assert.assertNull(fallbackConsumer.receive(100));
+			
+			// Close the first exclusive consumer to verify the second exclusive consumer takes over
+			exclusiveConsumer1.close();
+	
+			producer.send(msg);
+			producer.send(msg);
+			
+			Assert.assertNotNull(exclusiveConsumer2.receive(100));
+			Assert.assertNull(fallbackConsumer.receive(100));
+
+
+		} finally {
+			fallbackSession.close();
+			senderSession.close();
+			conn.close();
+		}
+		
+	}
+	
+	public void testFailoverToAnotherExclusiveConsumerCreatedAfter() throws JMSException, InterruptedException
{
+		Connection conn = createConnection(true);
+		
+		Session exclusiveSession1 = null;
+		Session exclusiveSession2 = null;
+		Session fallbackSession = null;
+		Session senderSession = null;
+
+		try {
+			
+			exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			
+			// This creates the exclusive consumer first which avoids AMQ-1024 bug.
+			ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE6?consumer.exclusive=true");
+			MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue);
+
+			ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE6");
+			MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+	
+			MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue);
+
+			ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE6");
+
+			MessageProducer producer = senderSession.createProducer(senderQueue);
+		
+			Message msg = senderSession.createTextMessage("test");
+			producer.send(msg);
+			Thread.sleep(100);
+			
+			//Verify exclusive consumer receives the message.
+			Assert.assertNotNull(exclusiveConsumer1.receive(100));
+			Assert.assertNull(exclusiveConsumer2.receive(100));
+			Assert.assertNull(fallbackConsumer.receive(100));
+			
+			// Close the first exclusive consumer to verify the second exclusive consumer takes over
+			exclusiveConsumer1.close();
+	
+			producer.send(msg);
+			producer.send(msg);
+			
+			Assert.assertNotNull(exclusiveConsumer2.receive(100));
+			Assert.assertNull(fallbackConsumer.receive(100));
+
+
+		} finally {
+			fallbackSession.close();
+			senderSession.close();
+			conn.close();
+		}
+		
+	}
+	public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException
{
+		Connection conn = createConnection(true);
+		
+		Session exclusiveSession = null;
+		Session fallbackSession = null;
+		Session senderSession = null;
+
+		try {
+			
+			exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			
+			// This creates the exclusive consumer first which avoids AMQ-1024 bug.
+			ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3?consumer.exclusive=true");
+			MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+			ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3");
+			MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+	
+			ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3");
+
+			MessageProducer producer = senderSession.createProducer(senderQueue);
+		
+			Message msg = senderSession.createTextMessage("test");
+			producer.send(msg);
+			Thread.sleep(100);
+			
+			//Verify exclusive consumer receives the message.
+			Assert.assertNotNull(exclusiveConsumer.receive(100));
+			Assert.assertNull(fallbackConsumer.receive(100));
+			
+			// Close the exclusive consumer to verify the non-exclusive consumer takes over
+			exclusiveConsumer.close();
+	
+			producer.send(msg);
+	
+			Assert.assertNotNull(fallbackConsumer.receive(100));
+
+		} finally {
+			fallbackSession.close();
+			senderSession.close();
+			conn.close();
+		}
+		
+	}
+	
+	public void testFallbackToExclusiveConsumer() throws JMSException, InterruptedException
{
+		Connection conn = createConnection(true);
+		
+		Session exclusiveSession = null;
+		Session fallbackSession = null;
+		Session senderSession = null;
+
+		try {
+			
+			exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+			
+			// This creates the exclusive consumer first which avoids AMQ-1024 bug.
+			ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE4?consumer.exclusive=true");
+			MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+			ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE4");
+			MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue);
+		
+			ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE4");
+
+			MessageProducer producer = senderSession.createProducer(senderQueue);
+		
+			Message msg = senderSession.createTextMessage("test");
+			producer.send(msg);
+			Thread.sleep(100);
+			
+			//Verify exclusive consumer receives the message.
+			Assert.assertNotNull(exclusiveConsumer.receive(100));
+			Assert.assertNull(fallbackConsumer.receive(100));
+			
+			// Close the exclusive consumer to verify the non-exclusive consumer takes over
+			exclusiveConsumer.close();
+	
+			producer.send(msg);
+
+			// Verify the non-exclusive consumer receives the message.
+			Assert.assertNotNull(fallbackConsumer.receive(100));
+
+			// Create exclusive consumer to determine if it will start receiving the messages.
+			exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue);
+
+			producer.send(msg);
+			Assert.assertNotNull(exclusiveConsumer.receive(100));	
+			Assert.assertNull(fallbackConsumer.receive(100));
+
+		} finally {
+			fallbackSession.close();
+			senderSession.close();
+			conn.close();
+		}
+		
+	}
+}



Mime
View raw message