activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1350657 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/vm/VMTransport.java test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
Date Fri, 15 Jun 2012 15:35:04 GMT
Author: tabish
Date: Fri Jun 15 15:35:04 2012
New Revision: 1350657

URL: http://svn.apache.org/viewvc?rev=1350657&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-3873

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java?rev=1350657&r1=1350656&r2=1350657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java Fri Jun 15 15:35:04 2012
@@ -106,7 +106,9 @@ public class VMTransport implements Tran
     public void dispatch(VMTransport transport, BlockingQueue<Object> pending, Object command) {
         TransportListener transportListener = transport.getTransportListener();
         if (transportListener != null) {
-            synchronized (started) {
+            // Lock here on the target transport's started since we want to wait for its start()
+            // method to finish dispatching out of the queue before we do our own.
+            synchronized (transport.started) {
 
                 // Ensure that no additional commands entered the queue in the small time window
                 // before the start method locks the dispatch lock and the oneway method was in

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java?rev=1350657&r1=1350656&r2=1350657&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java Fri Jun 15 15:35:04 2012
@@ -32,6 +32,8 @@ import org.apache.activemq.command.BaseC
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.ShutdownInfo;
 import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.transport.MutexTransport;
+import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.Wait;
@@ -107,6 +109,47 @@ public class VMTransportThreadSafeTest {
         }
     }
 
+    private class VMResponderTransportListener implements TransportListener {
+
+        protected final Queue<DummyCommand> received;
+
+        private final Transport peer;
+
+        public VMResponderTransportListener(Queue<DummyCommand> receiveQueue, Transport peer) {
+            this.received = receiveQueue;
+            this.peer = peer;
+        }
+
+        @Override
+        public void onCommand(Object command) {
+
+            if (command instanceof ShutdownInfo) {
+                return;
+            } else {
+                received.add((DummyCommand) command);
+
+                if (peer != null) {
+                    try {
+                        peer.oneway(command);
+                    } catch (IOException e) {
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void onException(IOException error) {
+        }
+
+        @Override
+        public void transportInterupted() {
+        }
+
+        @Override
+        public void transportResumed() {
+        }
+    }
+
     private class SlowVMTestTransportListener extends VMTestTransportListener {
 
         private final TimeUnit delayUnit;
@@ -714,4 +757,118 @@ public class VMTransportThreadSafeTest {
         return endTime - startTime;
     }
 
+    @Test(timeout=120000)
+    public void testTwoWayTrafficWithMutexTransportSync1() throws Exception {
+
+        for (int i = 0; i < 20; ++i) {
+            doTestTwoWayTrafficWithMutexTransport(false, false);
+        }
+    }
+
+    @Test(timeout=120000)
+    public void testTwoWayTrafficWithMutexTransportSync2() throws Exception {
+
+        for (int i = 0; i < 20; ++i) {
+            doTestTwoWayTrafficWithMutexTransport(true, false);
+        }
+    }
+
+    @Test(timeout=120000)
+    public void testTwoWayTrafficWithMutexTransportSync3() throws Exception {
+
+        for (int i = 0; i < 20; ++i) {
+            doTestTwoWayTrafficWithMutexTransport(false, true);
+        }
+    }
+
+    @Test(timeout=120000)
+    public void testTwoWayTrafficWithMutexTransportSync4() throws Exception {
+
+        for (int i = 0; i < 20; ++i) {
+            doTestTwoWayTrafficWithMutexTransport(false, false);
+        }
+    }
+
+    public void doTestTwoWayTrafficWithMutexTransport(boolean localAsync, boolean remoteAsync) throws Exception {
+
+        final VMTransport vmlocal = new VMTransport(new URI(location1));
+        final VMTransport vmremote = new VMTransport(new URI(location2));
+
+        final MutexTransport local = new MutexTransport(vmlocal);
+        final MutexTransport remote = new MutexTransport(vmremote);
+
+        final AtomicInteger sequenceId = new AtomicInteger();
+
+        vmlocal.setAsync(localAsync);
+        vmremote.setAsync(remoteAsync);
+
+        vmlocal.setPeer(vmremote);
+        vmremote.setPeer(vmlocal);
+
+        local.setTransportListener(new VMTestTransportListener(localReceived));
+        remote.setTransportListener(new VMResponderTransportListener(remoteReceived, remote));
+
+        final int messageCount = 200000;
+
+        Thread localSend = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                for(int i = 0; i < messageCount; ++i) {
+                    try {
+                        local.oneway(new DummyCommand(sequenceId.incrementAndGet()));
+                    } catch (Exception e) {
+                    }
+                }
+            }
+        });
+
+        Thread remoteSend = new Thread(new Runnable() {
+
+            @Override
+            public void run() {
+                for(int i = 0; i < messageCount; ++i) {
+                    try {
+                        remote.oneway(new DummyCommand(sequenceId.incrementAndGet()));
+                    } catch (Exception e) {
+                    }
+                }
+            }
+        });
+
+        localSend.start();
+        remoteSend.start();
+
+        Thread.sleep(10);
+
+        local.start();
+        remote.start();
+
+        // Wait for both to finish and then check that each side go the correct amount
+        localSend.join();
+        remoteSend.join();
+
+        assertTrue("Remote should have received ("+messageCount+") but got ()" + remoteReceived.size(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return remoteReceived.size() == messageCount;
+            }
+        }));
+
+        assertTrue("Local should have received ("+messageCount*2+") but got ()" + localReceived.size(), Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return localReceived.size() == messageCount*2;
+            }
+        }));
+
+        LOG.debug("All messages sent,stop all");
+
+        local.stop();
+        remote.stop();
+
+        localReceived.clear();
+        remoteReceived.clear();
+    }
+
 }



Mime
View raw message