aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jsir...@apache.org
Subject aurora git commit: Slim the `ServerSet` interface.
Date Wed, 06 Apr 2016 19:11:29 GMT
Repository: aurora
Updated Branches:
  refs/heads/master ba174ba38 -> 103dae687


Slim the `ServerSet` interface.

This makes the path to a Curator implementation to satisfy the contract
with `LeaderRedirect` simpler by introducing `ServiceGroupMonitor`
which hides server set change events, just exposing the current active
set via a query method.  This is all the `LeaderRedirect` needs since
it is driven by user-generated events (HTTP requests).

Bugs closed: AURORA-1468

Reviewed at https://reviews.apache.org/r/45770/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/103dae68
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/103dae68
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/103dae68

Branch: refs/heads/master
Commit: 103dae6871eaa76914ab7fe17adaa174e93f537a
Parents: ba174ba
Author: John Sirois <jsirois@apache.org>
Authored: Wed Apr 6 13:11:25 2016 -0600
Committer: John Sirois <john.sirois@gmail.com>
Committed: Wed Apr 6 13:11:25 2016 -0600

----------------------------------------------------------------------
 .../aurora/common/net/pool/DynamicHostSet.java  |  6 +-
 .../aurora/common/zookeeper/ServerSet.java      | 29 +------
 .../aurora/common/zookeeper/ServerSetImpl.java  | 38 ++-------
 .../common/zookeeper/ServerSetImplTest.java     | 25 ++----
 .../scheduler/app/ServiceDiscoveryModule.java   | 57 ++++++++++---
 .../scheduler/app/ServiceGroupMonitor.java      | 46 +++++++++++
 .../scheduler/http/JettyServerModule.java       |  7 +-
 .../aurora/scheduler/http/LeaderRedirect.java   | 87 ++++++++------------
 .../aurora/scheduler/app/SchedulerIT.java       |  3 +-
 .../scheduler/http/AbstractJettyTest.java       | 30 +++----
 .../scheduler/http/LeaderRedirectTest.java      | 58 +++++++------
 .../aurora/scheduler/thrift/ThriftIT.java       | 10 +--
 12 files changed, 203 insertions(+), 193 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
