activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1177345 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/failover/FailoverTransport.java test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
Date Thu, 29 Sep 2011 16:02:49 GMT
Author: tabish
Date: Thu Sep 29 16:02:49 2011
New Revision: 1177345

URL: http://svn.apache.org/viewvc?rev=1177345&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3516

Add fix along with a unit test to ensure it stays fixed.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
  (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1177345&r1=1177344&r2=1177345&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Thu Sep 29 16:02:49 2011
@@ -455,14 +455,6 @@ public class FailoverTransport implement
         this.maxCacheSize = maxCacheSize;
     }
 
-    /**
-     * @return Returns true if the command is one sent when a connection is
-     *         being closed.
-     */
-    private boolean isShutdownCommand(Command command) {
-        return (command != null && (command.isShutdownInfo() || command instanceof
RemoveInfo));
-    }
-
     public void oneway(Object o) throws IOException {
 
         Command command = (Command) o;
@@ -471,22 +463,22 @@ public class FailoverTransport implement
 
             synchronized (reconnectMutex) {
 
-                if (isShutdownCommand(command) && connectedTransport.get() == null)
{
+                if (command != null && connectedTransport.get() == null) {
                     if (command.isShutdownInfo()) {
-                        // Skipping send of ShutdownInfo command when not
-                        // connected.
+                        // Skipping send of ShutdownInfo command when not connected.
                         return;
-                    }
-                    if (command instanceof RemoveInfo || command.isMessageAck()) {
-                        // Simulate response to RemoveInfo command or ack (as it
-                        // will be stale)
+                    } else if (command instanceof RemoveInfo || command.isMessageAck()) {
+                        // Simulate response to RemoveInfo command or MessageAck (as it will
be stale)
                         stateTracker.track(command);
-                        Response response = new Response();
-                        response.setCorrelationId(command.getCommandId());
-                        myTransportListener.onCommand(response);
+                    	if (command.isResponseRequired()) {
+	                        Response response = new Response();
+	                        response.setCorrelationId(command.getCommandId());
+	                        myTransportListener.onCommand(response);
+                    	}
                         return;
                     }
                 }
+
                 // Keep trying until the message is sent.
                 for (int i = 0; !disposed; i++) {
                     try {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java?rev=1177345&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
(added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
Thu Sep 29 16:02:49 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.failover;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.state.ConnectionStateTracker;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class FailoverTransportTest {
+
+    protected Transport transport;
+    protected FailoverTransport failoverTransport;
+    private int commandsReceived;
+
+	@Before
+	public void setUp() throws Exception {
+	}
+
+	@After
+	public void tearDown() throws Exception {
+        if (transport != null) {
+            transport.stop();
+        }
+    }
+
+	@Test(timeout=30000)
+	public void testCommandsIgnoredWhenOffline() throws Exception {
+		this.transport = createTransport();
+
+		assertNotNull(failoverTransport);
+
+		ConnectionStateTracker tracker = failoverTransport.getStateTracker();
+		assertNotNull(tracker);
+
+		ConnectionId id = new ConnectionId("1");
+		ConnectionInfo connection = new ConnectionInfo(id);
+
+		// Track a connection
+		tracker.track(connection);
+
+		try {
+			this.transport.oneway(new RemoveInfo(new ConnectionId("1")));
+		} catch(Exception e) {
+			fail("Should not have failed to remove this known connection");
+		}
+
+		try {
+			this.transport.oneway(new RemoveInfo(new ConnectionId("2")));
+		} catch(Exception e) {
+			fail("Should not have failed to remove this unknown connection");
+		}
+
+		this.transport.oneway(new MessageAck());
+		this.transport.oneway(new ShutdownInfo());
+	}
+
+	@Test(timeout=30000)
+	public void testResponsesSentWhenRequestForIgnoredCommands() throws Exception {
+		this.transport = createTransport();
+		assertNotNull(failoverTransport);
+		MessageAck ack = new MessageAck();
+		assertNotNull("Should have received a Response", this.transport.request(ack));
+		RemoveInfo info = new RemoveInfo(new ConnectionId("2"));
+		assertNotNull("Should have received a Response", this.transport.request(info));
+	}
+
+    protected Transport createTransport() throws Exception {
+    	Transport transport = TransportFactory.connect(
+    			new URI("failover://(tcp://doesNotExist:1234)"));
+        transport.setTransportListener(new TransportListener() {
+
+            public void onCommand(Object command) {
+            	commandsReceived++;
+            }
+
+            public void onException(IOException error) {
+            }
+
+            public void transportInterupted() {
+            }
+
+            public void transportResumed() {
+            }
+        });
+        transport.start();
+
+        this.failoverTransport = transport.narrow(FailoverTransport.class);
+
+        return transport;
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransportTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message