qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-44
Date Fri, 24 Apr 2015 21:14:18 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 2a5f89a1c -> 14e4a92b4


https://issues.apache.org/jira/browse/QPIDJMS-44

Add in support for handling connection redirection errors and auto
reconnect for the failover provider when a redirect is detected.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/14e4a92b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/14e4a92b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/14e4a92b

Branch: refs/heads/master
Commit: 14e4a92b4621a746586f5b8911d1cef6f1673d2b
Parents: 2a5f89a
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Apr 24 17:14:05 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Apr 24 17:14:05 2015 -0400

----------------------------------------------------------------------
 .../provider/ProviderRedirectedException.java   |  64 ++++++++
 .../jms/provider/amqp/AmqpAbstractResource.java |  46 ++++++
 .../qpid/jms/provider/amqp/AmqpProvider.java    |   2 -
 .../jms/provider/failover/FailoverProvider.java |  14 +-
 .../qpid/jms/JmsDefaultConnectionListener.java  |  46 ++++++
 .../integration/ConnectionIntegrationTest.java  |  56 ++++++-
 .../FailedConnectionsIntegrationTest.java       | 119 ++++++++++++++
 .../provider/failover/FailoverRedirectTest.java | 159 +++++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  64 +++++++-
 9 files changed, 565 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
