activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r518437 - in /activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/ src/main/java/org/apache/activemq/broker/ src/main/java/org/apache/activemq/broker/region/ src/main/java/org/apache/activemq/broker/region/cursors/ src/test/j...
Date Thu, 15 Mar 2007 01:45:50 GMT
Author: chirino
Date: Wed Mar 14 18:45:49 2007
New Revision: 518437

URL: http://svn.apache.org/viewvc?view=rev&rev=518437
Log:
- Propagate the AlwaysSyncSend setting from the ConnectionFactory to the Connection
- Got rid of the UseSyncSend property since AlwaysSyncSend was already there and did the same
thing.
- Updated VMPendingMessageCursor so that it updates the reference counters of the message
so that the usage managers are properly updated since the messages are being kept in memory.
- Updated the region Queue so that it decrements the usage in the case of a transaction.
- Enabled the ProducerFlowControlTest since it is now working.

Modified:
    activemq/trunk/activemq-core/pom.xml
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Mar 14 18:45:49 2007
@@ -267,7 +267,6 @@
             <exclude>**/nio/**</exclude>
 
             <exclude>**/AMQDeadlockTest3.*</exclude>
-            <exclude>**/ProducerFlowControlTest.*</exclude>
           </excludes>
         </configuration>
       </plugin>

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Wed Mar 14 18:45:49 2007
@@ -90,7 +90,6 @@
     private boolean nestedMapAndListEnabled = true;
     JMSStatsImpl factoryStats = new JMSStatsImpl();
     private boolean alwaysSyncSend;
-    private boolean useSyncSend=false;
     private boolean watchTopicAdvisories=true;
     private int producerWindowSize=DEFAULT_PRODUCER_WINDOW_SIZE;
 
@@ -259,6 +258,7 @@
             connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
             connection.setDispatchAsync(isDispatchAsync());
             connection.setUseAsyncSend(isUseAsyncSend());
+            connection.setAlwaysSyncSend(isAlwaysSyncSend());
             connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
             connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
             connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
@@ -437,10 +437,6 @@
         this.useAsyncSend = useAsyncSend;
     }
     
-	public void setUseSyncSend(boolean forceSyncSend) {
-		this.useSyncSend = forceSyncSend;
-	}
-
 	public synchronized boolean isWatchTopicAdvisories() {
 		return watchTopicAdvisories;
 	}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Wed Mar 14 18:45:49 2007
@@ -1151,6 +1151,8 @@
                 }
                 producerExchanges.put(id,result);
             }
+        } else {
+        	context = result.getConnectionContext();
         }
         return result;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Mar 14 18:45:49 2007
@@ -431,38 +431,41 @@
         if(store!=null&&message.isPersistent()){
             store.addMessage(context,message);
         }
-        message.incrementReferenceCount();
+        
         if(context.isInTransaction()){
+        	// If this is a transacted message.. increase the usage now so that a big TX does
not blow up
+        	// our memory.  This increment is decremented once the tx finishes..
+            message.incrementReferenceCount();
             context.getTransaction().addSynchronization(new Synchronization(){
-
                 public void afterCommit() throws Exception{
-                    //even though the message could be expired - it won't be from the store
-                    //and it's important to keep the store/cursor in step
-                    synchronized(messages){
-                        messages.addMessageLast(message);
-                    }
-                    // It could take while before we receive the commit
-                    // op, by that time the message could have expired..
-                    if(message.isExpired()){
-                        // TODO: remove message from store.
-                        if (log.isDebugEnabled()) {
-                            log.debug("Expired message: " + message);
-                        }
-                        if( producerExchange.getProducerState().getInfo().getWindowSize()
> 0 || !message.isResponseRequired() ) {
-                    		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
-            				context.getConnection().dispatchAsync(ack);	    	            	        		
-                        }
-                        return;
-                    }
-                    sendMessage(context,message);
+                	try { 
+                        // It could take while before we receive the commit
+                        // op, by that time the message could have expired..
+	                    if(message.isExpired()){
+	                        // TODO: remove message from store.
+	                        if (log.isDebugEnabled()) {
+	                            log.debug("Expired message: " + message);
+	                        }
+	                        if( producerExchange.getProducerState().getInfo().getWindowSize()
> 0 || !message.isResponseRequired() ) {
+	                    		ProducerAck ack = new ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+	            				context.getConnection().dispatchAsync(ack);	    	            	        	

+	                        }
+	                        return;
+	                    }
+	                    sendMessage(context,message);
+                	} finally {
+                        message.decrementReferenceCount();
+                	}
+                }
+                
+                @Override
+                public void afterRollback() throws Exception {
+                    message.decrementReferenceCount();
                 }
             });
         }else{
-            synchronized(messages){
-                messages.addMessageLast(message);
-            }
-            sendMessage(context,message);
-            
+        	// Add to the pending list, this takes care of incrementing the usage manager.
+            sendMessage(context,message);            
         }
 	}    
 
