activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1475734 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-broker/src/main/java/org/apache/activemq/transaction/ activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/ activemq-un...
Date Thu, 25 Apr 2013 12:47:21 GMT
Author: gtully
Date: Thu Apr 25 12:47:20 2013
New Revision: 1475734

URL: http://svn.apache.org/r1475734
Log:
https://issues.apache.org/jira/browse/AMQ-4485 - ensure cursor updates in same order as store
orderindex via beforeCompletion with index lock. The before-completion step tracks ordered work
that the first thread then completes as a unit. All updates to a destination are combined into a
single sync, such that there is no cursor contention between transactions.

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
  (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
    activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
Thu Apr 25 12:47:20 2013
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.DelayQueue;
@@ -83,6 +84,7 @@ import org.apache.activemq.thread.Task;
 import org.apache.activemq.thread.TaskRunner;
 import org.apache.activemq.thread.TaskRunnerFactory;
 import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.transaction.Transaction;
 import org.apache.activemq.usage.Usage;
 import org.apache.activemq.usage.UsageListener;
 import org.apache.activemq.util.BrokerSupport;
@@ -734,6 +736,120 @@ public class Queue extends BaseDestinati
         }
     }
 
+    final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction,
SendSync>();
+    private volatile LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
+
+    // roll up all message sends
+    class SendSync extends Synchronization {
+
+        class MessageContext {
+            public Message message;
+            public ConnectionContext context;
+
+            public MessageContext(ConnectionContext context, Message message) {
+                this.context = context;
+                this.message = message;
+            }
+        }
+
+        final Transaction transaction;
+        List<MessageContext> additions = new ArrayList<MessageContext>();
+
+        public SendSync(Transaction transaction) {
+            this.transaction = transaction;
+        }
+
+        public void add(ConnectionContext context, Message message) {
+            additions.add(new MessageContext(context, message));
+        }
+
+        @Override
+        public void beforeCommit() throws Exception {
+            synchronized (sendLock) {
+                orderIndexUpdates.addLast(transaction);
+            }
+        }
+
+        @Override
+        public void afterCommit() throws Exception {
+            LinkedList<Transaction> orderedWork = null;
+            // use existing object to sync orderIndexUpdates that can be reassigned
+            synchronized (sendLock) {
+                if (transaction == orderIndexUpdates.peek()) {
+                    orderedWork = orderIndexUpdates;
+                    orderIndexUpdates = new LinkedList<Transaction>();
+
+                    // taking all the ordered work means that earlier
+                    // and later threads do nothing.
+                    // this avoids contention/race on the sendLock that
+                    // guards the actual work.
+                }
+            }
+            // do the ordered work
+            if (orderedWork != null) {
+                sendLock.lockInterruptibly();
+                try {
+                    for (Transaction tx : orderedWork) {
+                        sendSyncs.get(tx).processSend();
+                    }
+                } finally {
+                    sendLock.unlock();
+                }
+                for (Transaction tx : orderedWork) {
+                    sendSyncs.get(tx).processSent();
+                }
+                sendSyncs.remove(transaction);
+            }
+        }
+
+        // called with sendLock
+        private void processSend() throws Exception {
+
+            for (Iterator<MessageContext> iterator = additions.iterator(); iterator.hasNext();
) {
+                MessageContext messageContext = iterator.next();
+                // It could take while before we receive the commit
+                // op, by that time the message could have expired..
+                if (broker.isExpired(messageContext.message)) {
+                    broker.messageExpired(messageContext.context, messageContext.message,
null);
+                    destinationStatistics.getExpired().increment();
+                    iterator.remove();
+                    continue;
+                }
+                sendMessage(messageContext.message);
+                messageContext.message.decrementReferenceCount();
+            }
+        }
+
+        private void processSent() throws Exception {
+            for (MessageContext messageContext : additions) {
+                messageSent(messageContext.context, messageContext.message);
+            }
+        }
+
+        @Override
+        public void afterRollback() throws Exception {
+            try {
+                for (MessageContext messageContext : additions) {
+                    messageContext.message.decrementReferenceCount();
+                }
+            } finally {
+                sendSyncs.remove(transaction);
+            }
+        }
+    }
+
+    // called while holding the sendLock
+    private void registerSendSync(Message message, ConnectionContext context) {
+        final Transaction transaction = context.getTransaction();
+        Queue.SendSync currentSync = sendSyncs.get(transaction);
+        if (currentSync == null) {
+            currentSync = new Queue.SendSync(transaction);
+            transaction.addSynchronization(currentSync);
+            sendSyncs.put(transaction, currentSync);
+        }
+        currentSync.add(context, message);
+    }
+
     void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message)
throws IOException,
             Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
