activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1196804 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/state/ test/java/org/apache/activemq/transport/
Date Wed, 02 Nov 2011 21:17:41 GMT
Author: gtully
Date: Wed Nov  2 21:17:41 2011
New Revision: 1196804

URL: http://svn.apache.org/viewvc?rev=1196804&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3576 - only update audit sequence on valid message,
apply test to isolated class where broker side timeout is absent

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=1196804&r1=1196803&r2=1196804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Wed Nov  2 21:17:41 2011
@@ -138,7 +138,9 @@ public class ProducerBrokerExchange {
                         + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: "  + lastSendSequenceNumber);
             }
         }
-        lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId());
+        if (canDispatch) {
+            lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId());
+        }
         return canDispatch;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java?rev=1196804&r1=1196803&r2=1196804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/state/ConnectionStateTracker.java Wed Nov  2 21:17:41 2011
@@ -80,6 +80,9 @@ public class ConnectionStateTracker exte
                 } else if (eldest.getValue() instanceof MessagePull) {
                     currentCacheSize -= MESSAGE_PULL_SIZE;
                 }
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("removing tracked message: " + eldest.getKey());
+                }
             }
             return result;
         }
@@ -167,7 +170,7 @@ public class ConnectionStateTracker exte
         //now flush messages
         for (Command msg:messageCache.values()) {
             if (LOG.isDebugEnabled()) {
-                LOG.debug("command: " + msg.getCommandId());
+                LOG.debug("command: " + (msg.isMessage() ? ((Message) msg).getMessageId() : msg));
             }
             transport.oneway(msg);
         }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java?rev=1196804&r1=1196803&r2=1196804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutClientTest.java Wed Nov  2 21:17:41 2011
@@ -28,30 +28,33 @@ import org.apache.activemq.ActiveMQConne
 import org.apache.activemq.JmsTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.util.SocketProxy;
 import org.apache.activemq.util.URISupport;
+import org.apache.activemq.util.Wait;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SoWriteTimeoutClientTest extends JmsTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutClientTest.class);
 
-    public String brokerTransportScheme = "tcp";
-
     protected BrokerService createBroker() throws Exception {
         BrokerService broker =  new BrokerService();
-        broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0");
+        broker.setDeleteAllMessagesOnStartup(true);
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setConcurrentStoreAndDispatchQueues(false);
+        broker.setPersistenceAdapter(adapter);
+        broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=0");
         return broker;
     }
 
-    public void x_testSendWithClientWriteTimeout() throws Exception {
+    public void testSendWithClientWriteTimeout() throws Exception {
         final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout");
         messageTextPrefix = initMessagePrefix(80*1024);
 
         URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
         LOG.info("consuming using uri: " + tcpBrokerUri);
 
-
          ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
         Connection c = factory.createConnection();
         c.start();
@@ -62,7 +65,7 @@ public class SoWriteTimeoutClientTest ex
         proxy.setTarget(tcpBrokerUri);
         proxy.open();
 
-        ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=500)?jms.useAsyncSend=true");
+        ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=4000&sleep=500)?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=6638400");
         final Connection pc = pFactory.createConnection();
         pc.start();
         proxy.pause();
@@ -81,11 +84,20 @@ public class SoWriteTimeoutClientTest ex
         });
 
         // wait for timeout and reconnect
-        TimeUnit.SECONDS.sleep(20);
+        TimeUnit.SECONDS.sleep(8);
         proxy.goOn();
         for (int i=0; i<messageCount; i++) {
             assertNotNull("Got message " + i  + " after reconnect", consumer.receive(5000));
         }
+
+        assertTrue("no pending messages when done", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+
+                LOG.info("current total message count: " + broker.getAdminView().getTotalMessageCount());
+                return broker.getAdminView().getTotalMessageCount() == 0;
+            }
+        }));
     }
 
     private String initMessagePrefix(int i) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java?rev=1196804&r1=1196803&r2=1196804&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java Wed Nov  2 21:17:41 2011
@@ -16,6 +16,15 @@
  */
 package org.apache.activemq.transport;
 
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
 import junit.framework.Test;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsTestSupport;
@@ -29,27 +38,20 @@ import org.apache.activemq.util.URISuppo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.*;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.URI;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
 public class SoWriteTimeoutTest extends JmsTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutTest.class);
 
     final int receiveBufferSize = 16*1024;
-    public String brokerTransportScheme = "tcp";
+    public String brokerTransportScheme = "nio";
 
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = super.createBroker();
         broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
         KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
         adapter.setConcurrentStoreAndDispatchQueues(false);
         broker.setPersistenceAdapter(adapter);
-        broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0");
+        broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0&transport.soWriteTimeout=1000&transport.sleep=1000");
         if ("nio".equals(brokerTransportScheme)) {
             broker.addConnector("stomp+" + brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize=" + receiveBufferSize + "&trace=true");
         }
@@ -148,51 +150,6 @@ public class SoWriteTimeoutTest extends 
         }
     }
 
-    public void testClientWriteTimeout() throws Exception {
-        final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout");
-        messageTextPrefix = initMessagePrefix(80*1024);
-
-        URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
-        LOG.info("consuming using uri: " + tcpBrokerUri);
-
-
-         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
-        Connection c = factory.createConnection();
-        c.start();
-        Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createConsumer(dest);
-
-        SocketProxy proxy = new SocketProxy();
-        proxy.setTarget(tcpBrokerUri);
-        proxy.open();
-
-        ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=500)?jms.useAsyncSend=true&trackMessages=true");
-        final Connection pc = pFactory.createConnection();
-        pc.start();
-        proxy.pause();
-
-        final int messageCount = 20;
-        ExecutorService executorService = Executors.newCachedThreadPool();
-        executorService.execute(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    sendMessages(pc, dest, messageCount);
-                } catch (Exception ignored) {
-                    ignored.printStackTrace();
-                }
-            }
-        });
-
-        // wait for timeout and reconnect
-        TimeUnit.SECONDS.sleep(7);
-        proxy.goOn();
-        for (int i=0; i<messageCount; i++) {
-            assertNotNull("Got message after reconnect", consumer.receive(5000));
-        }
-        //broker.getAdminView().get
-    }
-
     private String initMessagePrefix(int i) {
         byte[] content = new byte[i];
         return new String(content);



Mime
View raw message