brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [2/4] brooklyn-server git commit: Add findOpenSocketsOnNode to ReachableSocketFinder
Date Thu, 26 Jan 2017 16:20:45 GMT
Add findOpenSocketsOnNode to ReachableSocketFinder

This changes the semantics of ReachableSocketFinder. Previously it would
return as soon as any host+port was reached. Now it determines the
status of each socket in order. The checks are concurrent but, for
example, if the third of five sockets is reachable, the status of the
first and second will also be determined before the third is returned.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/de2bf644
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/de2bf644
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/de2bf644

Branch: refs/heads/master
Commit: de2bf6447517f64e56ed030553a53c69c1cf32e6
Parents: 6a31cec
Author: Sam Corbett <sam.corbett@cloudsoftcorp.com>
Authored: Tue Dec 20 19:20:33 2016 +0000
Committer: Sam Corbett <sam.corbett@cloudsoftcorp.com>
Committed: Mon Jan 23 15:18:32 2017 +0000

----------------------------------------------------------------------
 .../apache/brooklyn/util/net/Networking.java    |   4 +
 .../util/net/ReachableSocketFinder.java         | 163 +++++++++++--------
 .../util/net/ReachableSocketFinderTest.java     | 103 ++++++++++--
 3 files changed, 183 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/de2bf644/utils/common/src/main/java/org/apache/brooklyn/util/net/Networking.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/net/Networking.java b/utils/common/src/main/java/org/apache/brooklyn/util/net/Networking.java
index fde462e..4e97d5c 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/util/net/Networking.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/util/net/Networking.java
@@ -539,6 +539,10 @@ public class Networking {
         }
     }
 
