brooklyn-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sjcorbett <...@git.apache.org>
Subject [GitHub] brooklyn-server pull request #497: Add findOpenSocketsOnNode to ReachableSoc...
Date Tue, 24 Jan 2017 14:49:31 GMT
Github user sjcorbett commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/497#discussion_r97561966
  
    --- Diff: utils/common/src/main/java/org/apache/brooklyn/util/net/ReachableSocketFinder.java
---
    @@ -69,86 +65,113 @@ public ReachableSocketFinder(Predicate<? super HostAndPort>
socketTester, Listen
         }
     
         /**
    -     * 
    +     * Returns the first element of sockets that is reachable.
          * @param sockets The host-and-ports to test
          * @param timeout Max time to try to connect to the ip:port
          * 
          * @return The reachable ip:port
    -     * @throws NoSuchElementException If no ports accessible within the given time
    -     * @throws NullPointerException  If the sockets or duration is null
    +     * @throws NoSuchElementException If no ports are accessible within the given time
    +     * @throws NullPointerException  If sockets or timeout is null
          * @throws IllegalStateException  If the sockets to test is empty
          */
    -    public HostAndPort findOpenSocketOnNode(final Collection<? extends HostAndPort>
sockets, Duration timeout) {
    +    public HostAndPort findOpenSocketOnNode(final Iterable<? extends HostAndPort>
sockets, Duration timeout) {
             checkNotNull(sockets, "sockets");
    -        checkState(sockets.size() > 0, "No hostAndPort sockets supplied");
    -        
    +        checkState(!Iterables.isEmpty(sockets), "No hostAndPort sockets supplied");
    +        checkNotNull(timeout, "timeout");
             LOG.debug("blocking on any reachable socket in {} for {}", sockets, timeout);
    -
    -        final AtomicReference<HostAndPort> result = new AtomicReference<HostAndPort>();
    -        boolean passed = Repeater.create("socket-reachable")
    -                .limitTimeTo(timeout)
    -                .backoffTo(Duration.FIVE_SECONDS)
    -                .until(new Callable<Boolean>() {
    -                        @Override
    -                        public Boolean call() {
    -                            Optional<HostAndPort> reachableSocket = tryReachable(sockets,
Duration.FIVE_SECONDS);
    -                            if (reachableSocket.isPresent()) {
    -                                result.compareAndSet(null, reachableSocket.get());
    -                                return true;
    -                            }
    -                            return false;
    -                        }})
    -                .run();
    -
    -        if (passed) {
    -            LOG.debug("<< socket {} opened", result);
    -            assert result.get() != null;
    -            return result.get();
    +        Iterator<HostAndPort> iter = findOpenSocketsOnNode(sockets, timeout).iterator();
    +        if (iter.hasNext()) {
    +            return iter.next();
             } else {
                 LOG.warn("No sockets in {} reachable after {}", sockets, timeout);
                 throw new NoSuchElementException("could not connect to any socket in " +
sockets);
             }
         }
     
         /**
    -     * Checks if any any of the given HostAndPorts are reachable. It checks them all
concurrently, and
    -     * returns the first that is reachable (or Optional.absent).
    +     * Returns an iterable of the elements in sockets that are reachable. The order of
elements
    +     * in the iterable corresponds to the order of the elements in the input, not the
order in which their
    +     * reachability was determined. Iterators are unmodifiable and are evaluated lazily.
    +     *
    +     * @param sockets The host-and-ports to test
    +     * @param timeout Max time to try to connect to each ip:port
    +     * @return An iterable containing all sockets that are reachable according to {@link
#socketTester}.
    +     * @throws NullPointerException  If sockets or timeout is null
    +     * @throws IllegalStateException  If the sockets to test is empty
          */
    -    private Optional<HostAndPort> tryReachable(Collection<? extends HostAndPort>
sockets, Duration timeout) {
    -        final AtomicReference<HostAndPort> reachableSocket = new AtomicReference<HostAndPort>();
    -        final CountDownLatch latch = new CountDownLatch(1);
    -        List<ListenableFuture<?>> futures = Lists.newArrayList();
    +    public Iterable<HostAndPort> findOpenSocketsOnNode(final Iterable<? extends
HostAndPort> sockets, Duration timeout) {
    +        checkNotNull(sockets, "sockets");
    +        checkState(!Iterables.isEmpty(sockets), "No hostAndPort sockets supplied");
    +        checkNotNull(timeout, "timeout");
    +        return Optional.presentInstances(tryReachable(sockets, timeout));
    +    }
    +
    +    /**
    +     * @return A lazily computed Iterable containing present values for the elements
of sockets that are
    +     * reachable according to {@link #socketTester} and absent values for those not.
Checks are concurrent
    +     * and the elements in the Iterable are ordered according to their position in sockets.
    +     */
    +    private Iterable<Optional<HostAndPort>> tryReachable(Iterable<? extends
HostAndPort> sockets, final Duration timeout) {
    +        final List<ListenableFuture<Optional<HostAndPort>>> futures
= Lists.newArrayList();
    +        final AtomicReference<Stopwatch> sinceFirstCompleted = new AtomicReference<>();
    +
             for (final HostAndPort socket : sockets) {
    -            futures.add(userExecutor.submit(new Runnable() {
    +            futures.add(userExecutor.submit(new Callable<Optional<HostAndPort>>()
{
    +                @Override
    +                public Optional<HostAndPort> call() {
    +                    // Whether the socket was reachable (vs. the result of call, which
is whether the repeater is done).
    +                    final AtomicBoolean theResultWeCareAbout = new AtomicBoolean();
    +                    Repeater.create("socket-reachable")
    +                            .limitTimeTo(timeout)
    +                            .backoffTo(Duration.FIVE_SECONDS)
    +                            .until(new Callable<Boolean>() {
    +                                @Override
    +                                public Boolean call() throws TimeoutException {
    +                                    boolean reachable = socketTester.apply(socket);
    +                                    if (reachable) {
    +                                        theResultWeCareAbout.set(true);
    +                                        return true;
    +                                    } else {
    +                                        // Run another check if nobody else has completed
yet or another task has
    +                                        // completed but this one is still in its grace
period.
    +                                        Stopwatch timerSinceFirst = sinceFirstCompleted.get();
    +                                        return timerSinceFirst != null && Duration.FIVE_SECONDS.subtract(Duration.of(timerSinceFirst)).isNegative();
    +                                    }
    +                                }
    +                            })
    +                            .run();
    +                    if (theResultWeCareAbout.get()) {
    +                        sinceFirstCompleted.compareAndSet(null, Stopwatch.createStarted());
    +                    }
    +                    return theResultWeCareAbout.get() ? Optional.of(socket) : Optional.<HostAndPort>absent();
    +                }
    +            }));
    +        }
    +
    +        return new Iterable<Optional<HostAndPort>>() {
    +            @Override
    +            public Iterator<Optional<HostAndPort>> iterator() {
    +                return new AbstractIterator<Optional<HostAndPort>>() {
    +                    int count = 0;
    +
                         @Override
    -                    public void run() {
    -                        try {
    -                            if (socketTester.apply(socket)) {
    -                                reachableSocket.compareAndSet(null, socket);
    -                                latch.countDown();
    +                    protected Optional<HostAndPort> computeNext() {
    +                        if (count < futures.size()) {
    +                            final Future<Optional<HostAndPort>> future =
futures.get(count++);
    +                            try {
    +                                return future.get(timeout.toUnit(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
    +                            } catch (Exception e) {
    +                                Exceptions.propagateIfInterrupt(e);
    --- End diff --
    
    `e` is an `Exception` rather than a `Throwable` so I don't think this is a problem.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message