@@ -759,30 +875,7 @@ public class Queue extends BaseDestinati
                 // our memory. This increment is decremented once the tx finishes..
                 message.incrementReferenceCount();
 
-                context.getTransaction().addSynchronization(new Synchronization() {
-                    @Override
-                    public void afterCommit() throws Exception {
-                        sendLock.lockInterruptibly();
-                        try {
-                            // It could take while before we receive the commit
-                            // op, by that time the message could have expired..
-                            if (broker.isExpired(message)) {
-                                broker.messageExpired(context, message, null);
-                                destinationStatistics.getExpired().increment();
-                                return;
-                            }
-                            sendMessage(message);
-                        } finally {
-                            sendLock.unlock();
-                            message.decrementReferenceCount();
-                        }
-                        messageSent(context, message);
-                    }
-                    @Override
-                    public void afterRollback() throws Exception {
-                        message.decrementReferenceCount();
-                    }
-                });
+                registerSendSync(message, context);
             } else {
                 // Add to the pending list, this takes care of incrementing the
                 // usage manager.

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
(original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java
Thu Apr 25 12:47:20 2013
@@ -128,7 +128,7 @@ public abstract class Transaction {
 
     @Override
     public String toString() {
-        return super.toString() + "[synchronizations=" + synchronizations + "]";
+        return "Local-" + getTransactionId() + "[synchronizations=" + synchronizations +
"]";
     }
 
     public abstract void commit(boolean onePhase) throws XAException, IOException;

Modified: activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
--- activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/trunk/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Thu Apr 25 12:47:20 2013
@@ -43,7 +43,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
-import java.util.Stack;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -833,7 +832,7 @@ public abstract class MessageDatabase ex
                 lastRecoveryPosition = nextRecoveryPosition;
                 metadata.lastUpdate = lastRecoveryPosition;
                 JournalCommand<?> message = load(lastRecoveryPosition);
-                process(message, lastRecoveryPosition, (Runnable)null);
+                process(message, lastRecoveryPosition, (Runnable)null, (Runnable)null);
                 nextRecoveryPosition = journal.getNextLocation(lastRecoveryPosition);
             }
         } finally {
@@ -913,10 +912,7 @@ public abstract class MessageDatabase ex
      * the JournalMessage is used to update the index just like it would be done
      * during a recovery process.
      */
-    public Location store(JournalCommand<?> data, boolean sync, Runnable before,Runnable
after, Runnable onJournalStoreComplete) throws IOException {
-        if (before != null) {
-            before.run();
-        }
+    public Location store(JournalCommand<?> data, boolean sync, Runnable before, Runnable
after, Runnable onJournalStoreComplete) throws IOException {
         try {
             ByteSequence sequence = toByteSequence(data);
 
@@ -927,7 +923,7 @@ public abstract class MessageDatabase ex
                 long start = System.currentTimeMillis();
                 location = onJournalStoreComplete == null ? journal.write(sequence, sync)
:  journal.write(sequence, onJournalStoreComplete) ;
                 long start2 = System.currentTimeMillis();
-                process(data, location, after);
+                process(data, location, before, after);
 
                 long end = System.currentTimeMillis();
                 if( LOG_SLOW_ACCESS_TIME>0 && end-start > LOG_SLOW_ACCESS_TIME)
{
@@ -940,18 +936,7 @@ public abstract class MessageDatabase ex
                 checkpointLock.readLock().unlock();
             }
             if (after != null) {
-                Runnable afterCompletion = null;
-                synchronized (orderedTransactionAfters) {
-                    if (!orderedTransactionAfters.empty()) {
-                        afterCompletion = orderedTransactionAfters.pop();
-                    }
-                }
-                if (afterCompletion != null) {
-                    afterCompletion.run();
-                } else {
-                    // non persistent message case
-                    after.run();
-                }
+                after.run();
             }
 
             if (checkpointThread != null && !checkpointThread.isAlive()) {
@@ -1004,7 +989,7 @@ public abstract class MessageDatabase ex
      */
     void process(JournalCommand<?> data, final Location location, final Location inDoubtlocation)
throws IOException {
         if (inDoubtlocation != null && location.compareTo(inDoubtlocation) >=
0) {
-            process(data, location, (Runnable) null);
+            process(data, location, (Runnable) null, (Runnable) null);
         } else {
             // just recover producer audit
             data.visit(new Visitor() {
@@ -1022,7 +1007,7 @@ public abstract class MessageDatabase ex
     // from the recovery method too so they need to be idempotent
     // /////////////////////////////////////////////////////////////////
 
-    void process(JournalCommand<?> data, final Location location, final Runnable after)
throws IOException {
+    void process(JournalCommand<?> data, final Location location, final Runnable before,
final Runnable after) throws IOException {
         data.visit(new Visitor() {
             @Override
             public void visit(KahaAddMessageCommand command) throws IOException {
@@ -1041,7 +1026,7 @@ public abstract class MessageDatabase ex
 
             @Override
             public void visit(KahaCommitCommand command) throws IOException {
-                process(command, location, after);
+                process(command, location, before, after);
             }
 
             @Override
@@ -1153,17 +1138,8 @@ public abstract class MessageDatabase ex
         }
     }
 
-    private final Stack<Runnable> orderedTransactionAfters = new Stack<Runnable>();
-    private void push(Runnable after) {
-        if (after != null) {
-            synchronized (orderedTransactionAfters) {
-                orderedTransactionAfters.push(after);
-            }
-        }
-    }
-
     @SuppressWarnings("rawtypes")
-    protected void process(KahaCommitCommand command, Location location, final Runnable after)
throws IOException {
+    protected void process(KahaCommitCommand command, Location location, final Runnable before,
final Runnable after) throws IOException {
         TransactionId key = TransactionIdConversion.convert(command.getTransactionInfo());
         List<Operation> inflightTx;
         synchronized (inflightTransactions) {
@@ -1173,9 +1149,9 @@ public abstract class MessageDatabase ex
             }
         }
         if (inflightTx == null) {
-            if (after != null) {
-                // since we don't push this after and we may find another, lets run it now
-                after.run();
+            // only non persistent messages in this tx
+            if (before != null) {
+                before.run();
             }
             return;
         }
@@ -1183,6 +1159,10 @@ public abstract class MessageDatabase ex
         final List<Operation> messagingTx = inflightTx;
         this.indexLock.writeLock().lock();
         try {
+            // run before with the index lock so that queue can order cursor updates with
index updates
+            if (before != null) {
+                before.run();
+            }
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 @Override
                 public void execute(Transaction tx) throws IOException {
@@ -1192,7 +1172,6 @@ public abstract class MessageDatabase ex
                 }
             });
             metadata.lastUpdate = location;
-            push(after);
         } finally {
             this.indexLock.writeLock().unlock();
         }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
(original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/MessageExpirationTest.java
Thu Apr 25 12:47:20 2013
@@ -145,14 +145,8 @@ public class MessageExpirationTest exten
         connection.send(closeConnectionInfo(connectionInfo2));
     }
 
-    /**
-     * Small regression. Looks like persistent messages to a queue are not being
-     * timedout when in a long transaction. See:
-     * http://issues.apache.org/activemq/browse/AMQ-1269 Commenting out the
-     * DeliveryMode.PERSISTENT test combination for now.
-     */
     public void initCombosForTestMessagesInLongTransactionExpire() {
-        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.PERSISTENT),
Integer.valueOf(DeliveryMode.NON_PERSISTENT)});
         addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
Byte.valueOf(ActiveMQDestination.TOPIC_TYPE),
                                                               Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE),
Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)});
     }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java?rev=1475734&r1=1475733&r2=1475734&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
(original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/NegativeQueueTest.java
Thu Apr 25 12:47:20 2013
@@ -88,7 +88,8 @@ public class NegativeQueueTest extends A
     private static final long MEMORY_USAGE = 400000000;
     private static final long TEMP_USAGE = 200000000;
     private static final long STORE_USAGE = 1000000000;
-    private static final int MESSAGE_COUNT = 1100;
+    // ensure we exceed the cache 70%
+    private static final int MESSAGE_COUNT = 2100;
 
     protected static final boolean TRANSACTED = true;
     protected static final boolean DEBUG = true;

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java?rev=1475734&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
(added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
Thu Apr 25 12:47:20 2013
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerPluginSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransactionBroker;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.transaction.Synchronization;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4485Test extends TestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4485Test.class);
+    BrokerService broker;
+    ActiveMQConnectionFactory factory;
+    final int messageCount = 20;
+    int memoryLimit = 40 * 1024;
+    final ActiveMQQueue destination = new ActiveMQQueue("QUEUE." + this.getClass().getName());
+    final Vector<Throwable> exceptions = new Vector<Throwable>();
+    final CountDownLatch slowSendResume = new CountDownLatch(1);
+
+
+    protected void configureBroker(long memoryLimit) throws Exception {
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setAdvisorySupport(false);
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setExpireMessagesPeriod(0);
+        policy.setMemoryLimit(memoryLimit);
+        policy.setProducerFlowControl(false);
+        PolicyMap pMap = new PolicyMap();
+        pMap.setDefaultEntry(policy);
+        broker.setDestinationPolicy(pMap);
+
+        broker.setPlugins(new BrokerPlugin[] {new BrokerPluginSupport() {
+            @Override
+            public void send(ProducerBrokerExchange producerExchange, final Message messageSend)
throws Exception {
+                if (messageSend.isInTransaction() && messageSend.getProperty("NUM")
!= null) {
+                    final Integer num = (Integer) messageSend.getProperty("NUM");
+                    if (true) {
+                        TransactionBroker transactionBroker = (TransactionBroker)broker.getBroker().getAdaptor(TransactionBroker.class);
+                        transactionBroker.getTransaction(producerExchange.getConnectionContext(),
messageSend.getTransactionId(), false).addSynchronization(
+                                new Synchronization() {
+                                    public void afterCommit() throws Exception {
+                                        LOG.error("AfterCommit, NUM:" + num + ", " + messageSend.getMessageId()
+ ", tx: " + messageSend.getTransactionId());
+                                        if (num == 5) {
+                                            // we want to add to cursor after usage is exhausted
by message 20 and when
+                                            // all other messages have been processed
+                                            LOG.error("Pausing on latch in afterCommit for:
" + num + ", " + messageSend.getMessageId());
+                                            slowSendResume.await(20, TimeUnit.SECONDS);
+                                            LOG.error("resuming on latch afterCommit for:
" + num + ", " + messageSend.getMessageId());
+                                        } else if (messageCount + 1 == num) {
+                                            LOG.error("releasing latch. " + num + ", " +
messageSend.getMessageId());
+                                            slowSendResume.countDown();
+                                            // for message X, we need to delay so message
5 can setBatch
+                                            TimeUnit.SECONDS.sleep(5);
+                                            LOG.error("resuming afterCommit for: " + num
+ ", " + messageSend.getMessageId());
+                                        }
+                                    }
+                                });
+                    }
+                }
+                super.send(producerExchange, messageSend);
+            }
+        }
+        });
+
+    }
+
+
+    public void testOutOfOrderTransactionCompletionOnMemoryLimit() throws Exception {
+
+        Set<Integer> expected = new HashSet<Integer>();
+        final Vector<Session> sessionVector = new Vector<Session>();
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        for (int i = 1; i <= messageCount; i++) {
+           sessionVector.add(send(i, 1, true));
+           expected.add(i);
+        }
+
+        // get parallel commit so that the sync writes are batched
+        for (int i = 0; i < messageCount; i++) {
+            final int id = i;
+            executorService.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        sessionVector.get(id).commit();
+                    } catch (Exception fail) {
+                        exceptions.add(fail);
+                    }
+                }
+            });
+        }
+
+        final DestinationViewMBean queueViewMBean = (DestinationViewMBean)
+                broker.getManagementContext().newProxyInstance(broker.getAdminView().getQueues()[0],
DestinationViewMBean.class, false);
+
+        // not sure how many messages will get enqueued
+        TimeUnit.SECONDS.sleep(3);
+        if (false)
+        assertTrue("all " + messageCount + " on the q", Wait.waitFor(new Wait.Condition()
{
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("enqueueCount: " + queueViewMBean.getEnqueueCount());
+                return messageCount == queueViewMBean.getEnqueueCount();
+            }
+        }));
+
+        LOG.info("Big send to blow available destination usage before slow send resumes");
+        send(messageCount + 1, 35*1024, true).commit();
+
+
+        // consume and verify all received
+        Connection cosumerConnection = factory.createConnection();
+        cosumerConnection.start();
+        MessageConsumer consumer = cosumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(destination);
+        for (int i = 1; i <= messageCount + 1; i++) {
+            BytesMessage bytesMessage = (BytesMessage) consumer.receive(10000);
+            assertNotNull("Got message: " + i + ", " + expected, bytesMessage);
+            MessageId mqMessageId = ((ActiveMQBytesMessage) bytesMessage).getMessageId();
+            LOG.info("got: " + expected + ", "  + mqMessageId + ", NUM=" + ((ActiveMQBytesMessage)
bytesMessage).getProperty("NUM"));
+            expected.remove(((ActiveMQBytesMessage) bytesMessage).getProperty("NUM"));
+        }
+    }
+
+    private Session send(int id, int messageSize, boolean transacted) throws Exception {
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED
: Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        BytesMessage bytesMessage = session.createBytesMessage();
+        bytesMessage.writeBytes(new byte[messageSize]);
+        bytesMessage.setIntProperty("NUM", id);
+        producer.send(bytesMessage);
+        LOG.info("Sent:" + bytesMessage.getJMSMessageID() + " session tx: " + ((ActiveMQBytesMessage)
bytesMessage).getTransactionId());
+        return session;
+    }
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = new BrokerService();
+        broker.setBrokerName("thisOne");
+        configureBroker(memoryLimit);
+        broker.start();
+        factory = new ActiveMQConnectionFactory("vm://thisOne?jms.alwaysSyncSend=true");
+        factory.setWatchTopicAdvisories(false);
+
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+            broker = null;
+        }
+    }
+
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date



Mime
View raw message