+    public static Predicate<HostAndPort> isReachablePredicate() {
+        return new IsReachablePredicate();
+    }
+
     public static class IsReachablePredicate implements Predicate<HostAndPort> {
         @Override public boolean apply(HostAndPort input) {
             return Networking.isReachable(input);

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/de2bf644/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java b/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
index d488021..665a7b3 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
@@ -19,17 +19,17 @@ package org.apache.brooklyn.util.net;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.repeat.Repeater;
 import org.apache.brooklyn.util.time.Duration;
 import org.slf4j.Logger;
@@ -38,9 +38,10 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 
@@ -55,12 +56,7 @@ public class ReachableSocketFinder {
     private final ListeningExecutorService userExecutor;
 
     public ReachableSocketFinder(ListeningExecutorService userExecutor) {
-        this(
-                new Predicate<HostAndPort>() {
-                    @Override public boolean apply(HostAndPort input) {
-                        return Networking.isReachable(input);
-                    }}, 
-                userExecutor);
+        this(Networking.isReachablePredicate(), userExecutor);
     }
 
     public ReachableSocketFinder(Predicate<? super HostAndPort> socketTester, ListeningExecutorService
userExecutor) {
@@ -69,41 +65,23 @@ public class ReachableSocketFinder {
     }
 
     /**
-     * 
+     * Returns the first element of sockets that is reachable.
      * @param sockets The host-and-ports to test
      * @param timeout Max time to try to connect to the ip:port
      * 
      * @return The reachable ip:port
-     * @throws NoSuchElementException If no ports accessible within the given time
-     * @throws NullPointerException  If the sockets or duration is null
+     * @throws NoSuchElementException If no ports are accessible within the given time
+     * @throws NullPointerException  If sockets or timeout is null
      * @throws IllegalStateException  If the sockets to test is empty
      */
-    public HostAndPort findOpenSocketOnNode(final Collection<? extends HostAndPort>
sockets, Duration timeout) {
+    public HostAndPort findOpenSocketOnNode(final Iterable<? extends HostAndPort> sockets,
Duration timeout) {
         checkNotNull(sockets, "sockets");
-        checkState(sockets.size() > 0, "No hostAndPort sockets supplied");
-        
+        checkState(!Iterables.isEmpty(sockets), "No hostAndPort sockets supplied");
+        checkNotNull(timeout, "timeout");
         LOG.debug("blocking on any reachable socket in {} for {}", sockets, timeout);
-
-        final AtomicReference<HostAndPort> result = new AtomicReference<HostAndPort>();
-        boolean passed = Repeater.create("socket-reachable")
-                .limitTimeTo(timeout)
-                .backoffTo(Duration.FIVE_SECONDS)
-                .until(new Callable<Boolean>() {
-                        @Override
-                        public Boolean call() {
-                            Optional<HostAndPort> reachableSocket = tryReachable(sockets,
Duration.FIVE_SECONDS);
-                            if (reachableSocket.isPresent()) {
-                                result.compareAndSet(null, reachableSocket.get());
-                                return true;
-                            }
-                            return false;
-                        }})
-                .run();
-
-        if (passed) {
-            LOG.debug("<< socket {} opened", result);
-            assert result.get() != null;
-            return result.get();
+        Iterator<HostAndPort> iter = findOpenSocketsOnNode(sockets, timeout).iterator();
+        if (iter.hasNext()) {
+            return iter.next();
         } else {
             LOG.warn("No sockets in {} reachable after {}", sockets, timeout);
             throw new NoSuchElementException("could not connect to any socket in " + sockets);
@@ -111,44 +89,89 @@ public class ReachableSocketFinder {
     }
 
     /**
-     * Checks if any any of the given HostAndPorts are reachable. It checks them all concurrently,
and
-     * returns the first that is reachable (or Optional.absent).
+     * Returns an iterable of the elements in sockets that are reachable. The order of elements
+     * in the iterable corresponds to the order of the elements in the input, not the order
in which their
+     * reachability was determined. Iterators are unmodifiable and are evaluated lazily.
+     *
+     * @param sockets The host-and-ports to test
+     * @param timeout Max time to try to connect to each ip:port
+     * @return An iterable containing all sockets that are reachable according to {@link
#socketTester}.
+     * @throws NullPointerException  If sockets or timeout is null
+     * @throws IllegalStateException  If the sockets to test is empty
      */
-    private Optional<HostAndPort> tryReachable(Collection<? extends HostAndPort>
sockets, Duration timeout) {
-        final AtomicReference<HostAndPort> reachableSocket = new AtomicReference<HostAndPort>();
-        final CountDownLatch latch = new CountDownLatch(1);
-        List<ListenableFuture<?>> futures = Lists.newArrayList();
+    public Iterable<HostAndPort> findOpenSocketsOnNode(final Iterable<? extends
HostAndPort> sockets, Duration timeout) {
+        checkNotNull(sockets, "sockets");
+        checkState(!Iterables.isEmpty(sockets), "No hostAndPort sockets supplied");
+        checkNotNull(timeout, "timeout");
+        return Optional.presentInstances(tryReachable(sockets, timeout));
+    }
+
+    /**
+     * @return A lazily computed Iterable containing present values for the elements of sockets
that are
+     * reachable according to {@link #socketTester} and absent values for those not. Checks
are concurrent
+     * and the elements in the Iterable are ordered according to their position in sockets.
+     */
+    private Iterable<Optional<HostAndPort>> tryReachable(Iterable<? extends
HostAndPort> sockets, final Duration timeout) {
+        final List<ListenableFuture<Optional<HostAndPort>>> futures = Lists.newArrayList();
+        final AtomicReference<Stopwatch> sinceFirstCompleted = new AtomicReference<>();
+
         for (final HostAndPort socket : sockets) {
-            futures.add(userExecutor.submit(new Runnable() {
+            futures.add(userExecutor.submit(new Callable<Optional<HostAndPort>>()
{
+                @Override
+                public Optional<HostAndPort> call() {
+                    // Whether the socket was reachable (vs. the result of call, which is
whether the repeater is done).
+                    final AtomicBoolean theResultWeCareAbout = new AtomicBoolean();
+                    Repeater.create("socket-reachable")
+                            .limitTimeTo(timeout)
+                            .backoffTo(Duration.FIVE_SECONDS)
+                            .until(new Callable<Boolean>() {
+                                @Override
+                                public Boolean call() throws TimeoutException {
+                                    boolean reachable = socketTester.apply(socket);
+                                    if (reachable) {
+                                        theResultWeCareAbout.set(true);
+                                        return true;
+                                    } else {
+                                        // Run another check if nobody else has completed
yet or another task has
+                                        // completed but this one is still in its grace period.
+                                        Stopwatch timerSinceFirst = sinceFirstCompleted.get();
+                                        return timerSinceFirst != null && Duration.FIVE_SECONDS.subtract(Duration.of(timerSinceFirst)).isNegative();
+                                    }
+                                }
+                            })
+                            .run();
+                    if (theResultWeCareAbout.get()) {
+                        sinceFirstCompleted.compareAndSet(null, Stopwatch.createStarted());
+                    }
+                    return theResultWeCareAbout.get() ? Optional.of(socket) : Optional.<HostAndPort>absent();
+                }
+            }));
+        }
+
+        return new Iterable<Optional<HostAndPort>>() {
+            @Override
+            public Iterator<Optional<HostAndPort>> iterator() {
+                return new AbstractIterator<Optional<HostAndPort>>() {
+                    int count = 0;
+
                     @Override
-                    public void run() {
-                        try {
-                            if (socketTester.apply(socket)) {
-                                reachableSocket.compareAndSet(null, socket);
-                                latch.countDown();
+                    protected Optional<HostAndPort> computeNext() {
+                        if (count < futures.size()) {
+                            final Future<Optional<HostAndPort>> future = futures.get(count++);
+                            try {
+                                return future.get(timeout.toUnit(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
+                            } catch (Exception e) {
+                                Exceptions.propagateIfInterrupt(e);
+                                LOG.trace("Suppressed exception checking reachability of
socket", e);
                             }
-                        } catch (RuntimeInterruptedException e) {
-                            throw e;
-                        } catch (RuntimeException e) {
-                            LOG.warn("Error checking reachability of ip:port "+socket, e);
+                            future.cancel(true);
+                            return Optional.absent();
+                        } else {
+                            return endOfData();
                         }
-                    }}));
-        }
-        
-        ListenableFuture<List<Object>> compoundFuture = Futures.successfulAsList(futures);
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        try {
-            while (reachableSocket.get() == null && !compoundFuture.isDone() &&
timeout.isLongerThan(stopwatch)) {
-                latch.await(50, TimeUnit.MILLISECONDS);
-            }            
-            return Optional.fromNullable(reachableSocket.get());
-            
-        } catch (InterruptedException e) {
-            throw Exceptions.propagate(e);
-        } finally {
-            for (Future<?> future : futures) {
-                future.cancel(true);
+                    }
+                };
             }
-        }
+        };
     }
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/de2bf644/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java
----------------------------------------------------------------------
diff --git a/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java b/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java
index 16228ef..b7bfb14 100644
--- a/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java
+++ b/utils/common/src/test/java/org/apache/brooklyn/util/net/ReachableSocketFinderTest.java
@@ -18,6 +18,7 @@ package org.apache.brooklyn.util.net;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.net.ServerSocket;
@@ -25,11 +26,13 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.javalang.JavaClassNames;
 import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
@@ -38,6 +41,8 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -48,25 +53,23 @@ public class ReachableSocketFinderTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(ReachableSocketFinderTest.class);
 
-    private HostAndPort socket1;
-    private HostAndPort socket2;
+    private final HostAndPort socket1 = HostAndPort.fromParts("1.1.1.1", 1111);
+    private final HostAndPort socket2 = HostAndPort.fromParts("1.1.1.2", 1112);
+    private final HostAndPort socket3 = HostAndPort.fromParts("1.1.1.3", 1113);
+    private final Predicate<HostAndPort> socketTester = new Predicate<HostAndPort>()
{
+        @Override
+        public boolean apply(HostAndPort input) {
+            return Boolean.TRUE.equals(reachabilityResults.get(input));
+        }};
+
     private Map<HostAndPort, Boolean> reachabilityResults;
     private ListeningExecutorService executor;
-    private Predicate<HostAndPort> socketTester;
     private ReachableSocketFinder finder;
 
     @BeforeMethod(alwaysRun=true)
     public void setUp() throws Exception {
-        socket1 = HostAndPort.fromParts("1.1.1.1", 1111);
-        socket2 = HostAndPort.fromParts("1.1.1.2", 1112);
         reachabilityResults = Maps.newConcurrentMap();
         executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
-        socketTester = new Predicate<HostAndPort>() {
-            @Override public boolean apply(HostAndPort input) {
-                return Boolean.TRUE.equals(reachabilityResults.get(input));
-            }
-        };
-        
         finder = new ReachableSocketFinder(socketTester, executor);
     }
 
@@ -76,19 +79,24 @@ public class ReachableSocketFinderTest {
     }
     
     @Test(expectedExceptions=IllegalStateException.class)
-    public void testWhenNoSocketsThrowsIllegalState() throws Exception {
+    public void testFindWhenNoSocketsThrowsIllegalState() throws Exception {
         finder.findOpenSocketOnNode(ImmutableList.<HostAndPort>of(), Duration.TEN_SECONDS);
     }
+
+    @Test(expectedExceptions=IllegalStateException.class)
+    public void testFindAllWhenNoSocketsThrowsIllegalState() throws Exception {
+        finder.findOpenSocketsOnNode(ImmutableList.<HostAndPort>of(), Duration.TEN_SECONDS);
+    }
     
     @Test
     public void testReturnsReachableSocket() throws Exception {
         reachabilityResults.put(socket1, true);
         reachabilityResults.put(socket2, false);
-        assertEquals(finder.findOpenSocketOnNode(ImmutableList.<HostAndPort>of(socket1,
socket2), Duration.TEN_SECONDS), socket1);
+        assertEquals(finder.findOpenSocketOnNode(ImmutableList.of(socket1, socket2), Duration.ONE_SECOND),
socket1);
         
         reachabilityResults.put(socket1, false);
         reachabilityResults.put(socket2, true);
-        assertEquals(finder.findOpenSocketOnNode(ImmutableList.<HostAndPort>of(socket1,
socket2), Duration.TEN_SECONDS), socket2);
+        assertEquals(finder.findOpenSocketOnNode(ImmutableList.of(socket1, socket2), Duration.ONE_SECOND),
socket2);
     }
     
     @Test
@@ -97,18 +105,79 @@ public class ReachableSocketFinderTest {
         reachabilityResults.put(socket2, false);
         final ListenableFuture<HostAndPort> future = executor.submit(new Callable<HostAndPort>()
{
                 @Override public HostAndPort call() throws Exception {
-                    return finder.findOpenSocketOnNode(ImmutableList.<HostAndPort>of(socket1,
socket2), Duration.TEN_SECONDS);
+                    return finder.findOpenSocketOnNode(ImmutableList.of(socket1, socket2),
Duration.TEN_SECONDS);
                 }});
 
         // Should keep trying
-        Asserts.succeedsContinually(new Runnable() {
+        Asserts.succeedsContinually(ImmutableMap.of("timeout", Duration.millis(100)), new
Runnable() {
             @Override public void run() {
                 assertFalse(future.isDone());
             }});
 
         // When port is reached, it completes
         reachabilityResults.put(socket1, true);
-        assertEquals(future.get(30, TimeUnit.SECONDS), socket1);
+        assertEquals(future.get(5, TimeUnit.SECONDS), socket1);
+    }
+
+    @Test
+    public void testGetReachableSockets() throws Exception {
+        reachabilityResults.put(socket1, true);
+        reachabilityResults.put(socket2, false);
+        reachabilityResults.put(socket3, true);
+        final Iterable<HostAndPort> expected = ImmutableList.of(socket1, socket3);
+        final Iterable<HostAndPort> actual = finder.findOpenSocketsOnNode(
+                ImmutableList.of(socket1, socket2, socket3), Duration.millis(250));
+        assertEquals(actual, expected, "expected=" + expected + ", actual=" + Iterables.toString(actual));
+    }
+
+    @Test
+    public void testGetAllReachableSocketsEmpty() throws Exception {
+        final Iterable<HostAndPort> expected = ImmutableList.of();
+        final Iterable<HostAndPort> actual = finder.findOpenSocketsOnNode(ImmutableList.of(socket2),
Duration.millis(1));
+        assertEquals(actual, expected, "expected=" + expected + ", actual=" + Iterables.toString(actual));
+    }
+
+    @Test
+    public void testReachableSocketsIteratesInInputOrder() throws Exception {
+        // i.e. not in the order that reachability was determined.
+        reachabilityResults.put(socket1, false);
+        reachabilityResults.put(socket2, true);
+        final Iterable<HostAndPort> actual = finder.findOpenSocketsOnNode(ImmutableList.of(socket1,
socket2), Duration.TEN_SECONDS);
+        final Iterable<HostAndPort> expected = ImmutableList.of(socket1, socket2);
+        final Future<?> future = executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                // This will block until the check for socket1 completes or times out.
+                assertEquals(actual, expected, "expected=" + expected + ", actual=" + Iterables.toString(actual));
+            }
+        });
+
+        // Should keep trying
+        Asserts.succeedsContinually(ImmutableMap.of("timeout", Duration.ONE_SECOND), new
Runnable() {
+            @Override public void run() {
+                assertFalse(future.isDone());
+            }});
+
+        // When port is reached, it completes. Demonstrates grace period.
+        reachabilityResults.put(socket1, true);
+
+        Asserts.succeedsEventually(ImmutableMap.of("timeout", Duration.ONE_SECOND), new Runnable()
{
+            @Override public void run() {
+                assertTrue(future.isDone());
+            }});
+    }
+
+    // Wouldn't need to be integration if grace period were configurable.
+    @Test(groups = "Integration")
+    public void testSocketResultIgnoredIfGracePeriodExpiresAfterFirstResultAvailable() {
+        reachabilityResults.put(socket1, false);
+        reachabilityResults.put(socket2, true);
+
+        final Iterable<HostAndPort> actual = finder.findOpenSocketsOnNode(ImmutableList.of(socket1,
socket2), Duration.FIVE_MINUTES);
+        // Sleep through the grace period.
+        Time.sleep(Duration.seconds(10));
+        reachabilityResults.put(socket1, true);
+        assertEquals(actual, ImmutableList.of(socket2));
     }
     
     // Mark as integration, as can't rely (in Apache infra) for a port to stay unused during
test!


Mime
View raw message