new file mode 100644
index 0000000..f521a38
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+
+/**
+ * {@link IOException} derivative that defines that the remote peer has requested that this
+ * connection be redirected to some alternative peer.
+ */
+public class ProviderRedirectedException extends IOException {
+
+    private static final long serialVersionUID = 5872211116061710369L;
+
+    private final String hostname;
+    private final String networkHost;
+    private final int port;
+
+    /**
+     * @param reason
+     */
+    public ProviderRedirectedException(String reason, String hostname, String networkHost,
int port) {
+        super(reason);
+
+        this.hostname = hostname;
+        this.networkHost = networkHost;
+        this.port = port;
+    }
+
+    /**
+     * @return the host name of the container being redirected to.
+     */
+    public String getHostname() {
+        return hostname;
+    }
+
+    /**
+     * @return the DNS host name or IP address of the peer this connection is being redirected
to.
+     */
+    public String getNetworkHost() {
+        return networkHost;
+    }
+
+    /**
+     * @return the port number on the peer this connection is being redirected to.
+     */
+    public int getPort() {
+        return port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 204f6fe..5216276 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.provider.amqp;
 
 import java.io.IOException;
+import java.util.Map;
 
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
@@ -25,8 +26,10 @@ import javax.jms.JMSSecurityException;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.ProviderRedirectedException;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ConnectionError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.apache.qpid.proton.engine.Endpoint;
 import org.apache.qpid.proton.engine.EndpointState;
@@ -241,6 +244,8 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E
extends Endp
                 remoteError = new JMSSecurityException(message);
             } else if (error.equals(AmqpError.NOT_FOUND)) {
                 remoteError = new InvalidDestinationException(message);
+            } else if (error.equals(ConnectionError.REDIRECT)) {
+                remoteError = createRedirectException(error, message, getEndpoint().getRemoteCondition());
             } else {
                 remoteError = new JMSException(message);
             }
@@ -331,6 +336,47 @@ public abstract class AmqpAbstractResource<R extends JmsResource,
E extends Endp
     }
 
     /**
+     * When a redirect type exception is received this method is called to create the
+     * appropriate redirect exception type containing the error details needed.
+     *
+     * @param error
+     *        the Symbol that defines the redirection error type.
+     * @param message
+     *        the basic error message that should used or amended for the returned exception.
+     * @param condition
+     *        the ErrorCondition that describes the redirection.
+     *
+     * @returns an Exception that captures the details of the redirection error.
+     */
+    @SuppressWarnings("unchecked")
+    protected Exception createRedirectException(Symbol error, String message, ErrorCondition
condition) {
+        Exception result = null;
+        Map<String, Object> info = condition.getInfo();
+
+        if (info == null) {
+            result = new IOException(message + " : Redirection information not set.");
+        } else {
+            String hostname = (String) info.get("hostname");
+
+            String networkHost = (String) info.get("network-host");
+            if (networkHost == null || networkHost.isEmpty()) {
+                result = new IOException(message + " : Redirection information not set.");
+            }
+
+            int port = 0;
+            try {
+                port = Integer.valueOf(info.get("port").toString());
+            } catch (Exception ex) {
+                result = new IOException(message + " : Redirection information not set.");
+            }
+
+            result = new ProviderRedirectedException(message, hostname, networkHost, port);
+        }
+
+        return result;
+    }
+
+    /**
      * When aborting the open operation, and there isnt an error condition,
      * provided by the peer, the returned exception will be used instead.
      * A subclass may override this method to provide alternative behaviour.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 7191ddb..2314269 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -734,12 +734,10 @@ public class AmqpProvider implements Provider, TransportListener {
                         amqpResource.processRemoteOpen(this);
                         break;
                     case LINK_REMOTE_CLOSE:
-                        LOG.info("Link closed: {}", protonEvent.getLink().getContext());
                         amqpResource = (AmqpResource) protonEvent.getLink().getContext();
                         amqpResource.processRemoteClose(this);
                         break;
                     case LINK_REMOTE_DETACH:
-                        LOG.info("Link detach: {}", protonEvent.getLink().getContext());
                         amqpResource = (AmqpResource) protonEvent.getLink().getContext();
                         amqpResource.processRemoteDetach(this);
                         break;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 4ee8bf3..924192e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.failover;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -47,6 +48,7 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFactory;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.ProviderRedirectedException;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.jms.util.ThreadPoolUtils;
 import org.slf4j.Logger;
@@ -505,6 +507,16 @@ public class FailoverProvider extends DefaultProviderListener implements
Provide
         this.provider = null;
 
         if (reconnectAllowed()) {
+
+            if (cause instanceof ProviderRedirectedException) {
+                ProviderRedirectedException redirect = (ProviderRedirectedException) cause;
+                try {
+                    uris.add(new URI(failedURI.getScheme() + "://" + redirect.getNetworkHost()
+ ":" + redirect.getPort()));
+                } catch (URISyntaxException ex) {
+                    LOG.warn("Could not construct redirection URI from remote provided information");
+                }
+            }
+
             ProviderListener listener = this.listener;
             if (listener != null) {
                 listener.onConnectionInterrupted(failedURI);
@@ -890,7 +902,7 @@ public class FailoverProvider extends DefaultProviderListener implements
Provide
         public void run() {
             requests.put(id, this);
             if (provider == null) {
-                whenOffline(IOExceptionSupport.create(new IOException("Connection failed.")));
+                whenOffline(new IOException("Connection failed."));
             } else {
                 try {
                     LOG.debug("Executing Failover Task: {}", this);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
new file mode 100644
index 0000000..203677f
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+public class JmsDefaultConnectionListener implements JmsConnectionListener {
+
+    @Override
+    public void onConnectionEstablished(URI remoteURI) {
+    }
+
+    @Override
+    public void onConnectionFailure(Throwable error) {
+    }
+
+    @Override
+    public void onConnectionInterrupted(URI remoteURI) {
+    }
+
+    @Override
+    public void onConnectionRestored(URI remoteURI) {
+    }
+
+    @Override
+    public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 6d4e3f0..6027034 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -21,12 +21,16 @@
 package org.apache.qpid.jms.integration;
 
 import static org.hamcrest.Matchers.arrayContaining;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionMetaData;
@@ -38,10 +42,12 @@ import javax.jms.Queue;
 import javax.jms.Session;
 
 import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.provider.ProviderRedirectedException;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
 import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
 import org.apache.qpid.proton.amqp.transaction.TxnCapability;
 import org.junit.Test;
@@ -118,7 +124,7 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
             // Tell the test peer to close the connection when executing its last handler
             testPeer.remotelyCloseConnection(true);
 
-            //Add the exception listener
+            // Add the exception listener
             connection.setExceptionListener(new ExceptionListener() {
 
                 @Override
@@ -136,6 +142,54 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 10000)
+    public void testRemotelyEndConnectionWithRedirect() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final CountDownLatch done = new CountDownLatch(1);
+            final AtomicReference<JMSException> asyncError = new AtomicReference<JMSException>();
+
+            final String REDIRECTED_HOSTNAME = "vhost";
+            final String REDIRECTED_NETWORK_HOST = "localhost";
+            final int REDIRECTED_PORT = 5677;
+
+            // Don't set a ClientId, so that the underlying AMQP connection isn't established
yet
+            Connection connection = testFixture.establishConnecton(testPeer, false, null,
null, null, false);
+
+            // Tell the test peer to close the connection when executing its last handler
+            Map<String, Object> errorInfo = new HashMap<String, Object>();
+            errorInfo.put("hostname", REDIRECTED_HOSTNAME);
+            errorInfo.put("network-host", REDIRECTED_NETWORK_HOST);
+            errorInfo.put("port", 5677);
+
+            testPeer.remotelyCloseConnection(true, ConnectionError.REDIRECT, "Connection
redirected", errorInfo);
+
+            // Add the exception listener
+            connection.setExceptionListener(new ExceptionListener() {
+
+                @Override
+                public void onException(JMSException exception) {
+                    asyncError.set(exception);
+                    done.countDown();
+                }
+            });
+
+            // Trigger the underlying AMQP connection
+            connection.start();
+
+            assertTrue("Connection should report failure", done.await(5, TimeUnit.SECONDS));
+
+            assertTrue(asyncError.get() instanceof JMSException);
+            assertTrue(asyncError.get().getCause() instanceof ProviderRedirectedException);
+
+            ProviderRedirectedException redirect = (ProviderRedirectedException) asyncError.get().getCause();
+            assertEquals(REDIRECTED_HOSTNAME, redirect.getHostname());
+            assertEquals(REDIRECTED_NETWORK_HOST, redirect.getNetworkHost());
+            assertEquals(REDIRECTED_PORT, redirect.getPort());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     @Test(timeout = 5000)
     public void testRemotelyEndConnectionWithSessionWithConsumer() throws Exception {
         final String BREAD_CRUMB = "ErrorMessage";

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
new file mode 100644
index 0000000..7660632
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.provider.ProviderRedirectedException;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
+import org.junit.Test;
+
+/**
+ * Test the handling of various error on connection that should return
+ * specific error types to the JMS client.
+ */
+public class FailedConnectionsIntegrationTest extends QpidJmsTestCase {
+
+    @Test(timeout = 5000)
+    public void testConnectWithInvalidClientId() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            testPeer.rejectConnect(AmqpError.INVALID_FIELD, "Client ID already in use", null);
+            try {
+                establishAnonymousConnecton(testPeer, true);
+                fail("Should have thrown JMSException");
+            } catch (JMSException jmsEx) {
+            } catch (Exception ex) {
+                fail("Should have thrown JMSException: " + ex);
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 5000)
+    public void testConnectSecurityViolation() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            testPeer.rejectConnect(AmqpError.UNAUTHORIZED_ACCESS, "Anonymous connections
not allowed", null);
+            try {
+                establishAnonymousConnecton(testPeer, true);
+                fail("Should have thrown JMSSecurityException");
+            } catch (JMSException jmsEx) {
+            } catch (Exception ex) {
+                fail("Should have thrown JMSSecurityException: " + ex);
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 5000)
+    public void testConnectWithRedirect() throws Exception {
+        Map<String, Object> redirectInfo = new HashMap<String, Object>();
+
+        redirectInfo.put("hostname", "localhost");
+        redirectInfo.put("network-host", "127.0.0.1");
+        redirectInfo.put("port", 5672);
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            testPeer.rejectConnect(ConnectionError.REDIRECT, "Server is full, go away", redirectInfo);
+            try {
+                establishAnonymousConnecton(testPeer, true);
+                fail("Should have thrown JMSException");
+            } catch (JMSException jmsex) {
+                assertTrue(jmsex.getCause() instanceof ProviderRedirectedException);
+                ProviderRedirectedException redirectEx = (ProviderRedirectedException) jmsex.getCause();
+                assertEquals("localhost", redirectEx.getHostname());
+                assertEquals("127.0.0.1", redirectEx.getNetworkHost());
+                assertEquals(5672, redirectEx.getPort());
+            } catch (Exception ex) {
+                fail("Should have thrown JMSException: " + ex);
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    Connection establishAnonymousConnecton(TestAmqpPeer testPeer, boolean setClientId) throws
JMSException {
+
+        final String remoteURI = "amqp://localhost:" + testPeer.getServerPort();
+
+        ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+        Connection connection = factory.createConnection();
+
+        if (setClientId) {
+            // Set a clientId to provoke the actual AMQP connection process to occur.
+            connection.setClientID("clientName");
+        }
+
+        assertNull(testPeer.getThrowable());
+        return connection;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
new file mode 100644
index 0000000..fb9e890
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.failover;
+
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.ConnectionError;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that when failover is used and the remote sends a redirect error, the
+ * failover transport obtains the new peer connection info and attempts to connect
+ * there.
+ */
+public class FailoverRedirectTest extends QpidJmsTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverRedirectTest.class);
+
+    @Test(timeout = 40000)
+    public void testFailoverHandlesRedirection() throws Exception {
+        try (TestAmqpPeer rejectingPeer = new TestAmqpPeer();
+             TestAmqpPeer redirectedPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch connected = new CountDownLatch(1);
+            final String redirectURI = createPeerURI(redirectedPeer);
+            LOG.info("Backup peer is at: {}", redirectURI);
+
+            redirectedPeer.expectAnonymousConnect(true);
+            redirectedPeer.expectBegin(true);
+
+            Map<String, Object> redirectInfo = new HashMap<String, Object>();
+
+            redirectInfo.put("hostname", "localhost");
+            redirectInfo.put("network-host", "localhost");
+            redirectInfo.put("port", redirectedPeer.getServerPort());
+
+            rejectingPeer.rejectConnect(ConnectionError.REDIRECT, "Server is full, go away",
redirectInfo);
+
+            final JmsConnection connection = establishAnonymousConnecton(rejectingPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (redirectURI.equals(remoteURI.toString())) {
+                        connected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            rejectingPeer.waitForAllHandlersToComplete(1000);
+            assertTrue("Should connect to backup peer", connected.await(15, TimeUnit.SECONDS));
+
+            redirectedPeer.expectClose();
+            connection.close();
+            redirectedPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 40000)
+    public void testFailoverHandlesRemotelyEndConnectionWithRedirection() throws Exception
{
+        try (TestAmqpPeer rejectingPeer = new TestAmqpPeer();
+             TestAmqpPeer redirectedPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            final String rejectingURI = createPeerURI(rejectingPeer);
+            final String redirectURI = createPeerURI(redirectedPeer);
+            LOG.info("Primary is at {}: Backup peer is at: {}", rejectingURI, redirectURI);
+
+            redirectedPeer.expectAnonymousConnect(true);
+            redirectedPeer.expectBegin(true);
+
+            Map<String, Object> redirectInfo = new HashMap<String, Object>();
+
+            redirectInfo.put("hostname", "localhost");
+            redirectInfo.put("network-host", "localhost");
+            redirectInfo.put("port", redirectedPeer.getServerPort());
+
+            rejectingPeer.expectAnonymousConnect(true);
+            rejectingPeer.expectBegin(true);
+            rejectingPeer.remotelyCloseConnection(true, ConnectionError.REDIRECT, "Server
is full, go away", redirectInfo);
+
+            final JmsConnection connection = establishAnonymousConnecton(rejectingPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (remoteURI.toString().equals(rejectingURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (remoteURI.toString().equals(redirectURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            rejectingPeer.waitForAllHandlersToComplete(1000);
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(15, TimeUnit.SECONDS));
+            assertTrue("Should connect to backup peer", connectedToBackup.await(15, TimeUnit.SECONDS));
+
+            redirectedPeer.expectClose();
+            connection.close();
+            redirectedPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    private JmsConnection establishAnonymousConnecton(TestAmqpPeer testPeer) throws JMSException
{
+        final String remoteURI = "failover:(" + createPeerURI(testPeer) + ")";
+
+        ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+        Connection connection = factory.createConnection();
+
+        return (JmsConnection) connection;
+    }
+
+    private String createPeerURI(TestAmqpPeer peer) {
+        return "amqp://localhost:" + peer.getServerPort();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/14e4a92b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 0c3bac5..7824b47 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.nullValue;
 import java.io.IOException;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -458,6 +459,62 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(openMatcher);
     }
 
+    // TODO - Reject any incoming connection using the supplied information
+    public void rejectConnect(Symbol errorType, String errorMessage, Map<String, Object>
errorInfo) {
+
+        SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(Symbol.valueOf("ANONYMOUS"));
+        addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
+                                            new FrameSender(
+                                                    this, FrameType.SASL, 0,
+                                                    saslMechanismsFrame, null)));
+
+        addHandler(new SaslInitMatcher()
+            .withMechanism(equalTo(Symbol.valueOf("ANONYMOUS")))
+            .withInitialResponse(nullValue())
+            .onSuccess(new AmqpPeerRunnable()
+            {
+                @Override
+                public void run()
+                {
+                    TestAmqpPeer.this.sendFrame(
+                            FrameType.SASL, 0,
+                            new SaslOutcomeFrame().setCode(UnsignedByte.valueOf((byte)0)),
+                            null,
+                            false);
+                    _driverRunnable.expectHeader();
+                }
+            }));
+
+        addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
+
+        Map<Symbol, Object> properties = new HashMap<Symbol, Object>();
+        properties.put(Symbol.valueOf("amqp:connection-establishment-failed"), true);
+
+        OpenFrame open = new OpenFrame();
+        open.setProperties(properties);
+        open.setContainerId("test-amqp-peer-container-id");
+
+        addHandler(new OpenMatcher()
+            .withContainerId(notNullValue(String.class))
+            .onSuccess(new FrameSender(this, FrameType.AMQP, CONNECTION_CHANNEL, open, null)));
+
+        // Now generate the Close with the supplied error
+        final CloseFrame closeFrame = new CloseFrame();
+        if (errorType != null) {
+            org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
+            detachError.setCondition(errorType);
+            detachError.setDescription(errorMessage);
+            detachError.setInfo(errorInfo);
+            closeFrame.setError(detachError);
+        }
+
+        CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+        final FrameSender closeSender = new FrameSender(this, FrameType.AMQP, CONNECTION_CHANNEL,
closeFrame, null);
+        comp.add(closeSender);
+
+        addHandler(new CloseMatcher().withError(Matchers.nullValue()));
+    }
+
     public void expectClose()
     {
         addHandler(new CloseMatcher()
@@ -1296,10 +1353,14 @@ public class TestAmqpPeer implements AutoCloseable
     }
 
     public void remotelyCloseConnection(boolean expectCloseResponse) {
-        remotelyCloseConnection(expectCloseResponse, null, null);
+        remotelyCloseConnection(expectCloseResponse, null, null, null);
     }
 
     public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String
errorMessage) {
+        remotelyCloseConnection(expectCloseResponse, errorType, errorMessage, null);
+    }
+
+    public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String
errorMessage, Map<String, Object> info) {
         synchronized (_handlersLock) {
             // Prepare a composite to insert this action at the end of the handler sequence
             CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
@@ -1310,6 +1371,7 @@ public class TestAmqpPeer implements AutoCloseable
                 org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new
org.apache.qpid.jms.test.testpeer.describedtypes.Error();
                 detachError.setCondition(errorType);
                 detachError.setDescription(errorMessage);
+                detachError.setInfo(info);
                 closeFrame.setError(detachError);
             }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message