activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r737017 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Queue.java main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java
Date Fri, 23 Jan 2009 12:11:02 GMT
Author: dejanb
Date: Fri Jan 23 04:11:00 2009
New Revision: 737017

URL: http://svn.apache.org/viewvc?rev=737017&view=rev
Log:
fix for http://issues.apache.org/activemq/browse/AMQ-2016

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=737017&r1=737016&r2=737017&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Jan 23 04:11:00 2009
@@ -27,12 +27,16 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
+
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
+
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -96,6 +100,10 @@
     private boolean strictOrderDispatch=false;
     private QueueDispatchSelector  dispatchSelector;
     private boolean optimizedDispatch=false;
+    private boolean firstConsumer = false;
+    private int timeBeforeDispatchStarts = 0;
+    private int consumersBeforeDispatchStarts = 0;
+    private CountDownLatch consumersBeforeStartsLatch;
     private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
         public void run() {
             wakeup();
@@ -134,7 +142,7 @@
         }
         // If a VMPendingMessageCursor don't use the default Producer System Usage
         // since it turns into a shared blocking queue which can lead to a network deadlock.
-        // If we are ccursoring to disk..it's not and issue because it does not block due
+        // If we are cursoring to disk..it's not and issue because it does not block due
         // to large disk sizes.
         if( messages instanceof VMPendingMessageCursor ) {
             this.systemUsage = brokerService.getSystemUsage();
@@ -221,6 +229,18 @@
 
             // needs to be synchronized - so no contention with dispatching
             synchronized (consumers) {
+            	
+            	// set a flag if this is a first consumer
+            	if (consumers.size() == 0) {
+            		firstConsumer = true;
+            	} else {
+            		firstConsumer = false;
+            	}
+            	
+            	if (consumersBeforeStartsLatch != null) {
+            		consumersBeforeStartsLatch.countDown();
+            	}
+            	
                 addToConsumerList(sub);
                 if (sub.getConsumerInfo().isExclusive()) {
                     Subscription exclusiveConsumer = dispatchSelector.getExclusiveConsumer();
@@ -610,6 +630,22 @@
     public void setOptimizedDispatch(boolean optimizedDispatch) {
         this.optimizedDispatch = optimizedDispatch;
     }
+	public int getTimeBeforeDispatchStarts() {
+		return timeBeforeDispatchStarts;
+	}
+
+	public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
+		this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
+	}
+
+	public int getConsumersBeforeDispatchStarts() {
+		return consumersBeforeDispatchStarts;
+	}
+
+	public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
+		this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
+		consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts);
+	}
 
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -990,6 +1026,35 @@
 	            }
 	        }
 	        
+	        if (firstConsumer) {
+	        	firstConsumer = false;
+	        	try {
+	        		if (consumersBeforeDispatchStarts > 0) {
+	        			int timeout = 1000; // wait one second by default if consumer count isn't reached
+	        			if (timeBeforeDispatchStarts > 0) {
+	        				timeout = timeBeforeDispatchStarts;
+	        			}
+	        			if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+	        				if (LOG.isDebugEnabled()) {
+	        					LOG.debug(consumers.size() + " consumers subscribed. Starting dispatch.");
+	        				}
+	        			} else {
+	        				if (LOG.isDebugEnabled()) {
+	        					LOG.debug(timeout + " ms elapsed and " +  consumers.size() + " consumers subscribed. Starting dispatch.");
+	        				}
+	        			}
+	        		}	        		
+	        		if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
+	        			iteratingMutex.wait(timeBeforeDispatchStarts);
+	        			if (LOG.isDebugEnabled()) {
+	        				LOG.debug(timeBeforeDispatchStarts + " ms elapsed. Starting dispatch.");
+	        			}
+	        		}
+	        	} catch (Exception e) {
+	        		LOG.error(e);
+	        	}
+	        }
+	        
 	        boolean pageInMoreMessages = false;
 	        synchronized (messages) {
 	            pageInMoreMessages = !messages.isEmpty();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java?rev=737017&r1=737016&r2=737017&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java Fri Jan 23 04:11:00 2009
@@ -65,6 +65,8 @@
     private boolean useConsumerPriority=true;
     private boolean strictOrderDispatch=false;
     private boolean lazyDispatch=false;
+    private int timeBeforeDispatchStarts = 0;
+    private int consumersBeforeDispatchStarts = 0;
     private boolean advisoryForSlowConsumers;
     private boolean advisdoryForFastProducers;
     private boolean advisoryForDiscardingMessages;
@@ -93,7 +95,8 @@
         queue.setStrictOrderDispatch(isStrictOrderDispatch());
         queue.setOptimizedDispatch(isOptimizedDispatch());
         queue.setLazyDispatch(isLazyDispatch());
-        
+        queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
+        queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
     }
 
     public void configure(Topic topic) {
@@ -439,6 +442,22 @@
         this.lazyDispatch = lazyDispatch;
     }
 
