qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject qpid-jms git commit: https://issues.apache.org/jira/browse/QPIDJMS-58 https://issues.apache.org/jira/browse/QPIDJMS-59 https://issues.apache.org/jira/browse/QPIDJMS-60
Date Fri, 29 May 2015 19:14:02 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master cfb65ee61 -> 07d1637d9


https://issues.apache.org/jira/browse/QPIDJMS-58
https://issues.apache.org/jira/browse/QPIDJMS-59
https://issues.apache.org/jira/browse/QPIDJMS-60

URI Pool updated to provide thread-safe operations.  Added method
addFirst and used that to ensure that redirect URIs are always used on
the next connect attempt.  Updated the remove method to use the same
logic as add to find the matching URI to remove.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/07d1637d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/07d1637d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/07d1637d

Branch: refs/heads/master
Commit: 07d1637d959002cef9c7eb5466f0185618db29bc
Parents: cfb65ee
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri May 29 15:13:44 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri May 29 15:13:44 2015 -0400

----------------------------------------------------------------------
 .../jms/provider/failover/FailoverProvider.java |   2 +-
 .../jms/provider/failover/FailoverUriPool.java  | 108 ++++++++++++++----
 .../provider/failover/FailoverUriPoolTest.java  | 112 +++++++++++++++++++
 3 files changed, 199 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/07d1637d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 6abfd49..eef6a9a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -511,7 +511,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             if (cause instanceof ProviderRedirectedException) {
                 ProviderRedirectedException redirect = (ProviderRedirectedException) cause;
                 try {
-                    uris.add(new URI(failedURI.getScheme() + "://" + redirect.getNetworkHost() + ":" + redirect.getPort()));
+                    uris.addFirst(new URI(failedURI.getScheme() + "://" + redirect.getNetworkHost() + ":" + redirect.getPort()));
                 } catch (URISyntaxException ex) {
                     LOG.warn("Could not construct redirection URI from remote provided information");
                 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/07d1637d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
index 4b4ac2f..8cb335c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.qpid.jms.util.URISupport;
 import org.slf4j.Logger;
@@ -41,7 +42,7 @@ public class FailoverUriPool {
 
     private final LinkedList<URI> uris;
     private final Map<String, String> nestedOptions;
-    private boolean randomize = DEFAULT_RANDOMIZE_ENABLED;
+    private final AtomicBoolean randomize = new AtomicBoolean(DEFAULT_RANDOMIZE_ENABLED);
 
     public FailoverUriPool() {
         this.uris = new LinkedList<URI>();
@@ -67,14 +68,18 @@ public class FailoverUriPool {
      * @return the current size of the URI pool.
      */
     public int size() {
-        return uris.size();
+        synchronized (uris) {
+            return uris.size();
+        }
     }
 
     /**
      * @return true if the URI pool is empty.
      */
     public boolean isEmpty() {
-        return uris.isEmpty();
+        synchronized (uris) {
+            return uris.isEmpty();
+        }
     }
 
     /**
@@ -86,9 +91,11 @@ public class FailoverUriPool {
      */
     public URI getNext() {
         URI next = null;
-        if (!uris.isEmpty()) {
-            next = uris.removeFirst();
-            uris.addLast(next);
+        synchronized (uris) {
+            if (!uris.isEmpty()) {
+                next = uris.removeFirst();
+                uris.addLast(next);
+            }
         }
 
         return next;
@@ -101,7 +108,9 @@ public class FailoverUriPool {
      */
     public void connected() {
         if (isRandomize()) {
-            Collections.shuffle(uris);
+            synchronized (uris) {
+                Collections.shuffle(uris);
+            }
         }
     }
 
@@ -109,20 +118,23 @@ public class FailoverUriPool {
      * @return true if this pool returns the URI values in random order.
      */
     public boolean isRandomize() {
-        return randomize;
+        return randomize.get();
     }
 
     /**
      * Sets whether the URIs that are returned by this pool are returned in random
      * order or not.  If false the URIs are returned in FIFO order.
      *
-     * @param randomize
+     * @param random
      *        true to have the URIs returned in a random order.
      */
-    public void setRandomize(boolean randomize) {
-        this.randomize = randomize;
-        if (randomize) {
-            Collections.shuffle(uris);
+    public void setRandomize(boolean random) {
+        if (randomize.compareAndSet(!random, random)) {
+            if (random) {
+                synchronized (uris) {
+                    Collections.shuffle(uris);
+                }
+            }
         }
     }
 
@@ -134,16 +146,52 @@ public class FailoverUriPool {
      *        The new URI to add to the pool.
      */
     public void add(URI uri) {
-        if (uri != null && !contains(uri)) {
-            if (!nestedOptions.isEmpty()) {
-                try {
-                    uri = URISupport.applyParameters(uri, nestedOptions);
-                } catch (URISyntaxException e) {
-                    LOG.debug("Failed to add nested options to uri: {}", uri);
+        if (uri == null) {
+            return;
+        }
+
+        synchronized (uris) {
+            if (!contains(uri)) {
+                if (!nestedOptions.isEmpty()) {
+                    try {
+                        uri = URISupport.applyParameters(uri, nestedOptions);
+                    } catch (URISyntaxException e) {
+                        LOG.debug("Failed to add nested options to uri: {}", uri);
+                    }
                 }
+
+                uris.add(uri);
             }
+        }
+    }
 
-            this.uris.add(uri);
+    /**
+     * Adds a new URI to the pool if not already contained within.  The URI will have
+     * any nested options that have been configured added to its existing set of options.
+     *
+     * The URI is added to the head of the pooled URIs and will be the next value that
+     * is returned from the pool.
+     *
+     * @param uri
+     *        The new URI to add to the pool.
+     */
+    public void addFirst(URI uri) {
+        if (uri == null) {
+            return;
+        }
+
+        synchronized (uris) {
+            if (!contains(uri)) {
+                if (!nestedOptions.isEmpty()) {
+                    try {
+                        uri = URISupport.applyParameters(uri, nestedOptions);
+                    } catch (URISyntaxException e) {
+                        LOG.debug("Failed to add nested options to uri: {}", uri);
+                    }
+                }
+
+                uris.addFirst(uri);
+            }
         }
     }
 
@@ -152,9 +200,23 @@ public class FailoverUriPool {
      *
      * @param uri
      *        The URI to attempt to remove from the pool.
+     *
+     * @return true if the given URI was removed from the pool.
      */
-    public void remove(URI uri) {
-        this.uris.remove(uri);
+    public boolean remove(URI uri) {
+        if (uri == null) {
+            return false;
+        }
+
+        synchronized (uris) {
+            for (URI candidate : uris) {
+                if (compareURIs(uri, candidate)) {
+                    return uris.remove(candidate);
+                }
+            }
+        }
+
+        return false;
     }
 
     /**
@@ -167,6 +229,8 @@ public class FailoverUriPool {
         return nestedOptions;
     }
 
+    //----- Internal methods that require the locks be held ------------------//
+
     private boolean contains(URI newURI) {
         boolean result = false;
         for (URI uri : uris) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/07d1637d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
index de976f1..8b9452a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
@@ -236,6 +236,41 @@ public class FailoverUriPoolTest extends QpidJmsTestCase {
     }
 
     @Test
+    public void testAddFirst() throws URISyntaxException {
+        URI newUri = new URI("tcp://192.168.2." + (uris.size() + 1) + ":5672");
+
+        FailoverUriPool pool = new FailoverUriPool(uris, null);
+        pool.setRandomize(false);
+        pool.addFirst(newUri);
+
+        assertEquals(newUri, pool.getNext());
+
+        for (int i = 0; i < uris.size(); ++i) {
+            assertNotEquals(newUri, pool.getNext());
+        }
+
+        assertEquals(newUri, pool.getNext());
+    }
+
+    @Test
+    public void testAddFirstHandlesNulls() throws URISyntaxException {
+        FailoverUriPool pool = new FailoverUriPool(uris, null);
+        pool.setRandomize(false);
+        pool.addFirst(null);
+
+        assertEquals(uris.size(), pool.size());
+    }
+
+    @Test
+    public void testAddFirstToEmptyPool() {
+        FailoverUriPool pool = new FailoverUriPool();
+        assertTrue(pool.isEmpty());
+        pool.addFirst(uris.get(0));
+        assertFalse(pool.isEmpty());
+        assertEquals(uris.get(0), pool.getNext());
+    }
+
+    @Test
     public void testRemoveURIFromPool() throws URISyntaxException {
         FailoverUriPool pool = new FailoverUriPool(uris, null);
         pool.setRandomize(false);
@@ -252,6 +287,66 @@ public class FailoverUriPoolTest extends QpidJmsTestCase {
     }
 
     @Test
+    public void testRemovedWhenQueryPresent() throws URISyntaxException {
+        FailoverUriPool pool = new FailoverUriPool();
+
+        assertTrue(pool.isEmpty());
+        pool.add(new URI("tcp://127.0.0.1:5672?transport.tcpNoDelay=true"));
+        assertFalse(pool.isEmpty());
+        pool.remove(new URI("tcp://localhost:5672?transport.tcpNoDelay=true"));
+        assertTrue(pool.isEmpty());
+        pool.add(new URI("tcp://127.0.0.1:5672?transport.tcpNoDelay=true"));
+        assertFalse(pool.isEmpty());
+        pool.remove(new URI("tcp://localhost:5672?transport.tcpNoDelay=false"));
+        assertTrue(pool.isEmpty());
+    }
+
+    @Test
+    public void testRemoveWithHostResolution() throws URISyntaxException {
+        FailoverUriPool pool = new FailoverUriPool();
+
+        assertTrue(pool.isEmpty());
+        pool.add(new URI("tcp://127.0.0.1:5672"));
+        assertFalse(pool.isEmpty());
+        pool.remove(new URI("tcp://localhost:5672"));
+        assertTrue(pool.isEmpty());
+        pool.add(new URI("tcp://127.0.0.1:5672"));
+        assertFalse(pool.isEmpty());
+        pool.remove(new URI("tcp://localhost:5673"));
+        assertFalse(pool.isEmpty());
+    }
+
+    @Test
+    public void testRemoveWhenUnresolvable() throws URISyntaxException {
+        FailoverUriPool pool = new FailoverUriPool();
+
+        assertTrue(pool.isEmpty());
+        pool.add(new URI("tcp://shouldbeunresolvable:5672"));
+        assertFalse(pool.isEmpty());
+        pool.remove(new URI("tcp://SHOULDBEUNRESOLVABLE:5672"));
+        assertTrue(pool.isEmpty());
+        pool.add(new URI("tcp://shouldbeunresolvable:5672"));
+        assertFalse(pool.isEmpty());
+        pool.remove(new URI("tcp://shouldbeunresolvable:5673"));
+        assertFalse(pool.isEmpty());
+    }
+
+    @Test
+    public void testRemoveWhenQueryPresentAndUnresolveable() throws URISyntaxException {
+        FailoverUriPool pool = new FailoverUriPool();
+
+        assertTrue(pool.isEmpty());
+        pool.add(new URI("tcp://shouldbeunresolvable:5672?transport.tcpNoDelay=true"));
+        assertFalse(pool.isEmpty());
+        pool.remove(new URI("tcp://SHOULDBEUNRESOLVABLE:5672?transport.tcpNoDelay=true"));
+        assertTrue(pool.isEmpty());
+        pool.add(new URI("tcp://shouldbeunresolvable:5672?transport.tcpNoDelay=true"));
+        assertFalse(pool.isEmpty());
+        pool.remove(new URI("tcp://shouldbeunresolvable:5673?transport.tcpNoDelay=true"));
+        assertFalse(pool.isEmpty());
+    }
+
+    @Test
     public void testConnectedShufflesWhenRandomizing() {
         assertConnectedEffectOnPool(true, true);
     }
@@ -336,4 +431,21 @@ public class FailoverUriPoolTest extends QpidJmsTestCase {
             assertEquals("false", options.get("transport.tcpKeepAlive"));
         }
     }
+
+    @Test
+    public void testRemoveURIWhenNestedOptionsSet() throws URISyntaxException {
+        Map<String, String> nested = new HashMap<String, String>();
+
+        nested.put("transport.tcpNoDelay", "true");
+        nested.put("transport.tcpKeepAlive", "false");
+
+        FailoverUriPool pool = new FailoverUriPool(uris, nested);
+        assertNotNull(pool.getNestedOptions());
+        assertFalse(pool.getNestedOptions().isEmpty());
+
+        for (int i = 0; i < uris.size(); ++i) {
+            assertTrue(pool.remove(uris.get(i)));
+            assertEquals(uris.size() - (i + 1), pool.size());
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message