index 837d15c..df469ef 100644
--- a/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
+++ b/commons/src/main/java/org/apache/aurora/common/net/pool/DynamicHostSet.java
@@ -34,7 +34,7 @@ public interface DynamicHostSet<T> {
    * @return A command which, when executed, will stop monitoring the host set.
    * @throws MonitorException if there is a problem monitoring the host set
    */
-  Command watch(final HostChangeMonitor<T> monitor) throws MonitorException;
+  Command watch(HostChangeMonitor<T> monitor) throws MonitorException;
 
   /**
    * An interface to an object that is interested in receiving notification whenever the
host set
@@ -52,10 +52,6 @@ public interface DynamicHostSet<T> {
   }
 
   class MonitorException extends Exception {
-    public MonitorException(String msg) {
-      super(msg);
-    }
-
     public MonitorException(String msg, Throwable cause) {
       super(msg, cause);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
index fe6229e..6e32083 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSet.java
@@ -16,17 +16,13 @@ package org.apache.aurora.common.zookeeper;
 import java.net.InetSocketAddress;
 import java.util.Map;
 
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.zookeeper.Group.JoinException;
 
 /**
- * A logical set of servers registered in ZooKeeper.  Intended to be used by both servers
in a
- * common service and their clients.
- *
- * TODO(William Farner): Explore decoupling this from thrift.
+ * A logical set of servers registered in ZooKeeper.  Intended to be used by servers in a
+ * common service to advertise their presence to server-set protocol-aware clients.
  */
-public interface ServerSet extends DynamicHostSet<ServiceInstance> {
+public interface ServerSet {
   /**
    * Attempts to join a server set for this logical service group.
    *
@@ -42,21 +38,6 @@ public interface ServerSet extends DynamicHostSet<ServiceInstance>
{
       throws JoinException, InterruptedException;
 
   /**
-   * Attempts to join a server set for this logical service group.
-   *
-   * @param endpoint the primary service endpoint
-   * @param additionalEndpoints and additional endpoints keyed by their logical name
-   * @param shardId Unique shard identifier for this member of the service.
-   * @return an EndpointStatus object that allows the endpoint to adjust its status
-   * @throws JoinException if there was a problem joining the server set
-   * @throws InterruptedException if interrupted while waiting to join the server set
-   */
-  EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints,
-      int shardId) throws JoinException, InterruptedException;
-
-  /**
    * A handle to a service endpoint's status data that allows updating it to track current
events.
    */
   interface EndpointStatus {
@@ -75,9 +56,5 @@ public interface ServerSet extends DynamicHostSet<ServiceInstance>
{
     public UpdateException(String message, Throwable cause) {
       super(message, cause);
     }
-
-    public UpdateException(String message) {
-      super(message);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
----------------------------------------------------------------------
diff --git a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
index eca1351..8b385b8 100644
--- a/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
+++ b/commons/src/main/java/org/apache/aurora/common/zookeeper/ServerSetImpl.java
@@ -30,7 +30,6 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
@@ -50,6 +49,7 @@ import com.google.gson.Gson;
 
 import org.apache.aurora.common.base.Command;
 import org.apache.aurora.common.io.Codec;
+import org.apache.aurora.common.net.pool.DynamicHostSet;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.common.thrift.Status;
@@ -65,9 +65,9 @@ import org.slf4j.LoggerFactory;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
- * ZooKeeper-backed implementation of {@link ServerSet}.
+ * ZooKeeper-backed implementation of {@link ServerSet} and {@link DynamicHostSet}.
  */
-public class ServerSetImpl implements ServerSet {
+public class ServerSetImpl implements ServerSet, DynamicHostSet<ServiceInstance> {
   private static final Logger LOG = LoggerFactory.getLogger(ServerSetImpl.class);
 
   private final ZooKeeperClient zkClient;
@@ -134,31 +134,12 @@ public class ServerSetImpl implements ServerSet {
       Map<String, InetSocketAddress> additionalEndpoints)
       throws Group.JoinException, InterruptedException {
 
-    LOG.warn("Joining a ServerSet without a shard ID is deprecated and will soon break.");
-    return join(endpoint, additionalEndpoints, Optional.<Integer>absent());
-  }
-
-  @Override
-  public EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints,
-      int shardId) throws Group.JoinException, InterruptedException {
-
-    return join(endpoint, additionalEndpoints, Optional.of(shardId));
-  }
-
-  private EndpointStatus join(
-      InetSocketAddress endpoint,
-      Map<String, InetSocketAddress> additionalEndpoints,
-      Optional<Integer> shardId) throws Group.JoinException, InterruptedException {
-
     checkNotNull(endpoint);
     checkNotNull(additionalEndpoints);
 
-    final MemberStatus memberStatus =
-        new MemberStatus(endpoint, additionalEndpoints, shardId);
+    MemberStatus memberStatus = new MemberStatus(endpoint, additionalEndpoints);
     Supplier<byte[]> serviceInstanceSupplier = memberStatus::serializeServiceInstance;
-    final Group.Membership membership = group.join(serviceInstanceSupplier);
+    Group.Membership membership = group.join(serviceInstanceSupplier);
 
     return () -> memberStatus.leave(membership);
   }
@@ -178,16 +159,13 @@ public class ServerSetImpl implements ServerSet {
   private class MemberStatus {
     private final InetSocketAddress endpoint;
     private final Map<String, InetSocketAddress> additionalEndpoints;
-    private final Optional<Integer> shardId;
 
     private MemberStatus(
         InetSocketAddress endpoint,
-        Map<String, InetSocketAddress> additionalEndpoints,
-        Optional<Integer> shardId) {
+        Map<String, InetSocketAddress> additionalEndpoints) {
 
       this.endpoint = endpoint;
       this.additionalEndpoints = additionalEndpoints;
-      this.shardId = shardId;
     }
 
     synchronized void leave(Group.Membership membership) throws UpdateException {
@@ -205,10 +183,6 @@ public class ServerSetImpl implements ServerSet {
           Maps.transformValues(additionalEndpoints, ServerSets.TO_ENDPOINT),
           Status.ALIVE);
 
-      if (shardId.isPresent()) {
-        serviceInstance.setShard(shardId.get());
-      }
-
       LOG.debug("updating endpoint data to:\n\t" + serviceInstance);
       try {
         return ServerSets.serializeServiceInstance(serviceInstance, codec);

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
----------------------------------------------------------------------
diff --git a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
index 56cc32d..37be70b 100644
--- a/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
+++ b/commons/src/test/java/org/apache/aurora/common/zookeeper/ServerSetImplTest.java
@@ -77,13 +77,12 @@ public class ServerSetImplTest extends BaseZooKeeperTest {
 
     ServerSetImpl server = createServerSet();
     ServerSet.EndpointStatus status = server.join(
-        InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080),
0);
+        InetSocketAddress.createUnresolved("foo", 1234), makePortMap("http-admin", 8080));
 
     ServiceInstance serviceInstance = new ServiceInstance(
         new Endpoint("foo", 1234),
         ImmutableMap.of("http-admin", new Endpoint("foo", 8080)),
-        Status.ALIVE)
-        .setShard(0);
+        Status.ALIVE);
 
     assertChangeFired(serviceInstance);
 
@@ -166,34 +165,26 @@ public class ServerSetImplTest extends BaseZooKeeperTest {
     ServiceInstance instance1 = new ServiceInstance(
         new Endpoint("foo", 1000),
         ImmutableMap.of("http-admin1", new Endpoint("foo", 8080)),
-        Status.ALIVE)
-        .setShard(0);
+        Status.ALIVE);
     ServiceInstance instance2 = new ServiceInstance(
         new Endpoint("foo", 1001),
         ImmutableMap.of("http-admin2", new Endpoint("foo", 8081)),
-        Status.ALIVE)
-        .setShard(1);
+        Status.ALIVE);
     ServiceInstance instance3 = new ServiceInstance(
         new Endpoint("foo", 1002),
         ImmutableMap.of("http-admin3", new Endpoint("foo", 8082)),
-        Status.ALIVE)
-        .setShard(2);
+        Status.ALIVE);
 
-    server1.join(
-        InetSocketAddress.createUnresolved("foo", 1000),
-        server1Ports,
-        0);
+    server1.join(InetSocketAddress.createUnresolved("foo", 1000), server1Ports);
     assertEquals(ImmutableList.of(instance1), ImmutableList.copyOf(serverSetBuffer.take()));
 
     ServerSet.EndpointStatus status2 = server2.join(
         InetSocketAddress.createUnresolved("foo", 1001),
-        server2Ports,
-        1);
+        server2Ports);
     assertEquals(ImmutableList.of(instance1, instance2),
         ImmutableList.copyOf(serverSetBuffer.take()));
 
-    server3.join(
-        InetSocketAddress.createUnresolved("foo", 1002), server3Ports, 2);
+    server3.join(InetSocketAddress.createUnresolved("foo", 1002), server3Ports);
     assertEquals(ImmutableList.of(instance1, instance2, instance3),
         ImmutableList.copyOf(serverSetBuffer.take()));
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
index 240164f..73695cd 100644
--- a/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceDiscoveryModule.java
@@ -14,15 +14,19 @@
 package org.apache.aurora.scheduler.app;
 
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
 
+import javax.inject.Inject;
 import javax.inject.Singleton;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 
+import org.apache.aurora.common.base.Command;
 import org.apache.aurora.common.net.pool.DynamicHostSet;
 import org.apache.aurora.common.thrift.ServiceInstance;
-import org.apache.aurora.common.zookeeper.ServerSet;
 import org.apache.aurora.common.zookeeper.ServerSetImpl;
 import org.apache.aurora.common.zookeeper.SingletonService;
 import org.apache.aurora.common.zookeeper.SingletonServiceImpl;
@@ -38,21 +42,54 @@ import static java.util.Objects.requireNonNull;
 /**
  * Binding module for utilities to advertise the network presence of the scheduler.
  */
-public class ServiceDiscoveryModule extends AbstractModule {
+class ServiceDiscoveryModule extends AbstractModule {
+
+  private static class ServerSetMonitor implements ServiceGroupMonitor {
+    private Optional<Command> closeCommand = Optional.empty();
+    private final DynamicHostSet<ServiceInstance> serverSet;
+    private final AtomicReference<ImmutableSet<ServiceInstance>> services =
+        new AtomicReference<>(ImmutableSet.of());
+
+    // NB: We only take a ServerSetImpl instead of a DynamicHostSet<ServiceInstance>
here to
+    // simplify binding.
+    @Inject
+    ServerSetMonitor(ServerSetImpl serverSet) {
+      this.serverSet = requireNonNull(serverSet);
+    }
+
+    @Override
+    public void start() throws MonitorException {
+      try {
+        closeCommand = Optional.of(serverSet.watch(services::set));
+      } catch (DynamicHostSet.MonitorException e) {
+        throw new MonitorException("Unable to watch scheduler host set.", e);
+      }
+    }
+
+    @Override
+    public void close() {
+      closeCommand.ifPresent(Command::execute);
+    }
+
+    @Override
+    public ImmutableSet<ServiceInstance> get() {
+      return services.get();
+    }
+  }
 
   private static final Logger LOG = LoggerFactory.getLogger(ServiceDiscoveryModule.class);
 
   private final String serverSetPath;
   private final Credentials zkCredentials;
 
-  public ServiceDiscoveryModule(String serverSetPath, Credentials zkCredentials) {
+  ServiceDiscoveryModule(String serverSetPath, Credentials zkCredentials) {
     this.serverSetPath = requireNonNull(serverSetPath);
     this.zkCredentials = requireNonNull(zkCredentials);
   }
 
   @Override
   protected void configure() {
-    // provider-only module.
+    bind(ServiceGroupMonitor.class).to(ServerSetMonitor.class).in(Singleton.class);
   }
 
   @Provides
@@ -68,22 +105,16 @@ public class ServiceDiscoveryModule extends AbstractModule {
 
   @Provides
   @Singleton
-  ServerSet provideServerSet(ZooKeeperClient client, List<ACL> zooKeeperAcls) {
+  ServerSetImpl provideServerSet(ZooKeeperClient client, List<ACL> zooKeeperAcls) {
     return new ServerSetImpl(client, zooKeeperAcls, serverSetPath);
   }
 
-  @Provides
-  @Singleton
-  DynamicHostSet<ServiceInstance> provideSchedulerHostSet(ServerSet serverSet) {
-    // Used for a type re-binding of the serverset.
-    return serverSet;
-  }
-
+  // NB: We only take a ServerSetImpl instead of a ServerSet here to simplify binding.
   @Provides
   @Singleton
   SingletonService provideSingletonService(
       ZooKeeperClient client,
-      ServerSet serverSet,
+      ServerSetImpl serverSet,
       List<ACL> zookeeperAcls) {
 
     return new SingletonServiceImpl(

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
new file mode 100644
index 0000000..a1329fd
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/app/ServiceGroupMonitor.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.app;
+
+import java.io.Closeable;
+import java.util.function.Supplier;
+
+import com.google.common.collect.ImmutableSet;
+
+import org.apache.aurora.common.thrift.ServiceInstance;
+
+/**
+ * Monitors a service group's membership and supplies a live view of the most recent set.
+ */
+public interface ServiceGroupMonitor extends Supplier<ImmutableSet<ServiceInstance>>,
Closeable {
+
+  /**
+   * Indicates a problem initiating monitoring of a service group.
+   */
+  class MonitorException extends Exception {
+    public MonitorException(String message, Throwable cause) {
+      super(message, cause);
+    }
+  }
+
+  /**
+   * Starts monitoring the service group.
+   *
+   * When the service group membership no longer needs to be maintained, this monitor should
be
+   * {@link #close() closed}.
+   *
+   * @throws MonitorException if there is a problem initiating monitoring of the service
group.
+   */
+  void start() throws MonitorException;
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
index 5b5cde5..60667fc 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/JettyServerModule.java
@@ -13,6 +13,7 @@
  */
 package org.apache.aurora.scheduler.http;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.EnumSet;
@@ -64,8 +65,8 @@ import org.apache.aurora.common.net.http.handlers.ThreadStackPrinter;
 import org.apache.aurora.common.net.http.handlers.TimeSeriesDataSource;
 import org.apache.aurora.common.net.http.handlers.VarsHandler;
 import org.apache.aurora.common.net.http.handlers.VarsJsonHandler;
-import org.apache.aurora.common.net.pool.DynamicHostSet.MonitorException;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
 import org.apache.aurora.scheduler.http.api.ApiModule;
 import org.apache.aurora.scheduler.http.api.security.HttpSecurityModule;
 import org.apache.aurora.scheduler.thrift.ThriftModule;
@@ -295,8 +296,8 @@ public class JettyServerModule extends AbstractModule {
     }
 
     @Override
-    protected void shutDown() {
-      // Nothing to do here - we await VM shutdown.
+    protected void shutDown() throws IOException {
+      redirector.close();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
index ef71290..3847fb8 100644
--- a/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
+++ b/src/main/java/org/apache/aurora/scheduler/http/LeaderRedirect.java
@@ -13,7 +13,8 @@
  */
 package org.apache.aurora.scheduler.http;
 
-import java.util.concurrent.atomic.AtomicReference;
+import java.io.Closeable;
+import java.io.IOException;
 
 import javax.inject.Inject;
 import javax.servlet.http.HttpServletRequest;
@@ -23,13 +24,11 @@ import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.Atomics;
 
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor;
-import org.apache.aurora.common.net.pool.DynamicHostSet.MonitorException;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,8 +38,9 @@ import static java.util.Objects.requireNonNull;
  * Redirect logic for finding the leading scheduler in the event that this process is not
the
  * leader.
  */
-public class LeaderRedirect {
-  public enum LeaderStatus {
+class LeaderRedirect implements Closeable {
+
+  enum LeaderStatus {
     /**
      * This instance is currently the leading scheduler.
      */
@@ -57,22 +57,15 @@ public class LeaderRedirect {
     NOT_LEADING,
   }
 
-  // TODO(wfarner): Should we tie this directly to the producer of the node (HttpModule?
 It seems
-  // like the right thing to do, but would introduce an otherwise unnecessary dependency.
-  @VisibleForTesting
-  static final String HTTP_PORT_NAME = "http";
-
   private static final Logger LOG = LoggerFactory.getLogger(LeaderRedirect.class);
 
   private final HttpService httpService;
-  private final DynamicHostSet<ServiceInstance> schedulers;
-
-  private final AtomicReference<ServiceInstance> leader = Atomics.newReference();
+  private final ServiceGroupMonitor serviceGroupMonitor;
 
   @Inject
-  LeaderRedirect(HttpService httpService, DynamicHostSet<ServiceInstance> schedulers)
{
+  LeaderRedirect(HttpService httpService, ServiceGroupMonitor serviceGroupMonitor) {
     this.httpService = requireNonNull(httpService);
-    this.schedulers = requireNonNull(schedulers);
+    this.serviceGroupMonitor = requireNonNull(serviceGroupMonitor);
   }
 
   /**
@@ -81,17 +74,19 @@ public class LeaderRedirect {
    * @throws MonitorException If monitoring failed to initialize.
    */
   public void monitor() throws MonitorException {
-    schedulers.watch(new SchedulerMonitor());
+    serviceGroupMonitor.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    serviceGroupMonitor.close();
   }
 
   private Optional<HostAndPort> getLeaderHttp() {
-    ServiceInstance leadingScheduler = leader.get();
-    if (leadingScheduler == null) {
-      return Optional.absent();
-    }
+    Optional<ServiceInstance> leadingScheduler = getLeader();
 
-    if (leadingScheduler.isSetServiceEndpoint()) {
-      Endpoint leaderHttp = leadingScheduler.getServiceEndpoint();
+    if (leadingScheduler.isPresent() && leadingScheduler.get().isSetServiceEndpoint())
{
+      Endpoint leaderHttp = leadingScheduler.get().getServiceEndpoint();
       if (leaderHttp != null && leaderHttp.isSetHost() && leaderHttp.isSetPort())
{
         return Optional.of(HostAndPort.fromParts(leaderHttp.getHost(), leaderHttp.getPort()));
       }
@@ -136,13 +131,13 @@ public class LeaderRedirect {
    * @return a {@code LeaderStatus} indicating whether there is an elected leader (and if
so, if
    * this instance is the leader).
    */
-  public LeaderStatus getLeaderStatus() {
-    ServiceInstance leadingScheduler = leader.get();
-    if (leadingScheduler == null) {
+  LeaderStatus getLeaderStatus() {
+    Optional<ServiceInstance> leadingScheduler = getLeader();
+    if (!leadingScheduler.isPresent()) {
       return LeaderStatus.NO_LEADER;
     }
 
-    if (!leadingScheduler.isSetServiceEndpoint()) {
+    if (!leadingScheduler.get().isSetServiceEndpoint()) {
       LOG.warn("Leader service instance seems to be incomplete: " + leadingScheduler);
       return LeaderStatus.NO_LEADER;
     }
@@ -164,7 +159,7 @@ public class LeaderRedirect {
    * @param req HTTP request.
    * @return An optional redirect destination to route the request to the leading scheduler.
    */
-  public Optional<String> getRedirectTarget(HttpServletRequest req) {
+  Optional<String> getRedirectTarget(HttpServletRequest req) {
     Optional<HostAndPort> redirectTarget = getRedirect();
     if (redirectTarget.isPresent()) {
       HostAndPort target = redirectTarget.get();
@@ -192,28 +187,18 @@ public class LeaderRedirect {
     }
   }
 
-  /**
-   * Monitor to track scheduler leader changes.
-   */
-  private class SchedulerMonitor implements HostChangeMonitor<ServiceInstance> {
-    @Override
-    public void onChange(ImmutableSet<ServiceInstance> hostSet) {
-      switch (hostSet.size()) {
-        case 0:
-          LOG.warn("No schedulers in host set, will not redirect despite not being leader.");
-          leader.set(null);
-          break;
-
-        case 1:
-          LOG.info("Found leader scheduler at " + hostSet);
-          leader.set(Iterables.getOnlyElement(hostSet));
-          break;
-
-        default:
-          LOG.error("Multiple schedulers detected, will not redirect: " + hostSet);
-          leader.set(null);
-          break;
-      }
+  private Optional<ServiceInstance> getLeader() {
+    ImmutableSet<ServiceInstance> hostSet = serviceGroupMonitor.get();
+    switch (hostSet.size()) {
+      case 0:
+        LOG.warn("No serviceGroupMonitor in host set, will not redirect despite not being
leader.");
+        return Optional.absent();
+      case 1:
+        LOG.info("Found leader scheduler at " + hostSet);
+        return Optional.of(Iterables.getOnlyElement(hostSet));
+      default:
+        LOG.error("Multiple serviceGroupMonitor detected, will not redirect: " + hostSet);
+        return Optional.absent();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 918a3da..5b77750 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -44,7 +44,6 @@ import org.apache.aurora.common.application.Lifecycle;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Data;
 import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.common.zookeeper.ServerSet;
 import org.apache.aurora.common.zookeeper.ServerSetImpl;
 import org.apache.aurora.common.zookeeper.ZooKeeperClient;
 import org.apache.aurora.common.zookeeper.ZooKeeperClient.Credentials;
@@ -229,7 +228,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
 
   private void awaitSchedulerReady() throws Exception {
     executor.submit(() -> {
-      ServerSet schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
+      ServerSetImpl schedulerService = new ServerSetImpl(zkClient, SERVERSET_PATH);
       final CountDownLatch schedulerReady = new CountDownLatch(1);
       schedulerService.watch(hostSet -> {
         if (!hostSet.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
index 19c8a1f..561b134 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/AbstractJettyTest.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.http;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.servlet.ServletContextListener;
 import javax.ws.rs.core.MediaType;
@@ -27,7 +28,6 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Module;
-import com.google.inject.TypeLiteral;
 import com.google.inject.util.Modules;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.WebResource;
@@ -36,9 +36,6 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.api.json.JSONConfiguration;
 
 import org.apache.aurora.GuavaUtils.ServiceManagerIface;
-import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor;
 import org.apache.aurora.common.quantity.Amount;
 import org.apache.aurora.common.quantity.Time;
 import org.apache.aurora.common.stats.StatsProvider;
@@ -50,6 +47,7 @@ import org.apache.aurora.gen.ServerInfo;
 import org.apache.aurora.scheduler.AppStartup;
 import org.apache.aurora.scheduler.SchedulerServicesModule;
 import org.apache.aurora.scheduler.app.LifecycleModule;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.aurora.scheduler.async.AsyncModule;
 import org.apache.aurora.scheduler.cron.CronJobManager;
 import org.apache.aurora.scheduler.http.api.GsonMessageBodyHandler;
@@ -63,12 +61,11 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.entities.IServerInfo;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.apache.aurora.scheduler.testing.FakeStatsProvider;
-import org.easymock.Capture;
 import org.junit.Before;
 
 import static org.apache.aurora.scheduler.http.JettyServerModule.makeServletContextListener;
-import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertNotNull;
 
 /**
@@ -81,7 +78,7 @@ public abstract class AbstractJettyTest extends EasyMockTest {
   private Injector injector;
   protected StorageTestUtil storage;
   protected HostAndPort httpServer;
-  private Capture<HostChangeMonitor<ServiceInstance>> schedulerWatcher;
+  private AtomicReference<ImmutableSet<ServiceInstance>> schedulers;
 
   /**
    * Subclasses should override with a module that configures the servlets they are testing.
@@ -95,8 +92,8 @@ public abstract class AbstractJettyTest extends EasyMockTest {
   @Before
   public void setUpBase() throws Exception {
     storage = new StorageTestUtil(this);
-    DynamicHostSet<ServiceInstance> schedulers =
-        createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { });
+
+    ServiceGroupMonitor serviceGroupMonitor = createMock(ServiceGroupMonitor.class);
 
     injector = Guice.createInjector(
         new StatsModule(),
@@ -122,7 +119,7 @@ public abstract class AbstractJettyTest extends EasyMockTest {
                     Amount.of(1L, Time.MILLISECONDS),
                     bindMock(BackoffStrategy.class),
                     RateLimiter.create(1000)));
-            bind(new TypeLiteral<DynamicHostSet<ServiceInstance>>() { }).toInstance(schedulers);
+            bind(ServiceGroupMonitor.class).toInstance(serviceGroupMonitor);
             bindMock(CronJobManager.class);
             bindMock(LockManager.class);
             bindMock(OfferManager.class);
@@ -136,17 +133,22 @@ public abstract class AbstractJettyTest extends EasyMockTest {
           }
         },
         new JettyServerModule(false));
-    schedulerWatcher = createCapture();
-    expect(schedulers.watch(capture(schedulerWatcher))).andReturn(createMock(Command.class));
+
+    schedulers = new AtomicReference<>(ImmutableSet.of());
+
+    serviceGroupMonitor.start();
+    expectLastCall();
+
+    expect(serviceGroupMonitor.get()).andAnswer(schedulers::get).anyTimes();
   }
 
   protected void setLeadingScheduler(String host, int port) {
-    schedulerWatcher.getValue().onChange(
+    schedulers.set(
         ImmutableSet.of(new ServiceInstance().setServiceEndpoint(new Endpoint(host, port))));
   }
 
   protected void unsetLeadingSchduler() {
-    schedulerWatcher.getValue().onChange(ImmutableSet.of());
+    schedulers.set(ImmutableSet.of());
   }
 
   protected void replayAndStart() {

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java b/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
index 3678266..a16058f 100644
--- a/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/http/LeaderRedirectTest.java
@@ -14,6 +14,7 @@
 package org.apache.aurora.scheduler.http;
 
 import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.servlet.http.HttpServletRequest;
 
@@ -23,19 +24,17 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.net.HostAndPort;
 
-import org.apache.aurora.common.net.pool.DynamicHostSet;
-import org.apache.aurora.common.net.pool.DynamicHostSet.HostChangeMonitor;
-import org.apache.aurora.common.net.pool.DynamicHostSet.MonitorException;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
 import org.apache.aurora.common.thrift.Endpoint;
 import org.apache.aurora.common.thrift.ServiceInstance;
-import org.easymock.Capture;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor.MonitorException;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.apache.aurora.scheduler.http.LeaderRedirect.LeaderStatus;
-import static org.easymock.EasyMock.capture;
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 
 public class LeaderRedirectTest extends EasyMockTest {
@@ -46,63 +45,74 @@ public class LeaderRedirectTest extends EasyMockTest {
       endpoint -> new ServiceInstance()
           .setServiceEndpoint(new Endpoint(endpoint.getHostText(), endpoint.getPort()));
 
-  private Capture<HostChangeMonitor<ServiceInstance>> monitorCapture;
-
+  private AtomicReference<ImmutableSet<ServiceInstance>> schedulers;
+  private ServiceGroupMonitor serviceGroupMonitor;
   private LeaderRedirect leaderRedirector;
 
   @Before
   public void setUp() throws MonitorException {
-    DynamicHostSet<ServiceInstance> schedulers =
-        createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { });
+    schedulers = new AtomicReference<>(ImmutableSet.of());
+    serviceGroupMonitor = createMock(ServiceGroupMonitor.class);
 
     HttpService http = createMock(HttpService.class);
     expect(http.getAddress()).andStubReturn(HostAndPort.fromParts("localhost", HTTP_PORT));
 
-    leaderRedirector = new LeaderRedirect(http, schedulers);
-
-    monitorCapture = new Capture<>();
-    expect(schedulers.watch(capture(monitorCapture))).andReturn(null);
+    leaderRedirector = new LeaderRedirect(http, serviceGroupMonitor);
   }
 
-  private void replayAndMonitor() throws Exception {
+  private void replayAndMonitor(int expectedGetCalls) throws Exception {
+    serviceGroupMonitor.start();
+    expectLastCall();
+
+    expect(serviceGroupMonitor.get()).andAnswer(() -> schedulers.get()).times(expectedGetCalls);
+
     control.replay();
     leaderRedirector.monitor();
   }
 
   @Test
   public void testLeader() throws Exception {
-    replayAndMonitor();
+    replayAndMonitor(3);
     publishSchedulers(localPort(HTTP_PORT));
 
     assertEquals(Optional.absent(), leaderRedirector.getRedirect());
+
+    // NB: LEADING takes 2 tests of the server group membership to calculate; thus we expect
3
+    // server group get calls, 1 for the getRedirect() above and 2 here.
     assertEquals(LeaderStatus.LEADING, leaderRedirector.getLeaderStatus());
   }
 
   @Test
   public void testNotLeader() throws Exception {
-    replayAndMonitor();
+    replayAndMonitor(3);
 
     HostAndPort remote = HostAndPort.fromParts("foobar", HTTP_PORT);
     publishSchedulers(remote);
 
     assertEquals(Optional.of(remote), leaderRedirector.getRedirect());
+
+    // NB: NOT_LEADING takes 2 tests of the server group membership to calculate; thus we
expect 3
+    // server group get calls, 1 for the getRedirect() above and 2 here.
     assertEquals(LeaderStatus.NOT_LEADING, leaderRedirector.getLeaderStatus());
   }
 
   @Test
   public void testLeaderOnSameHost() throws Exception {
-    replayAndMonitor();
+    replayAndMonitor(3);
 
     HostAndPort local = localPort(555);
     publishSchedulers(local);
 
     assertEquals(Optional.of(local), leaderRedirector.getRedirect());
+
+    // NB: NOT_LEADING takes 2 tests of the server group membership to calculate; thus we
expect 3
+    // server group get calls, 1 for the getRedirect() above and 2 here.
     assertEquals(LeaderStatus.NOT_LEADING, leaderRedirector.getLeaderStatus());
   }
 
   @Test
   public void testNoLeaders() throws Exception {
-    replayAndMonitor();
+    replayAndMonitor(2);
 
     assertEquals(Optional.absent(), leaderRedirector.getRedirect());
     assertEquals(LeaderStatus.NO_LEADER, leaderRedirector.getLeaderStatus());
@@ -110,7 +120,7 @@ public class LeaderRedirectTest extends EasyMockTest {
 
   @Test
   public void testMultipleLeaders() throws Exception {
-    replayAndMonitor();
+    replayAndMonitor(2);
 
     publishSchedulers(HostAndPort.fromParts("foobar", 500), HostAndPort.fromParts("baz",
800));
 
@@ -120,7 +130,7 @@ public class LeaderRedirectTest extends EasyMockTest {
 
   @Test
   public void testBadServiceInstance() throws Exception {
-    replayAndMonitor();
+    replayAndMonitor(2);
 
     publishSchedulers(ImmutableSet.of(new ServiceInstance()));
 
@@ -143,7 +153,7 @@ public class LeaderRedirectTest extends EasyMockTest {
   public void testRedirectTargetNoAttribute() throws Exception {
     HttpServletRequest mockRequest = mockRequest(null, null);
 
-    replayAndMonitor();
+    replayAndMonitor(1);
 
     HostAndPort remote = HostAndPort.fromParts("foobar", HTTP_PORT);
     publishSchedulers(remote);
@@ -157,7 +167,7 @@ public class LeaderRedirectTest extends EasyMockTest {
   public void testRedirectTargetWithAttribute() throws Exception {
     HttpServletRequest mockRequest = mockRequest("/the/original/path", null);
 
-    replayAndMonitor();
+    replayAndMonitor(1);
 
     HostAndPort remote = HostAndPort.fromParts("foobar", HTTP_PORT);
     publishSchedulers(remote);
@@ -171,7 +181,7 @@ public class LeaderRedirectTest extends EasyMockTest {
   public void testRedirectTargetQueryString() throws Exception {
     HttpServletRequest mockRequest = mockRequest(null, "bar=baz");
 
-    replayAndMonitor();
+    replayAndMonitor(1);
 
     HostAndPort remote = HostAndPort.fromParts("foobar", HTTP_PORT);
     publishSchedulers(remote);
@@ -187,7 +197,7 @@ public class LeaderRedirectTest extends EasyMockTest {
   }
 
   private void publishSchedulers(ImmutableSet<ServiceInstance> instances) {
-    monitorCapture.getValue().onChange(instances);
+    schedulers.set(instances);
   }
 
   private static HostAndPort localPort(int port) {

http://git-wip-us.apache.org/repos/asf/aurora/blob/103dae68/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
index a39226c..d5648c9 100644
--- a/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/thrift/ThriftIT.java
@@ -24,13 +24,10 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.google.inject.Key;
 import com.google.inject.Provides;
-import com.google.inject.TypeLiteral;
 
 import org.apache.aurora.common.application.ShutdownStage;
 import org.apache.aurora.common.base.Command;
-import org.apache.aurora.common.net.pool.DynamicHostSet;
 import org.apache.aurora.common.testing.easymock.EasyMockTest;
-import org.apache.aurora.common.thrift.ServiceInstance;
 import org.apache.aurora.gen.AuroraAdmin;
 import org.apache.aurora.gen.Container;
 import org.apache.aurora.gen.Container._Fields;
@@ -46,6 +43,7 @@ import org.apache.aurora.gen.TaskQuery;
 import org.apache.aurora.scheduler.TierModule;
 import org.apache.aurora.scheduler.app.AppModule;
 import org.apache.aurora.scheduler.app.LifecycleModule;
+import org.apache.aurora.scheduler.app.ServiceGroupMonitor;
 import org.apache.aurora.scheduler.app.local.FakeNonVolatileStorage;
 import org.apache.aurora.scheduler.base.TaskTestUtil;
 import org.apache.aurora.scheduler.configuration.ConfigurationManager.ConfigurationManagerSettings;
@@ -104,9 +102,9 @@ public class ThriftIT extends EasyMockTest {
 
             bind(NonVolatileStorage.class).to(FakeNonVolatileStorage.class);
 
-            DynamicHostSet<ServiceInstance> schedulers =
-                createMock(new Clazz<DynamicHostSet<ServiceInstance>>() { });
-            bind(new TypeLiteral<DynamicHostSet<ServiceInstance>>() { }).toInstance(schedulers);
+            ServiceGroupMonitor schedulers = createMock(ServiceGroupMonitor.class);
+            bind(ServiceGroupMonitor.class).toInstance(schedulers);
+
             bindMock(DriverFactory.class);
             bind(DriverSettings.class).toInstance(new DriverSettings(
                 "fakemaster",


Mime
View raw message