@@ -982,8 +985,9 @@
     
       
     private void sendMessage(final ConnectionContext context,Message msg) throws Exception{
-        
-        
+        synchronized(messages){
+            messages.addMessageLast(msg);
+        }
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         pageInMessages(false);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Wed Mar 14 18:45:49 2007
@@ -21,8 +21,10 @@
  * @version $Revision$
  */
 public class VMPendingMessageCursor extends AbstractPendingMessageCursor{
-    private LinkedList list = new LinkedList();
-    private Iterator iter = null;
+    private LinkedList<MessageReference> list = new LinkedList<MessageReference>();
+    private Iterator<MessageReference> iter = null;
+    private MessageReference last;
+    
     /**
      * @return true if there are no pending messages
      */
@@ -36,6 +38,7 @@
      */
     public void reset(){
         iter = list.listIterator();
+        last=null;
     }
     
     /**
@@ -44,6 +47,7 @@
      * @param node
      */
     public void addMessageLast(MessageReference node){
+    	node.incrementReferenceCount();
         list.addLast(node);
     }
     
@@ -53,6 +57,7 @@
      * @param node
      */
     public void addMessageFirst(MessageReference node){
+    	node.incrementReferenceCount();
         list.addFirst(node);
     }
 
@@ -68,7 +73,8 @@
      * @return the next pending message
      */
     public MessageReference next(){
-        return (MessageReference) iter.next();
+    	last = (MessageReference) iter.next();
+    	return last;
     }
 
     /**
@@ -76,6 +82,9 @@
      * 
      */
     public void remove(){
+    	if( last!=null ) {
+    		last.decrementReferenceCount();
+    	}
         iter.remove();
     }
 
@@ -95,9 +104,10 @@
     }
     
     public void remove(MessageReference node){
-        for(Iterator i=list.iterator();i.hasNext();){
-            MessageReference ref=(MessageReference)i.next();
+        for(Iterator<MessageReference> i=list.iterator();i.hasNext();){
+            MessageReference ref=i.next();
             if(node.getMessageId().equals(ref.getMessageId())){
+            	ref.decrementReferenceCount();
                 i.remove();
                 break;
             }
@@ -109,7 +119,7 @@
      * @param maxItems
      * @return a list of paged in messages
      */
-    public LinkedList pageInList(int maxItems) {
+    public LinkedList<MessageReference> pageInList(int maxItems) {
         return list;
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java Wed
Mar 14 18:45:49 2007
@@ -285,7 +285,7 @@
 		acf.setUseCompression(false);
 		acf.setOptimizeAcknowledge(false);
 		acf.setOptimizedMessageDispatch(true);
-		acf.setUseSyncSend(true);
+		acf.setAlwaysSyncSend(true);
 		return acf;
 	}
 

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Wed Mar 14 18:45:49 2007
@@ -17,6 +17,7 @@
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
 import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.tcp.TcpTransport;
@@ -31,21 +32,19 @@
 
     public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception {
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
-        factory.setUseSyncSend(true);
+        factory.setAlwaysSyncSend(true);
         connection = (ActiveMQConnection) factory.createConnection();
         connections.add(connection);
     	connection.start();
 
-    	// Test sending to Queue A
-    	// 1st send should not block.
-    	fillQueue(queueA);
-    	
     	Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
     	MessageConsumer consumer = session.createConsumer(queueB);
 
-    	// Test sending to Queue B it should block. 
-    	// Since even though  the it's queue limits have not been reached, the connection
-    	// is blocked.
+    	// Test sending to Queue A
+    	// 1st send should not block.  But the rest will.
+    	fillQueue(queueA);
+
+    	// Test sending to Queue B it should not block. 
     	CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
     	assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
     	
@@ -61,6 +60,32 @@
     	msg.acknowledge();
     }
 
+    public void testSimpleSendReceive() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) createConnectionFactory();
+        factory.setAlwaysSyncSend(true);
+        connection = (ActiveMQConnection) factory.createConnection();
+        connections.add(connection);
+    	connection.start();
+
+    	Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+    	MessageConsumer consumer = session.createConsumer(queueA);
+
+    	// Test sending to Queue B it should not block. 
+    	CountDownLatch pubishDoneToQeueuA = asyncSendTo(queueA, "Message 1");
+    	assertTrue( pubishDoneToQeueuA.await(2, TimeUnit.SECONDS) );
+    	
+    	TextMessage msg = (TextMessage) consumer.receive();
+    	assertEquals("Message 1", msg.getText());
+    	msg.acknowledge();
+    	
+    	pubishDoneToQeueuA = asyncSendTo(queueA, "Message 2");
+    	assertTrue( pubishDoneToQeueuA.await(2, TimeUnit.SECONDS) );
+    	
+    	msg = (TextMessage) consumer.receive();
+    	assertEquals("Message 2", msg.getText());
+    	msg.acknowledge();
+    }
+
     public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception {
         ConnectionFactory factory = createConnectionFactory();
         connection = (ActiveMQConnection) factory.createConnection();
@@ -143,6 +168,7 @@
         PolicyEntry policy = new PolicyEntry();
         policy.setMemoryLimit(1);
         policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy());
+        policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
         policyMap.setDefaultEntry(policy);        
         service.setDestinationPolicy(policyMap);
         



Mime
View raw message