activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r957899 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/ test/java/org/apache/activemq/broker/region/cursors/ test/java/org/apache/activemq/store/jdbc/
Date Fri, 25 Jun 2010 11:49:02 GMT
Author: gtully
Date: Fri Jun 25 11:49:01 2010
New Revision: 957899

URL: http://svn.apache.org/viewvc?rev=957899&view=rev
Log:
Fix intermittent failure in JDBCNegativeQueueTest: sync store add and cursor add such that
the cursor is always in order w.r.t. the store. Hence, when the cursor cache is exhausted, resuming
from the store is just fine. Without the sync, moving from cache to store can result in duplicate
messages, out-of-order messages and, on occasion, missing messages. Disabling the cache resolved
all of the above but kills performance. Syncing the store add and cursor add is the correct
solution, but it has an impact on concurrent transaction completion for a destination; parallel
completion across destinations can still continue, so there is still some batching potential.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=957899&r1=957898&r2=957899&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
Fri Jun 25 11:49:01 2010
@@ -555,8 +555,13 @@ public abstract class BaseDestination im
     }
     
     protected final void waitForSpace(ConnectionContext context, Usage<?> usage, int
highWaterMark, String warning) throws IOException, InterruptedException, ResourceAllocationException
{
+        if (systemUsage.isSendFailIfNoSpace()) {
+            getLog().debug("sendFailIfNoSpace, forcing exception on send: " + warning);
+            throw new ResourceAllocationException(warning);
+        }
         if (systemUsage.getSendFailIfNoSpaceAfterTimeout() != 0) {
             if (!usage.waitForSpace(systemUsage.getSendFailIfNoSpaceAfterTimeout(), highWaterMark))
{
+                getLog().debug("sendFailIfNoSpaceAfterTimeout expired, forcing exception
on send: " + warning);
                 throw new ResourceAllocationException(warning);
             }
         } else {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=957899&r1=957898&r2=957899&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Fri Jun 25 11:49:01 2010
@@ -38,6 +38,8 @@ import java.util.concurrent.ExecutorServ
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.ResourceAllocationException;
@@ -100,7 +102,7 @@ public class Queue extends BaseDestinati
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private MessageGroupMapFactory messageGroupMapFactory = new MessageGroupHashBucketFactory();
-    private final Object sendLock = new Object();
+    private final Lock sendLock = new ReentrantLock();
     private ExecutorService executor;
     protected final Map<MessageId, Runnable> messagesWaitingForSpace = Collections
             .synchronizedMap(new LinkedHashMap<MessageId, Runnable>());
@@ -446,7 +448,7 @@ public class Queue extends BaseDestinati
                     redeliveredWaitingDispatch.add(qmr);
                 }
                 if (!redeliveredWaitingDispatch.isEmpty()) {
-                    doDispatch(new ArrayList());
+                    doDispatch(new ArrayList<QueueMessageReference>());
                 }
             }
             if (!(this.optimizedDispatch || isSlave())) {
@@ -596,57 +598,57 @@ public class Queue extends BaseDestinati
             Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         Future<Object> result = null;
-        synchronized (sendLock) {
-            if (store != null && message.isPersistent()) {
-                if (systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark())) {
-
-                    String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark()
+ "% of "
-                            + systemUsage.getStoreUsage().getLimit() + ". Stopping producer
("
-                            + message.getProducerId() + ") to prevent flooding "
-                            + getActiveMQDestination().getQualifiedName() + "."
-                            + " See http://activemq.apache.org/producer-flow-control.html
for more info";
-
-                    if (systemUsage.isSendFailIfNoSpace()) {
-                        throw new ResourceAllocationException(logMessage);
-                    }
-
-                    waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(),
logMessage);
-                }
+        
+        checkUsage(context, message);
+        sendLock.lockInterruptibly();
+        try {
+            if (store != null && message.isPersistent()) {        
                 message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
                 result = store.asyncAddQueueMessage(context, message);
             }
-        }
-        if (context.isInTransaction()) {
-            // If this is a transacted message.. increase the usage now so that
-            // a big TX does not blow up
-            // our memory. This increment is decremented once the tx finishes..
-            message.incrementReferenceCount();
-            context.getTransaction().addSynchronization(new Synchronization() {
-                @Override
-                public void afterCommit() throws Exception {
-                    try {
-                        // It could take while before we receive the commit
-                        // op, by that time the message could have expired..
-                        if (broker.isExpired(message)) {
-                            broker.messageExpired(context, message);
-                            destinationStatistics.getExpired().increment();
-                            return;
+            if (context.isInTransaction()) {
+                // If this is a transacted message.. increase the usage now so that
+                // a big TX does not blow up
+                // our memory. This increment is decremented once the tx finishes..
+                message.incrementReferenceCount();
+            
+                context.getTransaction().addSynchronization(new Synchronization() {
+                    @Override
+                    public void beforeCommit() throws Exception {
+                        sendLock.lockInterruptibly();
+                    }
+                    @Override
+                    public void afterCommit() throws Exception {
+                        try {
+                            // It could take while before we receive the commit
+                            // op, by that time the message could have expired..
+                            if (broker.isExpired(message)) {
+                                broker.messageExpired(context, message);
+                                destinationStatistics.getExpired().increment();
+                                return;
+                            }
+                            sendMessage(message);
+                        } finally {
+                            sendLock.unlock();
+                            message.decrementReferenceCount();
                         }
-                        sendMessage(context, message);
-                    } finally {
+                        messageSent(context, message);
+                    }
+                    @Override
+                    public void afterRollback() throws Exception {
                         message.decrementReferenceCount();
                     }
-                }
-
-                @Override
-                public void afterRollback() throws Exception {
-                    message.decrementReferenceCount();
-                }
-            });
-        } else {
-            // Add to the pending list, this takes care of incrementing the
-            // usage manager.
-            sendMessage(context, message);
+                });
+            } else {
+                // Add to the pending list, this takes care of incrementing the
+                // usage manager.
+                sendMessage(message);
+            }
+        } finally {
+            sendLock.unlock();
+        }
+        if (!context.isInTransaction()) {
+            messageSent(context, message);
         }
         if (result != null && !result.isCancelled()) {
             try {
@@ -658,6 +660,26 @@ public class Queue extends BaseDestinati
         }
     }
 
+    private void checkUsage(ConnectionContext context, Message message) throws ResourceAllocationException,
IOException, InterruptedException {
+        if (message.isPersistent()) {
+            if (store != null && systemUsage.getStoreUsage().isFull(getStoreUsageHighWaterMark()))
{
+                final String logMessage = "Usage Manager Store is Full, " + getStoreUsageHighWaterMark()
+ "% of "
+                    + systemUsage.getStoreUsage().getLimit() + ". Stopping producer ("
+                    + message.getProducerId() + ") to prevent flooding "
+                    + getActiveMQDestination().getQualifiedName() + "."
+                    + " See http://activemq.apache.org/producer-flow-control.html for more
info";
+
+                waitForSpace(context, systemUsage.getStoreUsage(), getStoreUsageHighWaterMark(),
logMessage);
+            }
+        } else if (messages.getSystemUsage() != null && systemUsage.getTempUsage().isFull())
{
+            final String logMessage = "Usage Manager Temp Store is Full. Stopping producer
(" + message.getProducerId()
+                + ") to prevent flooding " + getActiveMQDestination().getQualifiedName()
+ "."
+                + " See http://activemq.apache.org/producer-flow-control.html for more info";
+            
+            waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
+        }
+    }
+
     private void expireMessages() {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Expiring messages ..");
@@ -1458,23 +1480,13 @@ public class Queue extends BaseDestinati
         return answer;
     }
 
-    final void sendMessage(final ConnectionContext context, Message msg) throws Exception
{
-        if (!msg.isPersistent() && messages.getSystemUsage() != null) {
-            if (systemUsage.getTempUsage().isFull()) {
-                final String logMessage = "Usage Manager Temp Store is Full. Stopping producer
(" + msg.getProducerId()
-                        + ") to prevent flooding " + getActiveMQDestination().getQualifiedName()
+ "."
-                        + " See http://activemq.apache.org/producer-flow-control.html for
more info";
-                if (systemUsage.isSendFailIfNoSpace()) {
-                    throw new ResourceAllocationException(logMessage);
-                }
-
-                waitForSpace(context, messages.getSystemUsage().getTempUsage(), logMessage);
-            }
-
-        }
+    final void sendMessage(final Message msg) throws Exception {
         synchronized (messages) {
             messages.addMessageLast(msg);
         }
+    }
+    
+    final void messageSent(final ConnectionContext context, final Message msg) throws Exception
{     
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         messageDelivered(context, msg);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=957899&r1=957898&r2=957899&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Fri Jun 25 11:49:01 2010
@@ -18,7 +18,6 @@ package org.apache.activemq.broker.regio
 
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.Properties;
@@ -52,6 +51,8 @@ import org.apache.activemq.usage.StoreUs
 import org.apache.activemq.usage.SystemUsage;
 import org.apache.activemq.usage.TempUsage;
 import org.apache.activemq.util.Wait;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Modified CursorSupport Unit test to reproduce the negative queue issue.
@@ -77,7 +78,8 @@ import org.apache.activemq.util.Wait;
  * 
  */
 public class NegativeQueueTest extends TestCase {
-
+    private static final Log LOG = LogFactory.getLog(NegativeQueueTest.class);
+    
     public static SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd,hh:mm:ss:SSS");
     
     private static final String QUEUE_1_NAME = "conn.test.queue.1";
@@ -140,6 +142,7 @@ public class NegativeQueueTest extends T
     }
     
     public void blastAndConsume() throws Exception {
+        LOG.info(getName());
         ConnectionFactory factory = createConnectionFactory();
         
         //get proxy queues for statistics lookups
@@ -208,7 +211,7 @@ public class NegativeQueueTest extends T
             consumer.setMessageListener(new SessionAwareMessageListener(consumerSession,
latch2, consumerList2));
         }
         
-        latch2.await(300000, TimeUnit.MILLISECONDS);
+        assertTrue("got all expected messages on 2", latch2.await(300000, TimeUnit.MILLISECONDS));
         producerConnection.close();
         for(int ix=0; ix<NUM_CONSUMERS; ix++){
             consumerConnections1[ix].close();
@@ -299,6 +302,8 @@ public class NegativeQueueTest extends T
         
         // disable the cache to be sure setBatch is the problem
         // will get lots of duplicates
+        // real problem is sync between cursor and store add - leads to out or order messages
+        // in the cursor so setBatch can break.
         // policy.setUseCache(false);
         
         PolicyMap pMap = new PolicyMap();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java?rev=957899&r1=957898&r2=957899&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
Fri Jun 25 11:49:01 2010
@@ -21,20 +21,23 @@ public class JDBCNegativeQueueTest exten
         dataSource = new EmbeddedDataSource();
         dataSource.setDatabaseName("derbyDb");
         dataSource.setCreateDatabase("create");
-        jdbc.setDataSource(dataSource);
+        jdbc.setDataSource(dataSource);     
         answer.setPersistenceAdapter(jdbc);
     }
 
     protected void tearDown() throws Exception {
-        /*Connection conn = dataSource.getConnection();
-        printQuery(conn, "Select * from ACTIVEMQ_MSGS", System.out); */
+        if (DEBUG) {
+            printQuery("Select * from ACTIVEMQ_MSGS", System.out);
+        }
         super.tearDown();
     }
     
     
-    private void printQuery(Connection c, String query, PrintStream out)
+    private void printQuery(String query, PrintStream out)
             throws SQLException {
-        printQuery(c.prepareStatement(query), out);
+        Connection conn = dataSource.getConnection();
+        printQuery(conn.prepareStatement(query), out);
+        conn.close();
     }
 
     private void printQuery(PreparedStatement s, PrintStream out)
@@ -69,7 +72,4 @@ public class JDBCNegativeQueueTest exten
             }
         }
     }
-
-    
-    
 }



Mime
View raw message