+    public int getTimeBeforeDispatchStarts() {
+        return timeBeforeDispatchStarts;
+    }
+
+    public void setTimeBeforeDispatchStarts(int timeBeforeDispatchStarts) {
+        this.timeBeforeDispatchStarts = timeBeforeDispatchStarts;
+    }
+
+    public int getConsumersBeforeDispatchStarts() {
+        return consumersBeforeDispatchStarts;
+    }
+
+    public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
+        this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
+    }
+
     /**
      * @return the advisoryForSlowConsumers
      */

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java?rev=737017&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java Fri Jan 23 04:11:00 2009
@@ -0,0 +1,229 @@
+package org.apache.activemq.usecases;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import junit.framework.Test;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JMSConsumerTest;
+import org.apache.activemq.JmsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+
+public class MessageGroupDelayedTest extends JmsTestSupport {
+  public static final Log log = LogFactory.getLog(MessageGroupDelayedTest.class);
+  protected Connection connection;
+  protected Session session;
+  protected MessageProducer producer;
+  protected Destination destination;
+  
+  public int consumersBeforeDispatchStarts;
+  public int timeBeforeDispatchStarts;
+  
+  BrokerService broker;
+  protected TransportConnector connector;
+  
+  protected HashMap<String, Integer> messageCount = new HashMap<String, Integer>();
+  protected HashMap<String, Set<String>> messageGroups = new HashMap<String, Set<String>>();
+  
+  public static Test suite() {
+      return suite(MessageGroupDelayedTest.class);
+  }
+
+  public static void main(String[] args) {
+      junit.textui.TestRunner.run(suite());
+  }
+
+  public void setUp() throws Exception {
+	broker = createBroker();  
+	broker.start();
+    ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(connector.getConnectUri());
+	//ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+    connection = connFactory.createConnection();
+    session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    destination = new ActiveMQQueue("test-queue2");
+    producer = session.createProducer(destination);
+    connection.start();
+  }
+  
+  protected BrokerService createBroker() throws Exception {
+      BrokerService service = new BrokerService();
+      service.setPersistent(false);
+      service.setUseJmx(false);
+
+      // Setup a destination policy where it takes only 1 message at a time.
+      PolicyMap policyMap = new PolicyMap();
+      PolicyEntry policy = new PolicyEntry();
+      policy.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts);
+      policy.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts);
+      policyMap.setDefaultEntry(policy);
+      service.setDestinationPolicy(policyMap);
+
+      connector = service.addConnector("tcp://localhost:0");
+      return service;
+  }
+  
+  public void tearDown() throws Exception {
+      producer.close();
+      session.close();
+      connection.close();
+  }
+  
+  
+  
+  public void initCombosForTestDelayedDirectConnectionListener() {
+	  addCombinationValues("consumersBeforeDispatchStarts", new Object[] {0, 3, 5});
+	  addCombinationValues("timeBeforeDispatchStarts", new Object[] {0, 100});
+  }
+  
+  public void testDelayedDirectConnectionListener() throws Exception {
+	  
+	  for(int i = 0; i < 10; i++) {
+      Message msga = session.createTextMessage("hello a");
+      msga.setStringProperty("JMSXGroupID", "A");
+      producer.send(msga);
+      Message msgb = session.createTextMessage("hello b");
+      msgb.setStringProperty("JMSXGroupID", "B");
+      producer.send(msgb);
+      Message msgc = session.createTextMessage("hello c");
+      msgc.setStringProperty("JMSXGroupID", "C");
+      producer.send(msgc);
+    }
+    log.info("30 messages sent to group A/B/C");
+   
+    int[] counters = {10, 10, 10};
+    
+    CountDownLatch startSignal = new CountDownLatch(1);
+    CountDownLatch doneSignal = new CountDownLatch(1);
+
+    messageCount.put("worker1", 0);
+    messageGroups.put("worker1", new HashSet<String>());
+    Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups);
+    messageCount.put("worker2", 0);
+    messageGroups.put("worker2", new HashSet<String>());
+    Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups);
+    messageCount.put("worker3", 0);
+    messageGroups.put("worker3", new HashSet<String>());
+    Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups);
+
+
+    new Thread(worker1).start();
+    new Thread(worker2).start();
+    new Thread(worker3).start();
+
+    startSignal.countDown();
+    doneSignal.await();
+    
+    // check results
+    if (consumersBeforeDispatchStarts == 0 && timeBeforeDispatchStarts == 0) {
+    	log.info("Ignoring results because both parameters are 0");
+    	return;
+    }
+    
+    for (String worker: messageCount.keySet()) {
+    	log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker));
+    	assertEquals(10, messageCount.get(worker).intValue());
+    	assertEquals(1, messageGroups.get(worker).size());
+    }
+    
+  }
+
+  private static final class Worker implements Runnable {
+    private Connection connection = null;
+    private Destination queueName = null;
+    private String workerName = null;
+    private CountDownLatch startSignal = null;
+    private CountDownLatch doneSignal = null;
+    private int[] counters = null;
+    private HashMap<String, Integer> messageCount;
+    private HashMap<String, Set<String>>messageGroups;
+    
+    
+    private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap<String, Integer> messageCount, HashMap<String, Set<String>>messageGroups) {
+      this.connection = connection;
+      this.queueName = queueName;
+      this.workerName = workerName;
+      this.startSignal = startSignal;
+      this.doneSignal = doneSignal;
+      this.counters = counters;
+      this.messageCount = messageCount;
+      this.messageGroups = messageGroups;
+    }
+    
+    private void update(String group) {
+        int msgCount = messageCount.get(workerName);
+        messageCount.put(workerName, msgCount + 1);
+        Set<String> groups = messageGroups.get(workerName);
+        groups.add(group);
+        messageGroups.put(workerName, groups);
+    }
+    
+    public void run() {
+
+      try {
+        log.info(workerName);
+        startSignal.await();
+        Session sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        MessageConsumer consumer = sess.createConsumer(queueName);
+
+        while(true) {
+          if(counters[0] == 0 && counters[1] == 0 && counters[2] == 0 ) {
+            doneSignal.countDown();
+            log.info(workerName + " done...");
+            break;
+          }
+          
+          Message msg = consumer.receive(500);
+          if(msg == null)
+            continue;
+
+          String group = msg.getStringProperty("JMSXGroupID");
+          boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer");
+
+          if("A".equals(group)){
+        	--counters[0];
+            update(group);
+            Thread.sleep(500);
+          }
+          else if("B".equals(group)) {
+        	--counters[1];
+            update(group);
+            Thread.sleep(100);
+          }
+          else if("C".equals(group)) {
+        	--counters[2];
+            update(group);
+            Thread.sleep(10);
+          }
+          else {
+            log.warn("unknown group");
+          }
+          if (counters[0] != 0 || counters[1] != 0 || counters[2] != 0 ) {
+        	  msg.acknowledge();
+          }
+        }
+        consumer.close();
+        sess.close();
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+    }
+  }
+}



Mime
View raw message