activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dej...@apache.org
Subject svn commit: r1196709 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ProducerBrokerExchange.java test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
Date Wed, 02 Nov 2011 17:23:15 GMT
Author: dejanb
Date: Wed Nov  2 17:23:15 2011
New Revision: 1196709

URL: http://svn.apache.org/viewvc?rev=1196709&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3576 - producer exchange sync last seq id

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java?rev=1196709&r1=1196708&r2=1196709&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ProducerBrokerExchange.java Wed Nov  2 17:23:15 2011
@@ -23,6 +23,8 @@ import org.apache.activemq.state.Produce
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * Holds internal state in the broker for a MessageProducer
  * 
@@ -36,7 +38,7 @@ public class ProducerBrokerExchange {
     private Region region;
     private ProducerState producerState;
     private boolean mutable = true;
-    private long lastSendSequenceNumber = -1;
+    private AtomicLong lastSendSequenceNumber = new AtomicLong(-1);
     
     public ProducerBrokerExchange() {
     }
@@ -129,18 +131,19 @@ public class ProducerBrokerExchange {
      */
     public boolean canDispatch(Message messageSend) {
         boolean canDispatch = true;
-        if (lastSendSequenceNumber > 0) {
-            if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber) {
+        if (lastSendSequenceNumber.get() > 0) {
+            if (messageSend.getMessageId().getProducerSequenceId() <= lastSendSequenceNumber.get()) {
                 canDispatch = false;
-                LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId [" 
+                LOG.debug("suppressing duplicate message send [" + messageSend.getMessageId() + "] with producerSequenceId ["
                         + messageSend.getMessageId().getProducerSequenceId() + "] less than last stored: "  + lastSendSequenceNumber);
             }
         }
+        lastSendSequenceNumber.set(messageSend.getMessageId().getProducerSequenceId());
         return canDispatch;
     }
 
     public void setLastStoredSequenceId(long l) {
-        lastSendSequenceNumber = l;
+        lastSendSequenceNumber.set(l);
         LOG.debug("last stored sequence id set: " + l);
     }
 }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java?rev=1196709&r1=1196708&r2=1196709&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java Wed Nov  2 17:23:15 2011
@@ -16,23 +16,12 @@
  */
 package org.apache.activemq.transport;
 
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.URI;
-import java.util.concurrent.TimeUnit;
-
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
 import junit.framework.Test;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsTestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import org.apache.activemq.transport.stomp.Stomp;
 import org.apache.activemq.transport.stomp.StompConnection;
 import org.apache.activemq.util.SocketProxy;
@@ -40,15 +29,27 @@ import org.apache.activemq.util.URISuppo
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.*;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
 public class SoWriteTimeoutTest extends JmsTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(SoWriteTimeoutTest.class);
 
     final int receiveBufferSize = 16*1024;
-    public String brokerTransportScheme = "nio";
+    public String brokerTransportScheme = "tcp";
 
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = super.createBroker();
-        broker.addConnector(brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize="+ receiveBufferSize);
+        broker.setPersistent(true);
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setConcurrentStoreAndDispatchQueues(false);
+        broker.setPersistenceAdapter(adapter);
+        broker.addConnector(brokerTransportScheme + "://localhost:0?wireFormat.maxInactivityDuration=0");
         if ("nio".equals(brokerTransportScheme)) {
             broker.addConnector("stomp+" + brokerTransportScheme + "://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize=" + receiveBufferSize + "&trace=true");
         }
@@ -147,6 +148,55 @@ public class SoWriteTimeoutTest extends 
         }
     }
 
+    public void testClientWriteTimeout() throws Exception {
+        final ActiveMQQueue dest = new ActiveMQQueue("testClientWriteTimeout");
+        messageTextPrefix = initMessagePrefix(80*1024);
+
+        URI tcpBrokerUri = URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
+        LOG.info("consuming using uri: " + tcpBrokerUri);
+
+
+         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcpBrokerUri);
+        Connection c = factory.createConnection();
+        c.start();
+        Session session = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(dest);
+
+        SocketProxy proxy = new SocketProxy();
+        proxy.setTarget(tcpBrokerUri);
+        proxy.open();
+
+        ActiveMQConnectionFactory pFactory = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl() + "?soWriteTimeout=500)?jms.useAsyncSend=true&trackMessages=true");
+        final Connection pc = pFactory.createConnection();
+        pc.start();
+        System.out.println("Pausing proxy");
+        proxy.pause();
+
+        final int messageCount = 20;
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        executorService.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    System.out.println("sending messages");
+                    sendMessages(pc, dest, messageCount);
+                    System.out.println("messages sent");
+                } catch (Exception ignored) {
+                    ignored.printStackTrace();
+                }
+            }
+        });
+
+        // wait for timeout and reconnect
+        TimeUnit.SECONDS.sleep(7);
+        System.out.println("go on");
+        proxy.goOn();
+        for (int i=0; i<messageCount; i++) {
+            assertNotNull("Got message after reconnect", consumer.receive(5000));
+        }
+        //broker.getAdminView().get
+    }
+
     private String initMessagePrefix(int i) {
         byte[] content = new byte[i];
         return new String(content);



Mime
View raw message