bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [5/5] bookkeeper git commit: BOOKKEEPER-612: Region aware placement
Date Thu, 13 Oct 2016 05:51:09 GMT
BOOKKEEPER-612: Region aware placement

- Introduce the concept of a two-level network topology, with region as the first level and rack as the second level
- NodeBase, Node and NetworkTopology manage this two-level hierarchy and the position of individual nodes within it
- An implementation, RegionAwareEnsemblePlacementPolicy, that distributes nodes across regions and, within each region, uses rack-aware placement to place nodes

This is a stacked diff (opened now to start the review); the dependent pull request would still be merged first.

Author: Robin Dhamankar <robindh@Robins-MacBook-Air.local>

Reviewers: Sijie Guo <sijie@apache.org>

Closes #56 from robindh/RegionAwarePlacement


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/bbd1eb8d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/bbd1eb8d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/bbd1eb8d

Branch: refs/heads/master
Commit: bbd1eb8d8560b03834175fbd996b85237df09f5c
Parents: 9dc05fc
Author: Robin Dhamankar <robin.dhamankar@gmail.com>
Authored: Wed Oct 12 22:50:42 2016 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Wed Oct 12 22:50:42 2016 -0700

----------------------------------------------------------------------
 bookkeeper-server/pom.xml                       |    5 +
 .../LocalBookieEnsemblePlacementPolicy.java     |   37 +-
 .../apache/bookkeeper/client/BookKeeper.java    |   72 +-
 .../apache/bookkeeper/client/BookieWatcher.java |   17 +-
 .../client/DefaultEnsemblePlacementPolicy.java  |   45 +-
 .../client/EnsemblePlacementPolicy.java         |   65 +-
 .../ITopologyAwareEnsemblePlacementPolicy.java  |  127 ++
 .../bookkeeper/client/LedgerCreateOp.java       |    4 +-
 .../apache/bookkeeper/client/LedgerHandle.java  |    9 +-
 .../RackawareEnsemblePlacementPolicy.java       |  609 ++-------
 .../RackawareEnsemblePlacementPolicyImpl.java   |  554 ++++++++
 .../RegionAwareEnsemblePlacementPolicy.java     |  602 +++++++++
 .../TopologyAwareEnsemblePlacementPolicy.java   |  467 +++++++
 .../bookkeeper/conf/ClientConfiguration.java    |   72 +-
 .../net/AbstractDNSToSwitchMapping.java         |    2 +-
 .../bookkeeper/net/BookieSocketAddress.java     |    2 +-
 .../net/CachedDNSToSwitchMapping.java           |    1 -
 .../java/org/apache/bookkeeper/net/DNS.java     |    4 +
 .../bookkeeper/net/DNSToSwitchMapping.java      |    1 -
 .../org/apache/bookkeeper/net/NetUtils.java     |   26 +-
 .../apache/bookkeeper/net/NetworkTopology.java  |  861 +-----------
 .../bookkeeper/net/NetworkTopologyImpl.java     |  880 ++++++++++++
 .../java/org/apache/bookkeeper/net/Node.java    |    4 +-
 .../org/apache/bookkeeper/net/NodeBase.java     |   15 +-
 .../bookkeeper/net/ScriptBasedMapping.java      |    2 +-
 .../net/StabilizeNetworkTopology.java           |  154 +++
 .../proto/PerChannelBookieClient.java           |    2 +-
 .../bookkeeper/util/BookKeeperConstants.java    |    2 +
 .../TestRackawareEnsemblePlacementPolicy.java   |  388 ++++--
 .../TestRegionAwareEnsemblePlacementPolicy.java | 1262 ++++++++++++++++++
 .../bookkeeper/client/UpdateLedgerCmdTest.java  |    2 +-
 .../bookkeeper/client/UpdateLedgerOpTest.java   |    6 +-
 .../bookkeeper/util/StaticDNSResolver.java      |   10 +
 33 files changed, 4837 insertions(+), 1472 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/pom.xml
----------------------------------------------------------------------
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index e47d0d3..bd143f1 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -84,6 +84,11 @@
       <scope>compile</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-lang3</artifactId>
+      <version>3.3.2</version>
+    </dependency>
+    <dependency>
       <groupId>commons-configuration</groupId>
       <artifactId>commons-configuration</artifactId>
       <version>1.6</version>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index 2b18029..508511b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -17,16 +17,21 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import com.google.common.base.Optional;
+
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Set;
+import java.util.*;
 
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.jboss.netty.util.HashedWheelTimer;
 import org.apache.commons.configuration.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,8 +48,7 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
     private BookieSocketAddress bookieAddress;
 
     @Override
-    public EnsemblePlacementPolicy initialize(Configuration conf) {
-
+    public EnsemblePlacementPolicy initialize(ClientConfiguration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer hashedWheelTimer, FeatureProvider featureProvider, StatsLogger statsLogger) {
         // Configuration will have already the bookie configuration inserted
         ServerConfiguration serverConf = new ServerConfiguration();
         serverConf.addConfiguration(conf);
@@ -70,7 +74,22 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        throw new BKNotEnoughBookiesException();
+    }
+
+    @Override
+    public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        return null;
+    }
+
+    @Override
+    public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        return null;
+    }
+
+    @Override
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
             Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
         if (ensembleSize > 1) {
             throw new IllegalArgumentException("Local ensemble policy can only return 1 bookie");
@@ -79,10 +98,4 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
         return Lists.newArrayList(bookieAddress);
     }
 
-    @Override
-    public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
-            Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
-        throw new BKNotEnoughBookiesException();
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 75ab759..b683ca4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
 import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
@@ -36,11 +37,14 @@ import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
 import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.feature.SettableFeatureProvider;
 import org.apache.bookkeeper.meta.CleanupLedgerManager;
 import org.apache.bookkeeper.meta.LedgerIdGenerator;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -56,6 +60,7 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,6 +106,9 @@ public class BookKeeper implements AutoCloseable {
 
     final OrderedSafeExecutor mainWorkerPool;
     final ScheduledExecutorService scheduler;
+    final HashedWheelTimer requestTimer;
+    final boolean ownTimer;
+    final FeatureProvider featureProvider;
 
     // Ledger manager responsible for how to store ledger meta data
     final LedgerManagerFactory ledgerManagerFactory;
@@ -122,6 +130,9 @@ public class BookKeeper implements AutoCloseable {
         ZooKeeper zk = null;
         ClientSocketChannelFactory channelFactory = null;
         StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+        DNSToSwitchMapping dnsResolver = null;
+        HashedWheelTimer requestTimer = null;
+        FeatureProvider featureProvider = null;
 
         Builder(ClientConfiguration conf) {
             this.conf = conf;
@@ -142,9 +153,25 @@ public class BookKeeper implements AutoCloseable {
             return this;
         }
 
+
+        public Builder dnsResolver(DNSToSwitchMapping dnsResolver) {
+            this.dnsResolver = dnsResolver;
+            return this;
+        }
+
+        public Builder requestTimer(HashedWheelTimer requestTimer) {
+            this.requestTimer = requestTimer;
+            return this;
+        }
+
+        public Builder featureProvider(FeatureProvider featureProvider) {
+            this.featureProvider = featureProvider;
+            return this;
+        }
+
         public BookKeeper build() throws IOException, InterruptedException, KeeperException {
             Preconditions.checkNotNull(statsLogger, "No stats logger provided");
-            return new BookKeeper(conf, zk, channelFactory, statsLogger);
+            return new BookKeeper(conf, zk, channelFactory, statsLogger, dnsResolver, requestTimer, featureProvider);
         }
     }
 
@@ -183,7 +210,8 @@ public class BookKeeper implements AutoCloseable {
      */
     public BookKeeper(final ClientConfiguration conf)
             throws IOException, InterruptedException, KeeperException {
-        this(conf, null, null, NullStatsLogger.INSTANCE);
+        this(conf, null, null, NullStatsLogger.INSTANCE,
+                null, null, null);
     }
 
     private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException {
@@ -240,7 +268,8 @@ public class BookKeeper implements AutoCloseable {
      */
     public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
             throws IOException, InterruptedException, KeeperException {
-        this(conf, validateZooKeeper(zk), validateChannelFactory(channelFactory), NullStatsLogger.INSTANCE);
+        this(conf, validateZooKeeper(zk), validateChannelFactory(channelFactory), NullStatsLogger.INSTANCE,
+                null, null, null);
     }
 
     /**
@@ -249,7 +278,10 @@ public class BookKeeper implements AutoCloseable {
     private BookKeeper(ClientConfiguration conf,
                        ZooKeeper zkc,
                        ClientSocketChannelFactory channelFactory,
-                       StatsLogger statsLogger)
+                       StatsLogger statsLogger,
+                       DNSToSwitchMapping dnsResolver,
+                       HashedWheelTimer requestTimer,
+                       FeatureProvider featureProvider)
             throws IOException, InterruptedException, KeeperException {
         this.conf = conf;
 
@@ -286,6 +318,23 @@ public class BookKeeper implements AutoCloseable {
             this.ownChannelFactory = false;
         }
 
+        if (null == requestTimer) {
+            this.requestTimer = new HashedWheelTimer(
+                    new ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(),
+                    conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS,
+                    conf.getTimeoutTimerNumTicks());
+            this.ownTimer = true;
+        } else {
+            this.requestTimer = requestTimer;
+            this.ownTimer = false;
+        }
+
+        if (null == featureProvider) {
+            this.featureProvider = SettableFeatureProvider.DISABLE_ALL;
+        } else {
+            this.featureProvider = featureProvider;
+        }
+
         // initialize scheduler
         ThreadFactoryBuilder tfb = new ThreadFactoryBuilder().setNameFormat(
                 "BookKeeperClientScheduler-%d");
@@ -297,7 +346,8 @@ public class BookKeeper implements AutoCloseable {
         initOpLoggers(this.statsLogger);
 
         // initialize the ensemble placement
-        this.placementPolicy = initializeEnsemblePlacementPolicy(conf);
+        this.placementPolicy = initializeEnsemblePlacementPolicy(conf,
+                dnsResolver, this.requestTimer, this.featureProvider, this.statsLogger);
 
         // initialize main worker pool
         this.mainWorkerPool = OrderedSafeExecutor.newBuilder()
@@ -321,11 +371,16 @@ public class BookKeeper implements AutoCloseable {
         scheduleBookieHealthCheckIfEnabled();
     }
 
-    private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf)
+    private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf,
+                                                                      DNSToSwitchMapping dnsResolver,
+                                                                      HashedWheelTimer timer,
+                                                                      FeatureProvider featureProvider,
+                                                                      StatsLogger statsLogger)
         throws IOException {
         try {
             Class<? extends EnsemblePlacementPolicy> policyCls = conf.getEnsemblePlacementPolicy();
-            return ReflectionUtils.newInstance(policyCls).initialize(conf);
+            return ReflectionUtils.newInstance(policyCls).initialize(conf, Optional.fromNullable(dnsResolver),
+                    timer, featureProvider, statsLogger);
         } catch (ConfigurationException e) {
             throw new IOException("Failed to initialize ensemble placement policy : ", e);
         }
@@ -1001,6 +1056,9 @@ public class BookKeeper implements AutoCloseable {
             LOG.warn("The mainWorkerPool did not shutdown cleanly");
         }
 
+        if (ownTimer) {
+            requestTimer.stop();
+        }
         if (ownChannelFactory) {
             channelFactory.releaseExternalResources();
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index 1e49cf8..b8d8951 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -255,17 +255,18 @@ class BookieWatcher implements Watcher, ChildrenCallback {
      * @return list of bookies for new ensemble.
      * @throws BKNotEnoughBookiesException
      */
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize)
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize)
             throws BKNotEnoughBookiesException {
         try {
             // we try to only get from the healthy bookies first
-            return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, new HashSet<BookieSocketAddress>(
+            return placementPolicy.newEnsemble(ensembleSize,
+                    writeQuorumSize, ackQuorumSize, new HashSet<BookieSocketAddress>(
                     quarantinedBookies.asMap().keySet()));
         } catch (BKNotEnoughBookiesException e) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Not enough healthy bookies available, using quarantined bookies");
             }
-            return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, EMPTY_SET);
+            return placementPolicy.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, EMPTY_SET);
         }
     }
 
@@ -278,19 +279,23 @@ class BookieWatcher implements Watcher, ChildrenCallback {
      * @return the bookie to replace.
      * @throws BKNotEnoughBookiesException
      */
-    public BookieSocketAddress replaceBookie(List<BookieSocketAddress> existingBookies, int bookieIdx)
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+                                             List<BookieSocketAddress> existingBookies, int bookieIdx,
+                                             Set<BookieSocketAddress> excludeBookies)
             throws BKNotEnoughBookiesException {
         BookieSocketAddress addr = existingBookies.get(bookieIdx);
         try {
             // we exclude the quarantined bookies also first
             Set<BookieSocketAddress> existingAndQuarantinedBookies = new HashSet<BookieSocketAddress>(existingBookies);
             existingAndQuarantinedBookies.addAll(quarantinedBookies.asMap().keySet());
-            return placementPolicy.replaceBookie(addr, existingAndQuarantinedBookies);
+            return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+                    existingAndQuarantinedBookies, addr, excludeBookies);
         } catch (BKNotEnoughBookiesException e) {
             if (logger.isDebugEnabled()) {
                 logger.debug("Not enough healthy bookies available, using quarantined bookies");
             }
-            return placementPolicy.replaceBookie(addr, new HashSet<BookieSocketAddress>(existingBookies));
+            return placementPolicy.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+                    new HashSet<BookieSocketAddress>(existingBookies), addr, excludeBookies);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 5f2d2c3..640bdb7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -18,14 +18,22 @@
 package org.apache.bookkeeper.client;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Optional;
+
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.commons.configuration.Configuration;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
 
 /**
  * Default Ensemble Placement Policy, which picks bookies randomly
@@ -37,7 +45,7 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
     private Set<BookieSocketAddress> knownBookies = new HashSet<BookieSocketAddress>();
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize,
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize,
             Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
         ArrayList<BookieSocketAddress> newBookies = new ArrayList<BookieSocketAddress>(ensembleSize);
         if (ensembleSize <= 0) {
@@ -62,9 +70,11 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
-            Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
-        ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, excludeBookies);
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble,
+                                           BookieSocketAddress bookieToReplace,
+                                           Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
+        excludeBookies.addAll(currentEnsemble);
+        ArrayList<BookieSocketAddress> addresses = newEnsemble(1, 1, 1, excludeBookies);
         return addresses.get(0);
     }
 
@@ -81,7 +91,29 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
     }
 
     @Override
-    public EnsemblePlacementPolicy initialize(Configuration conf) {
+    public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        return writeSet;
+    }
+
+    @Override
+    public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        List<Integer> retList = new ArrayList<Integer>(writeSet);
+        if (retList.size() < ensemble.size()) {
+            for (int i = 0; i < ensemble.size(); i++) {
+                if (!retList.contains(i)) {
+                    retList.add(i);
+                }
+            }
+        }
+        return retList;
+    }
+
+    @Override
+    public EnsemblePlacementPolicy initialize(ClientConfiguration conf,
+                                              Optional<DNSToSwitchMapping> optionalDnsResolver,
+                                              HashedWheelTimer timer,
+                                              FeatureProvider featureProvider,
+                                              StatsLogger statsLogger) {
         return this;
     }
 
@@ -89,5 +121,4 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
     public void uninitalize() {
         // do nothing
     }
-
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index a1d8ce3..2af8108 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -18,11 +18,20 @@
 package org.apache.bookkeeper.client;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Optional;
+
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.commons.configuration.Configuration;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
 
 /**
  * Encapsulation of the algorithm that selects a number of bookies from the cluster as an ensemble for storing
@@ -33,11 +42,17 @@ public interface EnsemblePlacementPolicy {
     /**
      * Initialize the policy.
      *
-     * @param conf
-     *          client configuration.
-     * @return initialized ensemble placement policy
+     * @param conf client configuration
+     * @param optionalDnsResolver dns resolver
+     * @param hashedWheelTimer timer
+     * @param featureProvider feature provider
+     * @param statsLogger stats logger
      */
-    public EnsemblePlacementPolicy initialize(Configuration conf);
+    public EnsemblePlacementPolicy initialize(ClientConfiguration conf,
+                                              Optional<DNSToSwitchMapping> optionalDnsResolver,
+                                              HashedWheelTimer hashedWheelTimer,
+                                              FeatureProvider featureProvider,
+                                              StatsLogger statsLogger);
 
     /**
      * Uninitialize the policy
@@ -55,7 +70,7 @@ public interface EnsemblePlacementPolicy {
      * @return the dead bookies during this cluster change.
      */
     public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
-            Set<BookieSocketAddress> readOnlyBookies);
+                                                     Set<BookieSocketAddress> readOnlyBookies);
 
     /**
      * Choose <i>numBookies</i> bookies for ensemble. If the count is more than the number of available
@@ -70,8 +85,8 @@ public interface EnsemblePlacementPolicy {
      * @return list of bookies chosen as targets.
      * @throws BKNotEnoughBookiesException if not enough bookies available.
      */
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
-            Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+                                                      Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
 
     /**
      * Choose a new bookie to replace <i>bookieToReplace</i>. If no bookie available in the cluster,
@@ -84,6 +99,36 @@ public interface EnsemblePlacementPolicy {
      * @return the bookie chosen as target.
      * @throws BKNotEnoughBookiesException
      */
-    public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
-            Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+                                             Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace,
+                                             Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException;
+
+    /**
+     * Reorder the read sequence of a given write quorum <i>writeSet</i>.
+     *
+     * @param ensemble
+     *          Ensemble to read entries.
+     * @param writeSet
+     *          Write quorum to read entries.
+     * @param bookieFailureHistory
+     *          Observed failures on the bookies
+     * @return read sequence of bookies
+     */
+    public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble,
+                                             List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory);
+
+
+    /**
+     * Reorder the read last add confirmed sequence of a given write quorum <i>writeSet</i>.
+     *
+     * @param ensemble
+     *          Ensemble to read entries.
+     * @param writeSet
+     *          Write quorum to read entries.
+     * @param bookieFailureHistory
+     *          Observed failures on the bookies
+     * @return read sequence of bookies
+     */
+    public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble,
+                                                List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory);
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
new file mode 100644
index 0000000..535fffe
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ITopologyAwareEnsemblePlacementPolicy.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.Node;
+
+import java.util.ArrayList;
+import java.util.Set;
+
+/**
+ * Interface for topology aware ensemble placement policy.
+ */
+public interface ITopologyAwareEnsemblePlacementPolicy<T extends Node> extends EnsemblePlacementPolicy {
+    /**
+     * Predicate used when choosing an ensemble.
+     */
+    public static interface Predicate<T extends Node> {
+        boolean apply(T candidate, Ensemble<T> chosenBookies);
+    }
+
+    /**
+     * Ensemble used to hold the result of an ensemble selected for placement.
+     */
+    public static interface Ensemble<T extends Node> {
+
+        /**
+         * Append the new bookie node to the ensemble only if the ensemble doesn't
+         * already contain the same bookie.
+         *
+         * @param node
+         *          new candidate bookie node.
+         * @return
+         *          true if the node was added
+         */
+        public boolean addNode(T node);
+
+        /**
+         * @return list of addresses representing the ensemble
+         */
+        public ArrayList<BookieSocketAddress> toList();
+
+        /**
+         * Checks whether this ensemble is valid.
+         *
+         * @return true if the ensemble is valid; false otherwise
+         */
+        public boolean validate();
+
+    }
+
+    /**
+     * Create an ensemble with parent ensemble.
+     *
+     * @param ensembleSize
+     *          ensemble size
+     * @param writeQuorumSize
+     *          write quorum size
+     * @param ackQuorumSize
+     *          ack quorum size
+     * @param excludeBookies
+     *          exclude bookies
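+     * @param parentEnsemble  parent ensemble
+     * @param parentPredicate parent predicate used when choosing the ensemble
+     * @return list of bookies forming the ensemble
+     * @throws BKException.BKNotEnoughBookiesException if there are not enough bookies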
+     */
+    ArrayList<BookieSocketAddress> newEnsemble(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieSocketAddress> excludeBookies,
+            Ensemble<T> parentEnsemble,
+            Predicate<T> parentPredicate)
+            throws BKException.BKNotEnoughBookiesException;
+
+    /**
+     * Select a node from a given network location.
+     *
+     * @param networkLoc
+     *          network location
+     * @param excludeBookies
+     *          exclude bookies set
+     * @param predicate
+     *          predicate to apply
+     * @param ensemble
+     *          ensemble
+     * @return the selected bookie.
+     * @throws BKException.BKNotEnoughBookiesException if there are not enough bookies
+     */
+    T selectFromNetworkLocation(String networkLoc,
+                                Set<Node> excludeBookies,
+                                Predicate<T> predicate,
+                                Ensemble<T> ensemble)
+            throws BKException.BKNotEnoughBookiesException;
+
+    /**
+     * Handle bookies that left.
+     *
+     * @param leftBookies
+     *          bookies that left
+     */
+    void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies);
+
+    /**
+     * Handle bookies that joined.
+     *
+     * @param joinedBookies
+     *          bookies that joined.
+     */
+    void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies);
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
index 3626ce0..e88df31 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java
@@ -105,7 +105,9 @@ class LedgerCreateOp implements GenericCallback<Void> {
         ArrayList<BookieSocketAddress> ensemble;
         try {
             ensemble = bk.bookieWatcher
-                    .newEnsemble(metadata.getEnsembleSize(), metadata.getWriteQuorumSize());
+                    .newEnsemble(metadata.getEnsembleSize(),
+                            metadata.getWriteQuorumSize(),
+                            metadata.getAckQuorumSize());
         } catch (BKNotEnoughBookiesException e) {
             LOG.error("Not enough bookies to create ledger");
             createComplete(e.getCode(), null);

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
index 3c8d475..06f84eb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java
@@ -26,6 +26,7 @@ import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Enumeration;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -1005,9 +1006,13 @@ public class LedgerHandle implements AutoCloseable {
 
         // avoid parallel ensemble changes to same ensemble.
         synchronized (metadata) {
-            newBookie = bk.bookieWatcher.replaceBookie(metadata.currentEnsemble, bookieIndex);
-
             newEnsemble.addAll(metadata.currentEnsemble);
+            newBookie = bk.bookieWatcher.replaceBookie(metadata.getEnsembleSize(),
+                    metadata.getWriteQuorumSize(),
+                    metadata.getAckQuorumSize(), newEnsemble,
+                    bookieIndex, new HashSet<>(Arrays.asList(addr)));
+
+
             newEnsemble.set(bookieIndex, newBookie);
 
             if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index 7b15d9e..f42e42a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -17,556 +17,177 @@
  */
 package org.apache.bookkeeper.client;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
-import org.apache.bookkeeper.conf.Configurable;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
-import org.apache.bookkeeper.net.NetworkTopology;
 import org.apache.bookkeeper.net.Node;
-import org.apache.bookkeeper.net.NodeBase;
-import org.apache.bookkeeper.net.ScriptBasedMapping;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.Configuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.jboss.netty.util.HashedWheelTimer;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicyImpl
+        implements ITopologyAwareEnsemblePlacementPolicy<TopologyAwareEnsemblePlacementPolicy.BookieNode> {
 
-/**
- * Simple rackware ensemble placement policy.
- *
- * Make most of the class and methods as protected, so it could be extended to implement other algorithms.
- */
-public class RackawareEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
-
-    static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicy.class);
-
-    public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
-
-    /**
-     * Predicate used when choosing an ensemble.
-     */
-    protected static interface Predicate {
-        boolean apply(BookieNode candidate, Ensemble chosenBookies);
-    }
-
-    /**
-     * Ensemble used to hold the result of an ensemble selected for placement.
-     */
-    protected static interface Ensemble {
-
-        /**
-         * Append the new bookie node to the ensemble.
-         *
-         * @param node
-         *          new candidate bookie node.
-         */
-        public void addBookie(BookieNode node);
+    RackawareEnsemblePlacementPolicyImpl slave = null;
 
-        /**
-         * @return list of addresses representing the ensemble
-         */
-        public ArrayList<BookieSocketAddress> toList();
+    RackawareEnsemblePlacementPolicy() {
+        super();
     }
 
-    protected static class TruePredicate implements Predicate {
-
-        public static final TruePredicate instance = new TruePredicate();
-
-        @Override
-        public boolean apply(BookieNode candidate, Ensemble chosenNodes) {
-            return true;
-        }
-
+    RackawareEnsemblePlacementPolicy(boolean enforceDurability) {
+        super(enforceDurability);
     }
 
-    protected static class EnsembleForReplacement implements Ensemble {
-
-        public static final EnsembleForReplacement instance = new EnsembleForReplacement();
-        static final ArrayList<BookieSocketAddress> EMPTY_LIST = new ArrayList<BookieSocketAddress>(0);
-
-        @Override
-        public void addBookie(BookieNode node) {
-            // do nothing
-        }
-
-        @Override
-        public ArrayList<BookieSocketAddress> toList() {
-            return EMPTY_LIST;
+    @Override
+    protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsResolver,
+                                                          HashedWheelTimer timer,
+                                                          boolean reorderReadsRandom,
+                                                          int stabilizePeriodSeconds,
+                                                          StatsLogger statsLogger) {
+        if (stabilizePeriodSeconds > 0) {
+            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, statsLogger);
+            slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
+            slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, statsLogger);
+        } else {
+            super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, statsLogger);
+            slave = null;
         }
-
+        return this;
     }
 
-    /**
-     * A predicate checking the rack coverage for write quorum in {@link RoundRobinDistributionSchedule},
-     * which ensures that a write quorum should be covered by at least two racks.
-     */
-    protected static class RRRackCoverageEnsemble implements Predicate, Ensemble {
-
-        class QuorumCoverageSet {
-            Set<String> racks = new HashSet<String>();
-            int seenBookies = 0;
-
-            boolean apply(BookieNode candidate) {
-                if (seenBookies + 1 == writeQuorumSize) {
-                    return racks.size() > (racks.contains(candidate.getNetworkLocation()) ? 1 : 0);
-                }
-                return true;
-            }
-
-            void addBookie(BookieNode candidate) {
-                ++seenBookies;
-                racks.add(candidate.getNetworkLocation());
-            }
-        }
-
-        final int ensembleSize;
-        final int writeQuorumSize;
-        final ArrayList<BookieNode> chosenNodes;
-        private final QuorumCoverageSet[] quorums;
-
-        protected RRRackCoverageEnsemble(int ensembleSize, int writeQuorumSize) {
-            this.ensembleSize = ensembleSize;
-            this.writeQuorumSize = writeQuorumSize;
-            this.chosenNodes = new ArrayList<BookieNode>(ensembleSize);
-            this.quorums = new QuorumCoverageSet[ensembleSize];
-        }
-
-        @Override
-        public boolean apply(BookieNode candidate, Ensemble ensemble) {
-            if (ensemble != this) {
-                return false;
-            }
-            // candidate position
-            int candidatePos = chosenNodes.size();
-            int startPos = candidatePos - writeQuorumSize + 1;
-            for (int i = startPos; i <= candidatePos; i++) {
-                int idx = (i + ensembleSize) % ensembleSize;
-                if (null == quorums[idx]) {
-                    quorums[idx] = new QuorumCoverageSet();
-                }
-                if (!quorums[idx].apply(candidate)) {
-                    return false;
-                }
-            }
-            return true;
-        }
-
-        @Override
-        public void addBookie(BookieNode node) {
-            int candidatePos = chosenNodes.size();
-            int startPos = candidatePos - writeQuorumSize + 1;
-            for (int i = startPos; i <= candidatePos; i++) {
-                int idx = (i + ensembleSize) % ensembleSize;
-                if (null == quorums[idx]) {
-                    quorums[idx] = new QuorumCoverageSet();
-                }
-                quorums[idx].addBookie(node);
-            }
-            chosenNodes.add(node);
-        }
-
-        @Override
-        public ArrayList<BookieSocketAddress> toList() {
-            ArrayList<BookieSocketAddress> addresses = new ArrayList<BookieSocketAddress>(ensembleSize);
-            for (BookieNode bn : chosenNodes) {
-                addresses.add(bn.getAddr());
-            }
-            return addresses;
-        }
-
-        @Override
-        public String toString() {
-            return chosenNodes.toString();
+    @Override
+    public void uninitalize() {
+        super.uninitalize();
+        if (null != slave) {
+            slave.uninitalize();
         }
-
     }
 
-    protected static class BookieNode implements Node {
-
-        private final BookieSocketAddress addr; // identifier of a bookie node.
-
-        private int level; // the level in topology tree
-        private Node parent; // its parent in topology tree
-        private String location = NetworkTopology.DEFAULT_RACK; // its network location
-        private final String name;
-
-        BookieNode(BookieSocketAddress addr, String networkLoc) {
-            this.addr = addr;
-            this.name = addr.toString();
-            setNetworkLocation(networkLoc);
-        }
-
-        public BookieSocketAddress getAddr() {
-            return addr;
-        }
-
-        @Override
-        public int getLevel() {
-            return level;
-        }
-
-        @Override
-        public void setLevel(int level) {
-            this.level = level;
-        }
-
-        @Override
-        public Node getParent() {
-            return parent;
-        }
-
-        @Override
-        public void setParent(Node parent) {
-            this.parent = parent;
-        }
-
-        @Override
-        public String getName() {
-            return name;
-        }
-
-        @Override
-        public String getNetworkLocation() {
-            return location;
-        }
-
-        @Override
-        public void setNetworkLocation(String location) {
-            this.location = location;
-        }
-
-        @Override
-        public int hashCode() {
-            return name.hashCode();
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (!(obj instanceof BookieNode)) {
-                return false;
-            }
-            BookieNode other = (BookieNode) obj;
-            return getName().equals(other.getName());
-        }
-
-        @Override
-        public String toString() {
-            return String.format("<Bookie:%s>", name);
+    @Override
+    public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies, Set<BookieSocketAddress> readOnlyBookies) {
+        Set<BookieSocketAddress> deadBookies = super.onClusterChanged(writableBookies, readOnlyBookies);
+        if (null != slave) {
+            deadBookies = slave.onClusterChanged(writableBookies, readOnlyBookies);
         }
-
+        return deadBookies;
     }
 
-    static class DefaultResolver implements DNSToSwitchMapping {
-
-        @Override
-        public List<String> resolve(List<String> names) {
-            List<String> rNames = new ArrayList<String>(names.size());
-            for (@SuppressWarnings("unused") String name : names) {
-                rNames.add(NetworkTopology.DEFAULT_RACK);
+    @Override
+    public ArrayList<BookieSocketAddress> newEnsemble(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieSocketAddress> excludeBookies)
+            throws BKException.BKNotEnoughBookiesException {
+        try {
+            return super.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies);
+        } catch (BKException.BKNotEnoughBookiesException bnebe) {
+            if (slave == null) {
+                throw bnebe;
+            } else {
+                return slave.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize, excludeBookies);
             }
-            return rNames;
         }
-
-        @Override
-        public void reloadCachedMappings() {
-            // nop
-        }
-
-    };
-
-    // for now, we just maintain the writable bookies' topology
-    private final NetworkTopology topology;
-    private DNSToSwitchMapping dnsResolver;
-    private final Map<BookieSocketAddress, BookieNode> knownBookies;
-    private BookieNode localNode;
-    private final ReentrantReadWriteLock rwLock;
-
-    public RackawareEnsemblePlacementPolicy() {
-        topology = new NetworkTopology();
-        knownBookies = new HashMap<BookieSocketAddress, BookieNode>();
-
-        rwLock = new ReentrantReadWriteLock();
-    }
-
-    private BookieNode createBookieNode(BookieSocketAddress addr) {
-        return new BookieNode(addr, resolveNetworkLocation(addr));
     }
 
     @Override
-    public EnsemblePlacementPolicy initialize(Configuration conf) {
-        String dnsResolverName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
+    public BookieSocketAddress replaceBookie(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Collection<BookieSocketAddress> currentEnsemble,
+            BookieSocketAddress bookieToReplace,
+            Set<BookieSocketAddress> excludeBookies)
+            throws BKException.BKNotEnoughBookiesException {
         try {
-            dnsResolver = ReflectionUtils.newInstance(dnsResolverName, DNSToSwitchMapping.class);
-            if (dnsResolver instanceof Configurable) {
-                ((Configurable) dnsResolver).setConf(conf);
+            return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+                    currentEnsemble, bookieToReplace, excludeBookies);
+        } catch (BKException.BKNotEnoughBookiesException bnebe) {
+            if (slave == null) {
+                throw bnebe;
+            } else {
+                return slave.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize,
+                        currentEnsemble, bookieToReplace, excludeBookies);
             }
-        } catch (RuntimeException re) {
-            LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver.", dnsResolverName, re);
-            dnsResolver = new DefaultResolver();
         }
-
-        BookieNode bn;
-        try {
-            bn = createBookieNode(new BookieSocketAddress(InetAddress.getLocalHost().getHostAddress(), 0));
-        } catch (UnknownHostException e) {
-            LOG.error("Failed to get local host address : ", e);
-            bn = null;
-        }
-        localNode = bn;
-        LOG.info("Initialize rackaware ensemble placement policy @ {} : {}.", localNode,
-                dnsResolver.getClass().getName());
-        return this;
     }
 
     @Override
-    public void uninitalize() {
-        // do nothing
-    }
-
-    private String resolveNetworkLocation(BookieSocketAddress addr) {
-        List<String> names = new ArrayList<String>(1);
-        if (dnsResolver instanceof CachedDNSToSwitchMapping) {
-            names.add(addr.getSocketAddress().getAddress().getHostAddress());
-        } else {
-            names.add(addr.getSocketAddress().getHostName());
-        }
-        // resolve network addresses
-        List<String> rNames = dnsResolver.resolve(names);
-        String netLoc;
-        if (null == rNames) {
-            LOG.warn("Failed to resolve network location for {}, using default rack for them : {}.", names,
-                    NetworkTopology.DEFAULT_RACK);
-            netLoc = NetworkTopology.DEFAULT_RACK;
-        } else {
-            netLoc = rNames.get(0);
-        }
-        return netLoc;
+    public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble,
+                                             List<Integer> writeSet,
+                                             Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        return super.reorderReadSequence(ensemble, writeSet, bookieFailureHistory);
     }
 
     @Override
-    public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
-            Set<BookieSocketAddress> readOnlyBookies) {
-        rwLock.writeLock().lock();
-        try {
-            ImmutableSet<BookieSocketAddress> joinedBookies, leftBookies, deadBookies;
-            Set<BookieSocketAddress> oldBookieSet = knownBookies.keySet();
-            // left bookies : bookies in known bookies, but not in new writable bookie cluster.
-            leftBookies = Sets.difference(oldBookieSet, writableBookies).immutableCopy();
-            // joined bookies : bookies in new writable bookie cluster, but not in known bookies
-            joinedBookies = Sets.difference(writableBookies, oldBookieSet).immutableCopy();
-            // dead bookies.
-            deadBookies = Sets.difference(leftBookies, readOnlyBookies).immutableCopy();
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(
-                        "Cluster changed : left bookies are {}, joined bookies are {}, while dead bookies are {}.",
-                        new Object[] { leftBookies, joinedBookies, deadBookies });
-            }
-
-            // node left
-            for (BookieSocketAddress addr : leftBookies) {
-                BookieNode node = knownBookies.remove(addr);
-                topology.remove(node);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cluster changed : bookie {} left from cluster.", addr);
-                }
-            }
-
-            // node joined
-            for (BookieSocketAddress addr : joinedBookies) {
-                BookieNode node = createBookieNode(addr);
-                topology.add(node);
-                knownBookies.put(addr, node);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Cluster changed : bookie {} joined the cluster.", addr);
-                }
-            }
-
-            return deadBookies;
-        } finally {
-            rwLock.writeLock().unlock();
-        }
-    }
-
-    private Set<Node> convertBookiesToNodes(Set<BookieSocketAddress> excludeBookies) {
-        Set<Node> nodes = new HashSet<Node>();
-        for (BookieSocketAddress addr : excludeBookies) {
-            BookieNode bn = knownBookies.get(addr);
-            if (null == bn) {
-                bn = createBookieNode(addr);
-            }
-            nodes.add(bn);
-        }
-        return nodes;
+    public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble,
+                                                List<Integer> writeSet,
+                                                Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        return super.reorderReadLACSequence(ensemble, writeSet, bookieFailureHistory);
     }
 
     @Override
-    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize,
-            Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
-        rwLock.readLock().lock();
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
+                                                    int writeQuorumSize,
+                                                    int ackQuorumSize,
+                                                    Set<BookieSocketAddress> excludeBookies,
+                                                    Ensemble<BookieNode> parentEnsemble,
+                                                    Predicate<BookieNode> parentPredicate)
+            throws BKException.BKNotEnoughBookiesException {
         try {
-            Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
-            RRRackCoverageEnsemble ensemble = new RRRackCoverageEnsemble(ensembleSize, writeQuorumSize);
-            BookieNode prevNode = null;
-            int numRacks = topology.getNumOfRacks();
-            // only one rack, use the random algorithm.
-            if (numRacks < 2) {
-                List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes,
-                        EnsembleForReplacement.instance);
-                ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
-                for (BookieNode bn : bns) {
-                    addrs.add(bn.addr);
-                }
-                return addrs;
+            return super.newEnsemble(
+                    ensembleSize,
+                    writeQuorumSize,
+                    ackQuorumSize,
+                    excludeBookies,
+                    parentEnsemble,
+                    parentPredicate);
+        } catch (BKException.BKNotEnoughBookiesException bnebe) {
+            if (slave == null) {
+                throw bnebe;
+            } else {
+                return slave.newEnsemble(ensembleSize, writeQuorumSize, ackQuorumSize,
+                        excludeBookies, parentEnsemble, parentPredicate);
             }
-            // pick nodes by racks, to ensure there is at least two racks per write quorum.
-            for (int i = 0; i < ensembleSize; i++) {
-                String curRack;
-                if (null == prevNode) {
-                    if (null == localNode) {
-                        curRack = NodeBase.ROOT;
-                    } else {
-                        curRack = localNode.getNetworkLocation();
-                    }
-                } else {
-                    curRack = "~" + prevNode.getNetworkLocation();
-                }
-                prevNode = selectFromRack(curRack, excludeNodes, ensemble, ensemble);
-            }
-            return ensemble.toList();
-        } finally {
-            rwLock.readLock().unlock();
         }
     }
 
     @Override
-    public BookieSocketAddress replaceBookie(BookieSocketAddress bookieToReplace,
-            Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
-        rwLock.readLock().lock();
+    public BookieNode selectFromNetworkLocation(
+            String networkLoc,
+            Set<Node> excludeBookies,
+            Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble)
+            throws BKException.BKNotEnoughBookiesException {
         try {
-            BookieNode bn = knownBookies.get(bookieToReplace);
-            if (null == bn) {
-                bn = createBookieNode(bookieToReplace);
-            }
-
-            Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
-            // add the bookie to replace in exclude set
-            excludeNodes.add(bn);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace,
-                        excludeNodes);
+            return super.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
+        } catch (BKException.BKNotEnoughBookiesException bnebe) {
+            if (slave == null) {
+                throw bnebe;
+            } else {
+                return slave.selectFromNetworkLocation(networkLoc, excludeBookies, predicate, ensemble);
             }
-            // pick a candidate from same rack to replace
-            BookieNode candidate = selectFromRack(bn.getNetworkLocation(), excludeNodes,
-                    TruePredicate.instance, EnsembleForReplacement.instance);
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn);
-            }
-            return candidate.addr;
-        } finally {
-            rwLock.readLock().unlock();
-        }
-    }
-
-    protected BookieNode selectFromRack(String networkLoc, Set<Node> excludeBookies, Predicate predicate,
-            Ensemble ensemble) throws BKNotEnoughBookiesException {
-        // select one from local rack
-        try {
-            return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
-        } catch (BKNotEnoughBookiesException e) {
-            LOG.warn("Failed to choose a bookie from {} : "
-                     + "excluded {}, fallback to choose bookie randomly from the cluster.",
-                     networkLoc, excludeBookies);
-            // randomly choose one from whole cluster, ignore the provided predicate.
-            return selectRandom(1, excludeBookies, ensemble).get(0);
         }
     }
 
-    protected String getRemoteRack(BookieNode node) {
-        return "~" + node.getNetworkLocation();
-    }
-
-    /**
-     * Choose random node under a given network path.
-     *
-     * @param netPath
-     *          network path
-     * @param excludeBookies
-     *          exclude bookies
-     * @param predicate
-     *          predicate to check whether the target is a good target.
-     * @param ensemble
-     *          ensemble structure
-     * @return chosen bookie.
-     */
-    protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBookies, Predicate predicate,
-            Ensemble ensemble) throws BKNotEnoughBookiesException {
-        List<Node> leaves = new ArrayList<Node>(topology.getLeaves(netPath));
-        Collections.shuffle(leaves);
-        for (Node n : leaves) {
-            if (excludeBookies.contains(n)) {
-                continue;
-            }
-            if (!(n instanceof BookieNode) || !predicate.apply((BookieNode) n, ensemble)) {
-                continue;
-            }
-            BookieNode bn = (BookieNode) n;
-            // got a good candidate
-            ensemble.addBookie(bn);
-            // add the candidate to exclude set
-            excludeBookies.add(bn);
-            return bn;
+    @Override
+    public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) {
+        super.handleBookiesThatLeft(leftBookies);
+        if (null != slave) {
+            slave.handleBookiesThatLeft(leftBookies);
         }
-        throw new BKNotEnoughBookiesException();
     }
 
-    /**
-     * Choose a random node from whole cluster.
-     *
-     * @param numBookies
-     *          number bookies to choose
-     * @param excludeBookies
-     *          bookies set to exclude.
-     * @param ensemble
-     *          ensemble to hold the bookie chosen.
-     * @return the bookie node chosen.
-     * @throws BKNotEnoughBookiesException
-     */
-    protected List<BookieNode> selectRandom(int numBookies, Set<Node> excludeBookies, Ensemble ensemble)
-            throws BKNotEnoughBookiesException {
-        List<BookieNode> allBookies = new ArrayList<BookieNode>(knownBookies.values());
-        Collections.shuffle(allBookies);
-        List<BookieNode> newBookies = new ArrayList<BookieNode>(numBookies);
-        for (BookieNode bookie : allBookies) {
-            if (excludeBookies.contains(bookie)) {
-                continue;
-            }
-            ensemble.addBookie(bookie);
-            excludeBookies.add(bookie);
-            newBookies.add(bookie);
-            --numBookies;
-            if (numBookies == 0) {
-                return newBookies;
-            }
-        }
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Failed to find {} bookies : excludeBookies {}, allBookies {}.", new Object[] {
-                    numBookies, excludeBookies, allBookies });
+    @Override
+    public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
+        super.handleBookiesThatJoined(joinedBookies);
+        if (null != slave) {
+            slave.handleBookiesThatJoined(joinedBookies);
         }
-        throw new BKNotEnoughBookiesException();
     }
-
 }


Mime
View raw message