bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [4/5] bookkeeper git commit: BOOKKEEPER-612: Region aware placement
Date Thu, 13 Oct 2016 05:51:08 GMT
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
new file mode 100644
index 0000000..3c41a7c
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.conf.Configurable;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetUtils;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.NetworkTopologyImpl;
+import org.apache.bookkeeper.net.Node;
+import org.apache.bookkeeper.net.NodeBase;
+import org.apache.bookkeeper.net.ScriptBasedMapping;
+import org.apache.bookkeeper.net.StabilizeNetworkTopology;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+/**
+ * Simple rack-aware ensemble placement policy.
+ *
+ * Most of the class and its methods are protected, so that it can be extended to implement other algorithms.
+ */
+class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacementPolicy {
+
+    static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class);
+
+    // Client configuration key naming the DNSToSwitchMapping class to instantiate when no resolver is supplied.
+    public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
+    // Client configuration key that turns on shuffling of read candidates in reorderReadSequence.
+    public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering";
+
+    // Distance value handed to RRTopologyAwareCoverageEnsemble when a new ensemble is built.
+    static final int RACKNAME_DISTANCE_FROM_LEAVES = 1;
+
+    /**
+     * Fallback resolver that places every host in {@link NetworkTopology#DEFAULT_RACK}.
+     */
+    static class DefaultResolver implements DNSToSwitchMapping {
+
+        /**
+         * Map every given name to the default rack.
+         *
+         * @param names host names or addresses to resolve
+         * @return a new mutable list holding one default-rack entry per input name
+         */
+        @Override
+        public List<String> resolve(List<String> names) {
+            return new ArrayList<String>(Collections.nCopies(names.size(), NetworkTopology.DEFAULT_RACK));
+        }
+
+        /**
+         * Nothing is cached, so there is nothing to reload.
+         */
+        @Override
+        public void reloadCachedMappings() {
+            // nop
+        }
+
+    }
+
+    // for now, we just maintain the writable bookies' topology
+    protected NetworkTopology topology;
+    // resolver used by resolveNetworkLocation to map a bookie address to a network location
+    protected DNSToSwitchMapping dnsResolver;
+    // timer handed to StabilizeNetworkTopology when a stabilize period is configured
+    protected HashedWheelTimer timer;
+    // bookies registered through handleBookiesThatJoined, keyed by address
+    protected final Map<BookieSocketAddress, BookieNode> knownBookies;
+    // node for the local host; null when the local host address cannot be determined
+    protected BookieNode localNode;
+    // write-locked by onClusterChanged, read-locked while bookies are being selected
+    protected final ReentrantReadWriteLock rwLock;
+    // last non-empty read-only set passed to onClusterChanged; null until one has been seen
+    protected ImmutableSet<BookieSocketAddress> readOnlyBookies = null;
+    // when true, reorderReadSequence shuffles the candidates inside each category
+    protected boolean reorderReadsRandom = false;
+    // when true, selectRandomInternal never picks a bookie that the predicate rejects
+    protected boolean enforceDurability = false;
+    // a value > 0 makes initialize() build a StabilizeNetworkTopology
+    protected int stabilizePeriodSeconds = 0;
+    protected StatsLogger statsLogger = null;
+
+    /**
+     * Create a policy that does not enforce durability during random selection.
+     */
+    RackawareEnsemblePlacementPolicyImpl() {
+        this(false);
+    }
+
+    /**
+     * Create a policy with an empty topology and no known bookies.
+     *
+     * @param enforceDurability whether random selection must always honour the caller's predicate
+     */
+    RackawareEnsemblePlacementPolicyImpl(boolean enforceDurability) {
+        this.rwLock = new ReentrantReadWriteLock();
+        this.knownBookies = new HashMap<BookieSocketAddress, BookieNode>();
+        this.topology = new NetworkTopologyImpl();
+        this.enforceDurability = enforceDurability;
+    }
+
+    /**
+     * Build the topology node for a bookie, resolving its network location first.
+     *
+     * @param addr bookie address
+     * @return a new node carrying the address and its resolved network location
+     */
+    protected BookieNode createBookieNode(BookieSocketAddress addr) {
+        String networkLocation = resolveNetworkLocation(addr);
+        return new BookieNode(addr, networkLocation);
+    }
+
+    /**
+     * Initialize the policy.
+     *
+     * @param dnsResolver the object used to resolve bookie addresses to their network locations
+     * @param timer timer handed to the stabilizing topology when a stabilize period is set
+     * @param reorderReadsRandom whether {@link #reorderReadSequence} shuffles its candidate lists
+     * @param stabilizePeriodSeconds a positive value selects {@link StabilizeNetworkTopology}
+     * @param statsLogger stats logger stored on the policy
+     * @return initialized ensemble placement policy
+     */
+    protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dnsResolver,
+                                                              HashedWheelTimer timer,
+                                                              boolean reorderReadsRandom,
+                                                              int stabilizePeriodSeconds,
+                                                              StatsLogger statsLogger) {
+        this.dnsResolver = dnsResolver;
+        this.timer = timer;
+        this.reorderReadsRandom = reorderReadsRandom;
+        this.stabilizePeriodSeconds = stabilizePeriodSeconds;
+        this.statsLogger = statsLogger;
+
+        // create the network topology
+        this.topology = stabilizePeriodSeconds > 0
+                ? new StabilizeNetworkTopology(timer, stabilizePeriodSeconds)
+                : new NetworkTopologyImpl();
+
+        // resolve the local node; it stays null when the local host address is unknown
+        BookieNode local = null;
+        try {
+            String localAddress = InetAddress.getLocalHost().getHostAddress();
+            local = createBookieNode(new BookieSocketAddress(localAddress, 0));
+        } catch (UnknownHostException e) {
+            LOG.error("Failed to get local host address : ", e);
+        }
+        localNode = local;
+
+        String localLocation = (null == localNode) ? "Unknown" : localNode.getNetworkLocation();
+        LOG.info("Initialize rackaware ensemble placement policy @ {} @ {} : {}.",
+            new Object[] { localNode, localLocation, dnsResolver.getClass().getName() });
+        return this;
+    }
+
+    /**
+     * Initialize the policy from the client configuration.
+     *
+     * When no resolver is supplied, the class named by {@link #REPP_DNS_RESOLVER_CLASS}
+     * (default {@link ScriptBasedMapping}) is instantiated; if that fails with a runtime
+     * exception, a {@link DefaultResolver} is used instead.
+     *
+     * @param conf client configuration
+     * @param optionalDnsResolver resolver to use, if present
+     * @param timer timer passed on to the topology
+     * @param featureProvider not used by this implementation
+     * @param statsLogger stats logger stored on the policy
+     * @return this policy, initialized
+     */
+    @Override
+    public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
+                                                           Optional<DNSToSwitchMapping> optionalDnsResolver,
+                                                           HashedWheelTimer timer,
+                                                           FeatureProvider featureProvider,
+                                                           StatsLogger statsLogger) {
+        DNSToSwitchMapping resolver;
+        if (!optionalDnsResolver.isPresent()) {
+            String resolverClassName = conf.getString(REPP_DNS_RESOLVER_CLASS, ScriptBasedMapping.class.getName());
+            try {
+                resolver = ReflectionUtils.newInstance(resolverClassName, DNSToSwitchMapping.class);
+                if (resolver instanceof Configurable) {
+                    ((Configurable) resolver).setConf(conf);
+                }
+            } catch (RuntimeException re) {
+                LOG.info("Failed to initialize DNS Resolver {}, used default subnet resolver.", resolverClassName, re);
+                resolver = new DefaultResolver();
+            }
+        } else {
+            resolver = optionalDnsResolver.get();
+        }
+
+        boolean randomReorder = conf.getBoolean(REPP_RANDOM_READ_REORDERING, false);
+        int stabilizeSeconds = conf.getNetworkTopologyStabilizePeriodSeconds();
+        return initialize(resolver, timer, randomReorder, stabilizeSeconds, statsLogger);
+    }
+
+    /**
+     * Release the policy. This implementation does nothing.
+     */
+    @Override
+    public void uninitalize() {
+        // do nothing
+    }
+
+    /**
+     * Resolve the network location of a bookie through the configured DNS resolver.
+     *
+     * @param addr bookie address
+     * @return the network location reported by {@link NetUtils#resolveNetworkLocation}
+     */
+    protected String resolveNetworkLocation(BookieSocketAddress addr) {
+        return NetUtils.resolveNetworkLocation(dnsResolver, addr.getSocketAddress());
+    }
+
+    /**
+     * Update the known bookies and the topology after a cluster membership change.
+     *
+     * Runs under the write lock.
+     *
+     * @param writableBookies the current set of writable bookies
+     * @param readOnlyBookies the current set of read-only bookies
+     * @return the bookies that were known before but are now neither writable nor read-only
+     */
+    @Override
+    public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
+            Set<BookieSocketAddress> readOnlyBookies) {
+        rwLock.writeLock().lock();
+        try {
+            // keySet() is a live view, so all differences are snapshotted before it is modified below
+            Set<BookieSocketAddress> previouslyKnown = knownBookies.keySet();
+            // left bookies : known before, but absent from the new writable set
+            ImmutableSet<BookieSocketAddress> departed =
+                    Sets.difference(previouslyKnown, writableBookies).immutableCopy();
+            // joined bookies : in the new writable set, but not known before
+            ImmutableSet<BookieSocketAddress> arrived =
+                    Sets.difference(writableBookies, previouslyKnown).immutableCopy();
+            // dead bookies : left the writable set without becoming read-only
+            ImmutableSet<BookieSocketAddress> dead =
+                    Sets.difference(departed, readOnlyBookies).immutableCopy();
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Cluster changed : left bookies are {}, joined bookies are {}, while dead bookies are {}.",
+                        new Object[] { departed, arrived, dead });
+            }
+
+            handleBookiesThatLeft(departed);
+            handleBookiesThatJoined(arrived);
+
+            // NOTE(review): an empty read-only set leaves the previously stored set in place -
+            // confirm that keeping the old entries is intended.
+            if (!readOnlyBookies.isEmpty()) {
+                this.readOnlyBookies = ImmutableSet.copyOf(readOnlyBookies);
+            }
+
+            return dead;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Drop the given bookies from the known-bookie map and from the topology.
+     * Addresses that are not currently known are ignored.
+     *
+     * @param leftBookies bookies that are no longer writable
+     */
+    @Override
+    public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) {
+        for (BookieSocketAddress departed : leftBookies) {
+            BookieNode removed = knownBookies.remove(departed);
+            if (null == removed) {
+                continue;
+            }
+            topology.remove(removed);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Cluster changed : bookie {} left from cluster.", departed);
+            }
+        }
+    }
+
+    /**
+     * Register the given bookies: each one gets a node that is added to the topology
+     * and recorded in the known-bookie map.
+     *
+     * @param joinedBookies bookies that became writable
+     */
+    @Override
+    public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
+        for (BookieSocketAddress joined : joinedBookies) {
+            BookieNode joinedNode = createBookieNode(joined);
+            topology.add(joinedNode);
+            knownBookies.put(joined, joinedNode);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Cluster changed : bookie {} joined the cluster.", joined);
+            }
+        }
+    }
+
+    /**
+     * Translate bookie addresses into topology nodes.
+     *
+     * Known bookies reuse their registered node; for unknown addresses a fresh node is created.
+     *
+     * @param excludeBookies addresses to translate
+     * @return a new mutable set with one node per address
+     */
+    protected Set<Node> convertBookiesToNodes(Set<BookieSocketAddress> excludeBookies) {
+        Set<Node> converted = new HashSet<Node>();
+        for (BookieSocketAddress address : excludeBookies) {
+            BookieNode known = knownBookies.get(address);
+            converted.add(null != known ? known : createBookieNode(address));
+        }
+        return converted;
+    }
+
+    /**
+     * Choose a new ensemble with no parent ensemble and no parent predicate.
+     *
+     * NOTE(review): <i>ackQuorumSize</i> is not forwarded; the overload called here passes
+     * <i>writeQuorumSize</i> in its place - confirm this is intended.
+     *
+     * @param ensembleSize number of bookies to choose
+     * @param writeQuorumSize write quorum size
+     * @param ackQuorumSize ack quorum size (currently unused)
+     * @param excludeBookies bookies that must not be chosen
+     * @return the chosen bookies
+     * @throws BKNotEnoughBookiesException if the ensemble cannot be formed
+     */
+    @Override
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+                                                      Set<BookieSocketAddress> excludeBookies)
+            throws BKNotEnoughBookiesException {
+        return newEnsembleInternal(ensembleSize, writeQuorumSize, excludeBookies, null, null);
+    }
+
+    /**
+     * Choose a new ensemble, using <i>writeQuorumSize</i> as the ack quorum size as well.
+     *
+     * @param ensembleSize number of bookies to choose
+     * @param writeQuorumSize write quorum size, also passed on as the ack quorum size
+     * @param excludeBookies bookies that must not be chosen
+     * @param parentEnsemble parent ensemble passed through to the coverage ensemble, may be null
+     * @param parentPredicate parent predicate passed through to the coverage ensemble, may be null
+     * @return the chosen bookies
+     * @throws BKNotEnoughBookiesException if the ensemble cannot be formed
+     */
+    protected ArrayList<BookieSocketAddress> newEnsembleInternal(int ensembleSize,
+                                                               int writeQuorumSize,
+                                                               Set<BookieSocketAddress> excludeBookies,
+                                                               Ensemble<BookieNode> parentEnsemble,
+                                                               Predicate<BookieNode> parentPredicate)
+            throws BKNotEnoughBookiesException {
+        return newEnsembleInternal(
+                ensembleSize,
+                writeQuorumSize,
+                writeQuorumSize,
+                excludeBookies,
+                parentEnsemble,
+                parentPredicate);
+    }
+
+    /**
+     * Choose a new ensemble on behalf of a parent policy.
+     *
+     * @param ensembleSize number of bookies to choose
+     * @param writeQuorumSize write quorum size
+     * @param ackQuorumSize ack quorum size
+     * @param excludeBookies bookies that must not be chosen
+     * @param parentEnsemble parent ensemble passed through to the coverage ensemble
+     * @param parentPredicate parent predicate passed through to the coverage ensemble
+     * @return the chosen bookies
+     * @throws BKNotEnoughBookiesException if the ensemble cannot be formed
+     */
+    @Override
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize,
+                                                    int writeQuorumSize,
+                                                    int ackQuorumSize,
+                                                    Set<BookieSocketAddress> excludeBookies,
+                                                    Ensemble<BookieNode> parentEnsemble,
+                                                    Predicate<BookieNode> parentPredicate)
+            throws BKNotEnoughBookiesException {
+        return newEnsembleInternal(
+                ensembleSize,
+                writeQuorumSize,
+                ackQuorumSize,
+                excludeBookies,
+                parentEnsemble,
+                parentPredicate);
+    }
+
+    /**
+     * Choose a new ensemble while holding the read lock.
+     *
+     * When the topology reports fewer than two racks, the bookies are picked at random from
+     * the known bookies. Otherwise bookies are picked one at a time: the first from the local
+     * node's rack (or from the topology root when the local node is unknown or sits in the
+     * default rack), and each following one from a location derived from the previously
+     * chosen node's rack with a "~" prefix.
+     *
+     * @param ensembleSize number of bookies to choose
+     * @param writeQuorumSize write quorum size
+     * @param ackQuorumSize ack quorum size
+     * @param excludeBookies bookies that must not be chosen
+     * @param parentEnsemble parent ensemble passed to the coverage ensemble, may be null
+     * @param parentPredicate parent predicate passed to the coverage ensemble, may be null
+     * @return the chosen bookies
+     * @throws BKNotEnoughBookiesException if fewer than <i>ensembleSize</i> bookies can be chosen
+     */
+    protected ArrayList<BookieSocketAddress> newEnsembleInternal(
+            int ensembleSize,
+            int writeQuorumSize,
+            int ackQuorumSize,
+            Set<BookieSocketAddress> excludeBookies,
+            Ensemble<BookieNode> parentEnsemble,
+            Predicate<BookieNode> parentPredicate) throws BKNotEnoughBookiesException {
+        rwLock.readLock().lock();
+        try {
+            // excludeNodes grows as bookies are chosen, so no bookie is picked twice
+            Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+            RRTopologyAwareCoverageEnsemble ensemble =
+                    new RRTopologyAwareCoverageEnsemble(
+                            ensembleSize,
+                            writeQuorumSize,
+                            ackQuorumSize,
+                            RACKNAME_DISTANCE_FROM_LEAVES,
+                            parentEnsemble,
+                            parentPredicate);
+            BookieNode prevNode = null;
+            int numRacks = topology.getNumOfRacks();
+            // only one rack, use the random algorithm.
+            if (numRacks < 2) {
+                List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.instance,
+                        ensemble);
+                ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
+                for (BookieNode bn : bns) {
+                    addrs.add(bn.getAddr());
+                }
+                return addrs;
+            }
+            // pick nodes by racks, to ensure there is at least two racks per write quorum.
+            for (int i = 0; i < ensembleSize; i++) {
+                String curRack;
+                if (null == prevNode) {
+                    // first pick: prefer the local rack when it is known and not the default rack
+                    if ((null == localNode) ||
+                            localNode.getNetworkLocation().equals(NetworkTopology.DEFAULT_RACK)) {
+                        curRack = NodeBase.ROOT;
+                    } else {
+                        curRack = localNode.getNetworkLocation();
+                    }
+                } else {
+                    // later picks: same "~" + rack form that getRemoteRack builds
+                    curRack = "~" + prevNode.getNetworkLocation();
+                }
+                // the coverage ensemble serves both as the predicate and as the ensemble to fill
+                prevNode = selectFromNetworkLocation(curRack, excludeNodes, ensemble, ensemble);
+            }
+            ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
+            if (ensembleSize != bookieList.size()) {
+                LOG.error("Not enough {} bookies are available to form an ensemble : {}.",
+                          ensembleSize, bookieList);
+                throw new BKNotEnoughBookiesException();
+            }
+            return bookieList;
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Choose a bookie to replace <i>bookieToReplace</i>, preferring one from the same
+     * network location and falling back to the whole cluster.
+     *
+     * The members of <i>currentEnsemble</i> and <i>bookieToReplace</i> itself are never
+     * chosen. The caller's <i>excludeBookies</i> set is left untouched, so it may be
+     * unmodifiable.
+     *
+     * @param ensembleSize ensemble size (not used by this implementation)
+     * @param writeQuorumSize write quorum size (not used by this implementation)
+     * @param ackQuorumSize ack quorum size (not used by this implementation)
+     * @param currentEnsemble bookies of the current ensemble
+     * @param bookieToReplace the bookie being replaced
+     * @param excludeBookies additional bookies that must not be chosen
+     * @return the address of the replacement bookie
+     * @throws BKNotEnoughBookiesException if no eligible bookie is available
+     */
+    @Override
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+                                             Collection<BookieSocketAddress> currentEnsemble,
+                                             BookieSocketAddress bookieToReplace,
+                                             Set<BookieSocketAddress> excludeBookies)
+            throws BKNotEnoughBookiesException {
+        rwLock.readLock().lock();
+        try {
+            // Work on a private copy: adding the current ensemble to the caller's set would
+            // leak state into the caller and fail outright on an unmodifiable set.
+            Set<BookieSocketAddress> allExcluded = new HashSet<BookieSocketAddress>(excludeBookies);
+            allExcluded.addAll(currentEnsemble);
+
+            BookieNode bn = knownBookies.get(bookieToReplace);
+            if (null == bn) {
+                // the bookie may already have left the cluster; resolve its location on the fly
+                bn = createBookieNode(bookieToReplace);
+            }
+
+            Set<Node> excludeNodes = convertBookiesToNodes(allExcluded);
+            // add the bookie to replace in exclude set
+            excludeNodes.add(bn);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace,
+                        excludeNodes);
+            }
+            // pick a candidate from same rack to replace
+            BookieNode candidate = selectFromNetworkLocation(
+                    bn.getNetworkLocation(),
+                    excludeNodes,
+                    TruePredicate.instance,
+                    EnsembleForReplacementWithNoConstraints.instance);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Bookie {} is chosen to replace bookie {}.", candidate, bn);
+            }
+            return candidate.getAddr();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Select one bookie from the given network location, falling back to a random bookie
+     * from the whole cluster when that location has no eligible candidate.
+     *
+     * @param networkLoc network location to try first
+     * @param excludeBookies nodes that must not be chosen; chosen nodes may be added to it
+     * @param predicate predicate used to vet candidates
+     * @param ensemble ensemble the chosen node is added to
+     * @return the chosen bookie
+     * @throws BKNotEnoughBookiesException if the cluster-wide fallback finds no bookie either
+     */
+    @Override
+    public BookieNode selectFromNetworkLocation(
+            String networkLoc,
+            Set<Node> excludeBookies,
+            Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble)
+            throws BKNotEnoughBookiesException {
+        // first try the requested network location
+        try {
+            return selectRandomFromRack(networkLoc, excludeBookies, predicate, ensemble);
+        } catch (BKNotEnoughBookiesException e) {
+            LOG.warn("Failed to choose a bookie from {} : "
+                     + "excluded {}, fallback to choose bookie randomly from the cluster.",
+                     networkLoc, excludeBookies);
+            // randomly choose one from the whole cluster; the predicate is passed along, but
+            // selectRandomInternal only applies it when enforceDurability is set.
+            return selectRandom(1, excludeBookies, predicate, ensemble).get(0);
+        }
+    }
+
+    /**
+     * Build the "~"-prefixed form of a node's network location.
+     *
+     * @param node node whose network location is used
+     * @return "~" followed by the node's network location
+     */
+    protected String getRemoteRack(BookieNode node) {
+        return "~" + node.getNetworkLocation();
+    }
+
+    /**
+     * Pick a random bookie among the leaves of a network path.
+     *
+     * The leaves are shuffled and scanned; the first leaf that is not excluded, is a
+     * {@link BookieNode} and satisfies the predicate is returned. The node is added to the
+     * exclude set only when the ensemble accepts it, but it is returned either way.
+     *
+     * @param netPath
+     *          network path
+     * @param excludeBookies
+     *          bookies that must not be chosen; may be extended with the chosen bookie
+     * @param predicate
+     *          predicate to check whether the target is a good target.
+     * @param ensemble
+     *          ensemble structure
+     * @return chosen bookie.
+     * @throws BKNotEnoughBookiesException
+     *          if no leaf under the path qualifies
+     */
+    protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate,
+            Ensemble<BookieNode> ensemble) throws BKNotEnoughBookiesException {
+        List<Node> candidates = new ArrayList<Node>(topology.getLeaves(netPath));
+        Collections.shuffle(candidates);
+        for (Node leaf : candidates) {
+            if (excludeBookies.contains(leaf) || !(leaf instanceof BookieNode)) {
+                continue;
+            }
+            BookieNode bookie = (BookieNode) leaf;
+            if (!predicate.apply(bookie, ensemble)) {
+                continue;
+            }
+            // got a good candidate; remember it as excluded once the ensemble has taken it
+            if (ensemble.addNode(bookie)) {
+                excludeBookies.add(bookie);
+            }
+            return bookie;
+        }
+        throw new BKNotEnoughBookiesException();
+    }
+
+    /**
+     * Choose random nodes from the whole cluster.
+     *
+     * @param numBookies
+     *          number of bookies to choose
+     * @param excludeBookies
+     *          bookies set to exclude.
+     * @param predicate
+     *          predicate handed to the selection; see {@link #selectRandomInternal}
+     * @param ensemble
+     *          ensemble to hold the bookies chosen.
+     * @return the bookie nodes chosen.
+     * @throws BKNotEnoughBookiesException
+     *          if fewer than <i>numBookies</i> bookies can be chosen
+     */
+    protected List<BookieNode> selectRandom(int numBookies,
+                                            Set<Node> excludeBookies,
+                                            Predicate<BookieNode> predicate,
+                                            Ensemble<BookieNode> ensemble)
+            throws BKNotEnoughBookiesException {
+        // hand over a private copy, because the selection shuffles the list in place
+        List<BookieNode> allKnown = new ArrayList<BookieNode>(knownBookies.values());
+        return selectRandomInternal(allKnown, numBookies, excludeBookies, predicate, ensemble);
+    }
+
+    /**
+     * Choose <i>numBookies</i> random bookies out of the given candidates.
+     *
+     * The candidate list is shuffled in place. Every bookie accepted by the ensemble is
+     * also added to the exclude set.
+     *
+     * @param bookiesToSelectFrom candidates; reordered by this call
+     * @param numBookies number of bookies to choose
+     * @param excludeBookies nodes that must not be chosen; extended with the chosen ones
+     * @param predicate predicate that is only consulted when durability is enforced
+     * @param ensemble ensemble the chosen bookies are added to
+     * @return the chosen bookies
+     * @throws BKNotEnoughBookiesException if the candidates run out before enough are chosen
+     */
+    protected List<BookieNode> selectRandomInternal(List<BookieNode> bookiesToSelectFrom,
+                                                    int numBookies,
+                                                    Set<Node> excludeBookies,
+                                                    Predicate<BookieNode> predicate,
+                                                    Ensemble<BookieNode> ensemble)
+        throws BKNotEnoughBookiesException {
+        Collections.shuffle(bookiesToSelectFrom);
+        List<BookieNode> chosen = new ArrayList<BookieNode>(numBookies);
+        int remaining = numBookies;
+        for (BookieNode candidate : bookiesToSelectFrom) {
+            if (excludeBookies.contains(candidate)) {
+                continue;
+            }
+
+            // When durability is being enforced, the predicate must hold even for a
+            // randomly selected bookie: the durability guarantee is not best effort.
+            if (enforceDurability && !predicate.apply(candidate, ensemble)) {
+                continue;
+            }
+
+            if (ensemble.addNode(candidate)) {
+                excludeBookies.add(candidate);
+                chosen.add(candidate);
+                remaining--;
+            }
+
+            if (0 == remaining) {
+                return chosen;
+            }
+        }
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Failed to find {} bookies : excludeBookies {}, allBookies {}.", new Object[] {
+                remaining, excludeBookies, bookiesToSelectFrom });
+        }
+        throw new BKNotEnoughBookiesException();
+    }
+
+
+
+    /**
+     * Order the bookies of a write set for reading.
+     *
+     * The result lists, in this order: known bookies without a recorded failure, known
+     * bookies with a recorded failure (sorted by the failed entry, then by ensemble index),
+     * read-only bookies, and finally all other bookies. When random reordering is enabled,
+     * every group except the failure-sorted one is shuffled.
+     *
+     * @param ensemble the ledger ensemble
+     * @param writeSet indices into <i>ensemble</i> that form the write set
+     * @param bookieFailureHistory last failed entry recorded per bookie
+     * @return the write set indices in the order in which reads should be tried
+     */
+    @Override
+    public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        int ensembleSize = ensemble.size();
+        int writeSetSize = writeSet.size();
+        List<Integer> healthy = new ArrayList<Integer>(writeSetSize);
+        List<Long> failedKeys = new ArrayList<Long>(writeSetSize);
+        List<Integer> readOnly = new ArrayList<Integer>(writeSetSize);
+        List<Integer> unavailable = new ArrayList<Integer>(writeSetSize);
+        for (Integer idx : writeSet) {
+            BookieSocketAddress bookie = ensemble.get(idx);
+            Long lastFailedEntry = bookieFailureHistory.get(bookie);
+            if (null != knownBookies.get(bookie)) {
+                if ((lastFailedEntry != null) && (lastFailedEntry >= 0)) {
+                    // pack (failed entry, index) into one sortable key; unpacked again below
+                    failedKeys.add(lastFailedEntry * ensembleSize + idx);
+                } else {
+                    healthy.add(idx);
+                }
+            } else if ((null != readOnlyBookies) && readOnlyBookies.contains(bookie)) {
+                // Read-only bookies differ little from unavailable ones: no writes go to
+                // them, so they are not tried before writable bookies.
+                readOnly.add(idx);
+            } else {
+                unavailable.add(idx);
+            }
+        }
+
+        if (reorderReadsRandom) {
+            Collections.shuffle(healthy);
+            Collections.shuffle(readOnly);
+            Collections.shuffle(unavailable);
+        }
+
+        Collections.sort(failedKeys);
+
+        for (long key : failedKeys) {
+            healthy.add((int) (key % ensembleSize));
+        }
+
+        healthy.addAll(readOnly);
+        healthy.addAll(unavailable);
+        return healthy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
new file mode 100644
index 0000000..abdcb61
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import com.google.common.base.Optional;
+
+
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.DNSToSwitchMapping;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.Node;
+import org.apache.bookkeeper.net.NodeBase;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.commons.lang3.tuple.Pair;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy {
+    static final Logger LOG = LoggerFactory.getLogger(RegionAwareEnsemblePlacementPolicy.class);
+
+    // Client configuration key: regions to write to, given as a ';'-separated list (R1;R2;...).
+    public static final String REPP_REGIONS_TO_WRITE = "reppRegionsToWrite";
+    // Client configuration key: minimum number of regions required for durability.
+    public static final String REPP_MINIMUM_REGIONS_FOR_DURABILITY = "reppMinimumRegionsForDurability";
+    public static final String REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE = "reppEnableDurabilityEnforcementInReplace";
+    public static final String REPP_DISABLE_DURABILITY_FEATURE_NAME = "reppDisableDurabilityFeatureName";
+    public static final String REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME = "reppDisallowBookiePlacementInRegionFeatureName";
+    public static final String REPP_DISABLE_DURABILITY_ENFORCEMENT_FEATURE = "reppDisableDurabilityEnforcementFeature";
+    // Client configuration key read into enableValidation (default true).
+    public static final String REPP_ENABLE_VALIDATION = "reppEnableValidation";
+    public static final String REGION_AWARE_ANOMALOUS_ENSEMBLE = "region_aware_anomalous_ensemble";
+    // Default used when REPP_MINIMUM_REGIONS_FOR_DURABILITY is not configured.
+    static final int MINIMUM_REGIONS_FOR_DURABILITY_DEFAULT = 2;
+    static final int REGIONID_DISTANCE_FROM_LEAVES = 2;
+    // Region reported for bookies in the default rack or with a network location that has no region part.
+    static final String UNKNOWN_REGION = "UnknownRegion";
+    static final int REMOTE_NODE_IN_REORDER_SEQUENCE = 2;
+
+    // One rack-aware placement policy per region, keyed by region name.
+    protected final Map<String, TopologyAwareEnsemblePlacementPolicy> perRegionPlacement;
+    // Cache of the region resolved for each bookie address (filled by getRegion).
+    protected final ConcurrentMap<BookieSocketAddress, String> address2Region;
+    protected FeatureProvider featureProvider;
+    protected String disallowBookiePlacementInRegionFeatureName;
+    // Region of the local node, set by initialize().
+    protected String myRegion = null;
+    protected int minRegionsForDurability = 0;
+    protected boolean enableValidation = true;
+    protected boolean enforceDurabilityInReplace = false;
+    protected Feature disableDurabilityFeature;
+
+    /**
+     * Create a policy with no per-region policies and an empty address-to-region cache.
+     */
+    RegionAwareEnsemblePlacementPolicy() {
+        super();
+        perRegionPlacement = new HashMap<String, TopologyAwareEnsemblePlacementPolicy>();
+        address2Region = new ConcurrentHashMap<BookieSocketAddress, String>();
+    }
+
+    /**
+     * Determine the region of a bookie, consulting the address-to-region cache first.
+     *
+     * The region is the second element obtained by splitting the resolved network location
+     * on the path separator. Bookies in the default rack, or whose location yields no such
+     * element, are reported as {@link #UNKNOWN_REGION}. A freshly computed region is cached.
+     *
+     * @param addr bookie address
+     * @return the region name
+     */
+    protected String getRegion(BookieSocketAddress addr) {
+        String cached = address2Region.get(addr);
+        if (null != cached) {
+            return cached;
+        }
+
+        String resolved = UNKNOWN_REGION;
+        String location = resolveNetworkLocation(addr);
+        if (!NetworkTopology.DEFAULT_RACK.equals(location)) {
+            String[] pathElements = location.split(NodeBase.PATH_SEPARATOR_STR);
+            if (pathElements.length > 1) {
+                resolved = pathElements[1];
+            }
+        }
+        address2Region.putIfAbsent(addr, resolved);
+        return resolved;
+    }
+
+    /**
+     * Determine the region of a node, tolerating a missing node or a missing address.
+     *
+     * @param node the node, may be null
+     * @return the node's region, or {@link #UNKNOWN_REGION} when the node or its address is null
+     */
+    protected String getLocalRegion(BookieNode node) {
+        boolean addressMissing = (null == node) || (null == node.getAddr());
+        return addressMissing ? UNKNOWN_REGION : getRegion(node.getAddr());
+    }
+
+    /**
+     * Remove the given bookies from this policy and from every per-region policy.
+     *
+     * @param leftBookies bookies that are no longer writable
+     */
+    @Override
+    public void handleBookiesThatLeft(Set<BookieSocketAddress> leftBookies) {
+        super.handleBookiesThatLeft(leftBookies);
+
+        // every region is handed the full set of departed bookies
+        for (TopologyAwareEnsemblePlacementPolicy regionPolicy : perRegionPlacement.values()) {
+            regionPolicy.handleBookiesThatLeft(leftBookies);
+        }
+    }
+
+    /**
+     * Register the given bookies with this policy and with the policy of their region.
+     *
+     * A per-region policy is created on first sight of a region. After all bookies are
+     * registered here, every per-region policy is notified with the bookies that joined
+     * its region (an empty set when none did).
+     *
+     * @param joinedBookies bookies that became writable
+     */
+    @Override
+    public void handleBookiesThatJoined(Set<BookieSocketAddress> joinedBookies) {
+        Map<String, Set<BookieSocketAddress>> joinedByRegion = new HashMap<String, Set<BookieSocketAddress>>();
+
+        // node joined
+        for (BookieSocketAddress joined : joinedBookies) {
+            BookieNode joinedNode = createBookieNode(joined);
+            topology.add(joinedNode);
+            knownBookies.put(joined, joinedNode);
+
+            String region = getLocalRegion(joinedNode);
+            if (null == perRegionPlacement.get(region)) {
+                perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()
+                        .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, statsLogger));
+            }
+
+            Set<BookieSocketAddress> regionBookies = joinedByRegion.get(region);
+            if (null == regionBookies) {
+                regionBookies = new HashSet<BookieSocketAddress>();
+                joinedByRegion.put(region, regionBookies);
+            }
+            regionBookies.add(joined);
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Cluster changed : bookie {} joined the cluster.", joined);
+            }
+        }
+
+        for (Map.Entry<String, TopologyAwareEnsemblePlacementPolicy> regionEntry : perRegionPlacement.entrySet()) {
+            Set<BookieSocketAddress> regionBookies = joinedByRegion.get(regionEntry.getKey());
+            if (null == regionBookies) {
+                regionBookies = new HashSet<BookieSocketAddress>();
+            }
+            regionEntry.getValue().handleBookiesThatJoined(regionBookies);
+        }
+    }
+
+    /**
+     * Initializes the region-aware policy from the client configuration.
+     *
+     * <p>After delegating to the parent initializer this reads the validation flag, the
+     * optional list of regions to write to, the durability settings and the feature
+     * switches used by this policy.
+     *
+     * @param conf client configuration to read the settings from
+     * @param optionalDnsResolver passed through to the parent initializer
+     * @param timer passed through to the parent initializer and to per-region policies
+     * @param featureProvider source of the feature switches
+     * @param statsLogger passed through to the parent initializer and to per-region policies
+     * @return this policy
+     * @throws IllegalArgumentException if fewer regions are configured than the configured
+     *         minimum number of regions for durability
+     */
+    @Override
+    public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf,
+                                                         Optional<DNSToSwitchMapping> optionalDnsResolver,
+                                                         HashedWheelTimer timer,
+                                                         FeatureProvider featureProvider,
+                                                         StatsLogger statsLogger) {
+        super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger);
+        myRegion = getLocalRegion(localNode);
+        enableValidation = conf.getBoolean(REPP_ENABLE_VALIDATION, true);
+
+        // The regions to write to, and how many of them are needed for durability, must be
+        // configured statically: the set of currently active bookies does not reflect the
+        // constraints that have to be enforced for durability.
+        final String configuredRegions = conf.getString(REPP_REGIONS_TO_WRITE, null);
+        if (configuredRegions != null) {
+            // Format: R1;R2;...
+            final String[] regionNames = configuredRegions.split(";");
+            for (int i = 0; i < regionNames.length; i++) {
+                perRegionPlacement.put(regionNames[i], new RackawareEnsemblePlacementPolicy(true)
+                        .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, statsLogger));
+            }
+
+            minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY, MINIMUM_REGIONS_FOR_DURABILITY_DEFAULT);
+            final boolean durabilityRequested = minRegionsForDurability > 0;
+            if (durabilityRequested) {
+                enforceDurability = true;
+                enforceDurabilityInReplace = conf.getBoolean(REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE, true);
+            }
+            if (minRegionsForDurability > regionNames.length) {
+                throw new IllegalArgumentException("Regions provided are insufficient to meet the durability constraints");
+            }
+        }
+
+        this.featureProvider = featureProvider;
+        this.disallowBookiePlacementInRegionFeatureName = conf.getString(REPP_DISALLOW_BOOKIE_PLACEMENT_IN_REGION_FEATURE_NAME);
+
+        // Prefer a feature configured directly; otherwise look it up by name in the provider.
+        this.disableDurabilityFeature = conf.getFeature(REPP_DISABLE_DURABILITY_ENFORCEMENT_FEATURE, null);
+        if (this.disableDurabilityFeature == null) {
+            final String featureName = conf.getString(REPP_DISABLE_DURABILITY_FEATURE_NAME,
+                    BookKeeperConstants.FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT);
+            this.disableDurabilityFeature = featureProvider.getFeature(featureName);
+        }
+        return this;
+    }
+
+    protected List<BookieNode> selectRandomFromRegions(Set<String> availableRegions,
+                                            int numBookies,
+                                            Set<Node> excludeBookies,
+                                            Predicate<BookieNode> predicate,
+                                            Ensemble<BookieNode> ensemble)
+        throws BKException.BKNotEnoughBookiesException {
+        List<BookieNode> availableBookies = new ArrayList<BookieNode>();
+        for(BookieNode bookieNode: knownBookies.values()) {
+            if (availableRegions.contains(getLocalRegion(bookieNode))) {
+                availableBookies.add(bookieNode);
+            }
+        }
+
+        return selectRandomInternal(availableBookies,  numBookies, excludeBookies, predicate, ensemble);
+    }
+
+
+    /**
+     * Creates a new ensemble, spreading the bookies across the regions in which placement
+     * is currently allowed.
+     *
+     * <p>Behaviour depends on how many regions are available:
+     * <ul>
+     *   <li>none: bookies are picked at random from all known nodes;</li>
+     *   <li>one: the request is delegated to that region's placement policy;</li>
+     *   <li>two or more: bookies are distributed across the regions as evenly as possible,
+     *       retrying with a smaller target for a region that cannot supply its share.</li>
+     * </ul>
+     *
+     * @param ensembleSize number of bookies in the ensemble
+     * @param writeQuorumSize write quorum size
+     * @param ackQuorumSize ack quorum size
+     * @param excludeBookies bookies that must not be selected
+     * @return the addresses of the selected bookies
+     * @throws IllegalArgumentException if the sizes are inconsistent with each other or with
+     *         the effective minimum number of regions for durability
+     * @throws BKException.BKNotEnoughBookiesException if an ensemble of the requested size
+     *         cannot be formed, or validation is enabled and the ensemble fails it
+     */
+    @Override
+    public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
+                                                    Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
+
+        // When the durability feature switch is on, a single region is treated as sufficient.
+        int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;
+
+        // All of these conditions indicate bad configuration
+        if (ackQuorumSize < effectiveMinRegionsForDurability) {
+            throw new IllegalArgumentException("Ack Quorum size provided are insufficient to meet the durability constraints");
+        } else if (ensembleSize < writeQuorumSize) {
+            throw new IllegalArgumentException("write quorum (" + writeQuorumSize + ") cannot exceed ensemble size (" + ensembleSize + ")");
+        } else if (writeQuorumSize < ackQuorumSize) {
+            throw new IllegalArgumentException("ack quorum (" + ackQuorumSize + ") cannot exceed write quorum size (" + writeQuorumSize + ")");
+        } else if (effectiveMinRegionsForDurability > 0) {
+            // We must survive the failure of numRegions - effectiveMinRegionsForDurability. When these
+            // regions have failed we would spread the replicas over the remaining
+            // effectiveMinRegionsForDurability regions; we have to make sure that the ack quorum is large
+            // enough such that there is a configuration for spreading the replicas across
+            // effectiveMinRegionsForDurability - 1 regions
+            if (ackQuorumSize <= (writeQuorumSize - (writeQuorumSize / effectiveMinRegionsForDurability))) {
+                throw new IllegalArgumentException("ack quorum (" + ackQuorumSize + ") " +
+                    "violates the requirement to satisfy durability constraints when running in degraded mode");
+            }
+        }
+
+        rwLock.readLock().lock();
+        try {
+            Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
+
+            // A region is available unless the "disallow placement" feature is on for it.
+            Set<String> availableRegions = new HashSet<String>();
+            for (String region: perRegionPlacement.keySet()) {
+                if ((null == disallowBookiePlacementInRegionFeatureName) ||
+                    !featureProvider.scope(region).getFeature(disallowBookiePlacementInRegionFeatureName).isAvailable()) {
+                    availableRegions.add(region);
+                }
+            }
+            int numRegionsAvailable = availableRegions.size();
+
+            // If we were unable to get region information or all regions are disallowed which is
+            // an invalid configuration; default to random selection from the set of nodes
+            if (numRegionsAvailable < 1) {
+                // We cant disallow all regions; if we did, raise an alert to draw attention
+                if (perRegionPlacement.keySet().size() >= 1) {
+                    LOG.error("No regions available, invalid configuration");
+                }
+                List<BookieNode> bns = selectRandom(ensembleSize, excludeNodes, TruePredicate.instance,
+                    EnsembleForReplacementWithNoConstraints.instance);
+                ArrayList<BookieSocketAddress> addrs = new ArrayList<BookieSocketAddress>(ensembleSize);
+                for (BookieNode bn : bns) {
+                    addrs.add(bn.getAddr());
+                }
+                return addrs;
+            }
+
+            // Single region, fall back to RackAwareEnsemblePlacement
+            if (numRegionsAvailable < 2) {
+                RRTopologyAwareCoverageEnsemble ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize,
+                                                                    writeQuorumSize,
+                                                                    ackQuorumSize,
+                                                                    REGIONID_DISTANCE_FROM_LEAVES,
+                                                                    effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
+                                                                    effectiveMinRegionsForDurability);
+                TopologyAwareEnsemblePlacementPolicy nextPolicy = perRegionPlacement.get(availableRegions.iterator().next());
+                return nextPolicy.newEnsemble(ensembleSize, writeQuorumSize, writeQuorumSize, excludeBookies, ensemble, ensemble);
+            }
+
+            int remainingEnsemble = ensembleSize;
+            int remainingWriteQuorum = writeQuorumSize;
+
+            // Equally distribute the nodes across all regions to whatever extent possible
+            // with the hierarchy in mind
+            // Try and place as many nodes in a region as possible, the ones that cannot be
+            // accommodated are placed on other regions
+            // Within each region try and follow rack aware placement
+            // Per region: (ensemble size, write quorum size) allocated so far.
+            Map<String, Pair<Integer,Integer>> regionsWiseAllocation = new HashMap<String, Pair<Integer,Integer>>();
+            for (String region: availableRegions) {
+                regionsWiseAllocation.put(region, Pair.of(0,0));
+            }
+            int remainingEnsembleBeforeIteration;
+            Set<String> regionsReachedMaxAllocation = new HashSet<String>();
+            RRTopologyAwareCoverageEnsemble ensemble;
+            int iteration = 0;
+            do {
+                LOG.info("RegionAwareEnsemblePlacementPolicy#newEnsemble Iteration {}", iteration++);
+                int numRemainingRegions = numRegionsAvailable - regionsReachedMaxAllocation.size();
+                // The coverage ensemble is rebuilt from scratch on every pass; each region
+                // re-adds its full allocation so far (plus any increment) below.
+                ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize,
+                                    writeQuorumSize,
+                                    ackQuorumSize,
+                                    REGIONID_DISTANCE_FROM_LEAVES,
+                                    // We pass all regions we know off to the coverage ensemble as
+                                    // regardless of regions that are available; constraints are
+                                    // always applied based on all possible regions
+                                    effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
+                                    effectiveMinRegionsForDurability);
+                remainingEnsembleBeforeIteration = remainingEnsemble;
+                for (Map.Entry<String, Pair<Integer, Integer>> regionEntry: regionsWiseAllocation.entrySet()) {
+                    String region = regionEntry.getKey();
+                    final Pair<Integer, Integer> currentAllocation = regionEntry.getValue();
+                    TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region);
+                    if (!regionsReachedMaxAllocation.contains(region)) {
+                        if (numRemainingRegions <= 0) {
+                            LOG.error("Inconsistent State: This should never happen");
+                            throw new BKException.BKNotEnoughBookiesException();
+                        }
+
+                        // Target share for this region: ceiling of the bookies outstanding at the
+                        // start of the pass divided by the regions still accepting bookies,
+                        // capped by what is still outstanding now.
+                        int addToEnsembleSize = Math.min(remainingEnsemble, (remainingEnsembleBeforeIteration + numRemainingRegions - 1) / numRemainingRegions);
+                        boolean success = false;
+                        while(addToEnsembleSize > 0) {
+                            // Write quorum share proportional to the ensemble share, at least 1 and
+                            // no more than the write quorum still to be handed out.
+                            int addToWriteQuorum = Math.max(1, Math.min(remainingWriteQuorum, Math.round(1.0f * writeQuorumSize * addToEnsembleSize / ensembleSize)));
+
+                            // Temp ensemble will be merged back into the ensemble only if we are able to successfully allocate
+                            // the target number of bookies in this region; if we fail because we dont have enough bookies; then we
+                            // retry the process with a smaller target
+                            RRTopologyAwareCoverageEnsemble tempEnsemble = new RRTopologyAwareCoverageEnsemble(ensemble);
+                            int newEnsembleSize = currentAllocation.getLeft() + addToEnsembleSize;
+                            int newWriteQuorumSize = currentAllocation.getRight() + addToWriteQuorum;
+                            try {
+                                policyWithinRegion.newEnsemble(newEnsembleSize, newWriteQuorumSize, newWriteQuorumSize, excludeBookies, tempEnsemble, tempEnsemble);
+                                ensemble = tempEnsemble;
+                                remainingEnsemble -= addToEnsembleSize;
+                                // Consume only the share handed to this region. Subtracting the
+                                // whole write quorum would leave nothing for the remaining regions
+                                // and clamp every later share to the minimum of 1.
+                                remainingWriteQuorum -= addToWriteQuorum;
+                                regionsWiseAllocation.put(region, Pair.of(newEnsembleSize, newWriteQuorumSize));
+                                success = true;
+                                LOG.info("Allocated {} bookies in region {} : {}",
+                                        new Object[]{newEnsembleSize, region, ensemble});
+                                break;
+                            } catch (BKException.BKNotEnoughBookiesException exc) {
+                                LOG.warn("Could not allocate {} bookies in region {}, try allocating {} bookies",
+                                         new Object[] {newEnsembleSize, region, (newEnsembleSize - 1) });
+                                addToEnsembleSize--;
+                            }
+                        }
+
+                        // we couldn't allocate additional bookies from the region,
+                        // it should have reached its max allocation.
+                        if (!success) {
+                            regionsReachedMaxAllocation.add(region);
+                        }
+                    }
+
+                    // A saturated region still has to contribute the bookies it was granted in
+                    // earlier passes to the freshly built coverage ensemble.
+                    if (regionsReachedMaxAllocation.contains(region)) {
+                        if (currentAllocation.getLeft() > 0) {
+                            // Argument order matches the placeholders: ensemble first, then exclusions.
+                            LOG.info("Allocating {} bookies in region {} : ensemble {} exclude {}",
+                                new Object[]{currentAllocation.getLeft(), region, ensemble, excludeBookies});
+                            policyWithinRegion.newEnsemble(
+                                    currentAllocation.getLeft(),
+                                    currentAllocation.getRight(),
+                                    currentAllocation.getRight(),
+                                    excludeBookies,
+                                    ensemble,
+                                    ensemble);
+                            LOG.info("Allocated {} bookies in region {} : {}",
+                                new Object[]{currentAllocation.getLeft(), region, ensemble});
+                        }
+                    }
+                }
+
+                if (regionsReachedMaxAllocation.containsAll(regionsWiseAllocation.keySet())) {
+                    break;
+                }
+                // Keep going only while bookies are outstanding and the last pass made progress.
+            } while ((remainingEnsemble > 0) && (remainingEnsemble < remainingEnsembleBeforeIteration));
+
+            ArrayList<BookieSocketAddress> bookieList = ensemble.toList();
+            if (ensembleSize != bookieList.size()) {
+                LOG.error("Not enough {} bookies are available to form an ensemble : {}.",
+                          ensembleSize, bookieList);
+                throw new BKException.BKNotEnoughBookiesException();
+            }
+
+            if(enableValidation && !ensemble.validate()) {
+                LOG.error("Not enough {} bookies are available to form a valid ensemble : {}.",
+                    ensembleSize, bookieList);
+                throw new BKException.BKNotEnoughBookiesException();
+            }
+
+            return ensemble.toList();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Chooses a bookie to take the place of {@code bookieToReplace} in an existing ensemble.
+     *
+     * <p>The surviving members of the ensemble are fed into a coverage ensemble and added to
+     * the exclusion set; the replacement is then chosen by {@code replaceFromRack}.
+     *
+     * @param ensembleSize size of the ensemble
+     * @param writeQuorumSize write quorum size
+     * @param ackQuorumSize ack quorum size
+     * @param currentEnsemble the current members of the ensemble
+     * @param bookieToReplace the member to be replaced
+     * @param excludeBookies bookies that must not be chosen
+     * @return the address of the chosen replacement
+     * @throws BKException.BKNotEnoughBookiesException propagated from {@code replaceFromRack}
+     */
+    @Override
+    public BookieSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace,
+                                           Set<BookieSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
+        rwLock.readLock().lock();
+        try {
+            // Local decision for this call; it may be relaxed below if the ensemble is anomalous.
+            boolean durabilityEnforced = enforceDurabilityInReplace && !disableDurabilityFeature.isAvailable();
+            final int minRegions = durabilityEnforced ? minRegionsForDurability : 1;
+            final Set<Node> excluded = convertBookiesToNodes(excludeBookies);
+            final Set<String> knownRegions =
+                minRegions > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null;
+            final RRTopologyAwareCoverageEnsemble coverage = new RRTopologyAwareCoverageEnsemble(
+                ensembleSize, writeQuorumSize, ackQuorumSize, REGIONID_DISTANCE_FROM_LEAVES, knownRegions, minRegions);
+
+            // The bookie being replaced may no longer be known; build a node for it if so.
+            BookieNode outgoing = knownBookies.get(bookieToReplace);
+            if (outgoing == null) {
+                outgoing = createBookieNode(bookieToReplace);
+            }
+            excluded.add(outgoing);
+
+            for (BookieSocketAddress member : currentEnsemble) {
+                if (!member.equals(bookieToReplace)) {
+                    BookieNode memberNode = knownBookies.get(member);
+                    if (memberNode == null) {
+                        memberNode = createBookieNode(member);
+                    }
+
+                    excluded.add(memberNode);
+
+                    // An existing member rejected by the coverage predicate means the current
+                    // ensemble is already anomalous; stop enforcing durability for this call.
+                    final boolean accepted = coverage.apply(memberNode, coverage);
+                    if (!accepted) {
+                        LOG.warn("Anomalous ensemble detected");
+                        if (statsLogger != null) {
+                            statsLogger.getCounter(REGION_AWARE_ANOMALOUS_ENSEMBLE).inc();
+                        }
+                        durabilityEnforced = false;
+                    }
+
+                    coverage.addNode(memberNode);
+                }
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Try to choose a new bookie to replace {}, excluding {}.", bookieToReplace,
+                    excluded);
+            }
+            // pick a candidate from same rack to replace
+            final BookieNode replacement = replaceFromRack(outgoing, excluded,
+                coverage, coverage, durabilityEnforced);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Bookie {} is chosen to replace bookie {}.", replacement, outgoing);
+            }
+            return replacement.getAddr();
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    protected BookieNode replaceFromRack(BookieNode bookieNodeToReplace,
+                                         Set<Node> excludeBookies,
+                                         Predicate<BookieNode> predicate,
+                                         Ensemble<BookieNode> ensemble,
+                                         boolean enforceDurability)
+        throws BKException.BKNotEnoughBookiesException {
+        Set<String> availableRegions = new HashSet<String>();
+        for (String region: perRegionPlacement.keySet()) {
+            if ((null == disallowBookiePlacementInRegionFeatureName) ||
+                !featureProvider.scope(region).getFeature(disallowBookiePlacementInRegionFeatureName).isAvailable()) {
+                availableRegions.add(region);
+            }
+        }
+        String regionForBookieToReplace = getLocalRegion(bookieNodeToReplace);
+        if (availableRegions.contains(regionForBookieToReplace)) {
+            TopologyAwareEnsemblePlacementPolicy regionPolicy = perRegionPlacement.get(regionForBookieToReplace);
+            if (null != regionPolicy) {
+                try {
+                    // select one from local rack => it falls back to selecting a node from the region
+                    // if the rack does not have an available node, selecting from the same region
+                    // should not violate durability constraints so we can simply not have to check
+                    // for that.
+                    return regionPolicy.selectFromNetworkLocation(
+                        bookieNodeToReplace.getNetworkLocation(),
+                        excludeBookies,
+                        TruePredicate.instance,
+                        EnsembleForReplacementWithNoConstraints.instance);
+                } catch (BKException.BKNotEnoughBookiesException e) {
+                    LOG.warn("Failed to choose a bookie from {} : "
+                            + "excluded {}, fallback to choose bookie randomly from the cluster.",
+                        bookieNodeToReplace.getNetworkLocation(), excludeBookies);
+                }
+            }
+        }
+
+        // randomly choose one from all the regions that are available, ignore the provided predicate if we are not
+        // enforcing durability.
+        return selectRandomFromRegions(availableRegions, 1,
+            excludeBookies,
+            enforceDurability ? predicate : TruePredicate.instance,
+            enforceDurability ? ensemble : EnsembleForReplacementWithNoConstraints.instance).get(0);
+    }
+
+    /**
+     * Reorders the write set so that reads prefer bookies in the local region.
+     *
+     * <p>When the local region is unknown the parent ordering is used. Otherwise each index of
+     * the write set is classified as local, remote, read-only or unavailable. The result lists
+     * up to {@code REMOTE_NODE_IN_REORDER_SEQUENCE} local bookies, then one remote bookie, then
+     * the remaining local, remote, read-only and unavailable bookies, in that order.
+     *
+     * @param ensemble the ensemble the write set indexes into
+     * @param writeSet indices into {@code ensemble}
+     * @param bookieFailureHistory last failed entry recorded per bookie; a missing or negative
+     *        value is treated as "no failure"
+     * @return the reordered list of indices
+     */
+    @Override
+    public final List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        if (UNKNOWN_REGION.equals(myRegion)) {
+            return super.reorderReadSequence(ensemble, writeSet, bookieFailureHistory);
+        } else {
+            int ensembleSize = ensemble.size();
+            List<Integer> finalList = new ArrayList<Integer>(writeSet.size());
+            List<Integer> localList = new ArrayList<Integer>(writeSet.size());
+            // Failure lists hold sort keys (lastFailedEntry * ensembleSize + idx), not indices.
+            List<Long> localFailures = new ArrayList<Long>(writeSet.size());
+            List<Integer> remoteList = new ArrayList<Integer>(writeSet.size());
+            List<Long> remoteFailures = new ArrayList<Long>(writeSet.size());
+            List<Integer> readOnlyList = new ArrayList<Integer>(writeSet.size());
+            List<Integer> unAvailableList = new ArrayList<Integer>(writeSet.size());
+            for (Integer idx : writeSet) {
+                BookieSocketAddress address = ensemble.get(idx);
+                String region = getRegion(address);
+                Long lastFailedEntryOnBookie = bookieFailureHistory.get(address);
+                if (null == knownBookies.get(address)) {
+                    // there isn't too much differences between readonly bookies from unavailable bookies. since there
+                    // is no write requests to them, so we shouldn't try reading from readonly bookie in prior to writable
+                    // bookies.
+                    if ((null == readOnlyBookies) || !readOnlyBookies.contains(address)) {
+                        unAvailableList.add(idx);
+                    } else {
+                        readOnlyList.add(idx);
+                    }
+                } else if (region.equals(myRegion)) {
+                    if ((lastFailedEntryOnBookie == null) || (lastFailedEntryOnBookie < 0)) {
+                        localList.add(idx);
+                    } else {
+                         localFailures.add(lastFailedEntryOnBookie * ensembleSize + idx);
+                    }
+                } else {
+                    if ((lastFailedEntryOnBookie == null) || (lastFailedEntryOnBookie < 0)) {
+                        remoteList.add(idx);
+                    } else {
+                        remoteFailures.add(lastFailedEntryOnBookie * ensembleSize + idx);
+                    }
+                }
+            }
+
+            // Given that idx is less than ensemble size the order of the elements in these two lists
+            // is determined by the lastFailedEntryOnBookie
+            Collections.sort(localFailures);
+            Collections.sort(remoteFailures);
+
+            // Only bookies without failure history are shuffled; the failure lists keep their sorted order.
+            if (reorderReadsRandom) {
+                Collections.shuffle(localList);
+                Collections.shuffle(remoteList);
+                Collections.shuffle(readOnlyList);
+                Collections.shuffle(unAvailableList);
+            }
+
+            // nodes within a region are ordered as follows
+            // (Random?) list of nodes that have no history of failure
+            // Nodes with Failure history are ordered in the reverse
+            // order of the most recent entry that generated an error
+            for(long value: localFailures) {
+                // value % ensembleSize recovers the index encoded in the sort key
+                localList.add((int)(value % ensembleSize));
+            }
+
+            for(long value: remoteFailures) {
+                remoteList.add((int)(value % ensembleSize));
+            }
+
+            // Insert a node from the remote region at the specified location so we
+            // try more than one region within the max allowed latency
+            for (int i = 0; i < REMOTE_NODE_IN_REORDER_SEQUENCE; i++) {
+                if (localList.size() > 0) {
+                    finalList.add(localList.remove(0));
+                } else {
+                    break;
+                }
+            }
+
+            if (remoteList.size() > 0) {
+                finalList.add(remoteList.remove(0));
+            }
+
+            // Add all the local nodes
+            finalList.addAll(localList);
+            // followed by the rest of the remote, read-only and unavailable nodes
+            finalList.addAll(remoteList);
+            finalList.addAll(readOnlyList);
+            finalList.addAll(unAvailableList);
+            return finalList;
+        }
+    }
+
+    /**
+     * Produces the read order for last-add-confirmed reads.
+     *
+     * <p>When the local region is unknown the parent ordering is used. Otherwise the result
+     * starts with the order computed by {@code reorderReadSequence}; if that list is shorter
+     * than the ensemble, the ensemble indices it does not contain are appended in ascending order.
+     *
+     * @param ensemble the ensemble the write set indexes into
+     * @param writeSet indices into {@code ensemble}
+     * @param bookieFailureHistory last failed entry recorded per bookie
+     * @return the reordered list of indices
+     */
+    @Override
+    public final List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
+        if (UNKNOWN_REGION.equals(myRegion)) {
+            return super.reorderReadLACSequence(ensemble, writeSet, bookieFailureHistory);
+        }
+
+        final List<Integer> ordered = reorderReadSequence(ensemble, writeSet, bookieFailureHistory);
+        final int totalBookies = ensemble.size();
+        if (ordered.size() >= totalBookies) {
+            return ordered;
+        }
+
+        for (int index = 0; index < totalBookies; index++) {
+            if (!ordered.contains(index)) {
+                ordered.add(index);
+            }
+        }
+        return ordered;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
new file mode 100644
index 0000000..c222827
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/TopologyAwareEnsemblePlacementPolicy.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.net.NetworkTopology;
+import org.apache.bookkeeper.net.NodeBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class TopologyAwareEnsemblePlacementPolicy implements ITopologyAwareEnsemblePlacementPolicy<TopologyAwareEnsemblePlacementPolicy.BookieNode> {
+    static final Logger LOG = LoggerFactory.getLogger(TopologyAwareEnsemblePlacementPolicy.class);
+
+    /**
+     * A predicate that accepts every candidate, regardless of the nodes already chosen.
+     */
+    protected static class TruePredicate implements Predicate<BookieNode> {
+
+        // Shared instance used wherever no constraint should be applied.
+        public static final TruePredicate instance = new TruePredicate();
+
+        /**
+         * @param candidate ignored
+         * @param chosenNodes ignored
+         * @return always {@code true}
+         */
+        @Override
+        public boolean apply(BookieNode candidate, Ensemble chosenNodes) {
+            return true;
+        }
+
+    }
+
+    /**
+     * An ensemble that records nothing and imposes no constraints: adding a node is a
+     * no-op, the node list is always empty and validation always succeeds.
+     */
+    protected static class EnsembleForReplacementWithNoConstraints implements Ensemble<BookieNode> {
+
+        // Shared instance.
+        public static final EnsembleForReplacementWithNoConstraints instance = new EnsembleForReplacementWithNoConstraints();
+        // The single list handed out by toList(); the same object is returned on every call.
+        static final ArrayList<BookieSocketAddress> EMPTY_LIST = new ArrayList<BookieSocketAddress>(0);
+
+        /**
+         * Accepts the node without storing it.
+         *
+         * @param node ignored
+         * @return always {@code true}
+         */
+        @Override
+        public boolean addNode(BookieNode node) {
+            // do nothing
+            return true;
+        }
+
+        /**
+         * @return the shared {@code EMPTY_LIST}
+         */
+        @Override
+        public ArrayList<BookieSocketAddress> toList() {
+            return EMPTY_LIST;
+        }
+
+        /**
+         * Validates if an ensemble is valid
+         *
+         * @return true if the ensemble is valid; false otherwise
+         */
+        @Override
+        public boolean validate() {
+            // No constraints are tracked, so this implementation always reports valid.
+            return true;
+        }
+
+    }
+
+    /**
+     * A topology node representing a bookie. The node name is the string form of the
+     * bookie's address; equality and hashing are based on that name.
+     */
+    protected static class BookieNode extends NodeBase {
+
+        // identifier of a bookie node.
+        private final BookieSocketAddress addr;
+
+        /**
+         * @param addr address of the bookie; its string form becomes the node name
+         * @param networkLoc network location passed to the base node
+         */
+        BookieNode(BookieSocketAddress addr, String networkLoc) {
+            super(addr.toString(), networkLoc);
+            this.addr = addr;
+        }
+
+        /**
+         * @return the address this node was created with
+         */
+        public BookieSocketAddress getAddr() {
+            return addr;
+        }
+
+        /** Two bookie nodes are equal when their names are equal. */
+        @Override
+        public boolean equals(Object obj) {
+            return (obj instanceof BookieNode)
+                && getName().equals(((BookieNode) obj).getName());
+        }
+
+        /** Hash of the node name, matching {@link #equals(Object)}. */
+        @Override
+        public int hashCode() {
+            return name.hashCode();
+        }
+
+        @Override
+        public String toString() {
+            return String.format("<Bookie:%s>", name);
+        }
+
+    }
+
+    /**
+     * A predicate checking the rack coverage for write quorum in {@link RoundRobinDistributionSchedule},
+     * which ensures that a write quorum should be covered by at least two racks.
+     */
+    protected static class RRTopologyAwareCoverageEnsemble implements Predicate<BookieNode>, Ensemble<BookieNode> {
+
+        /**
+         * A set of bookies against which a placement constraint is checked.
+         */
+        protected interface CoverageSet {
+            /** Returns whether the candidate is acceptable given the bookies added so far. */
+            boolean apply(BookieNode candidate);
+            /** Records the candidate as part of this set. */
+            void addBookie(BookieNode candidate);
+            /** Returns a copy of this set. */
+            public CoverageSet duplicate();
+        }
+
+        /**
+         * Tracks the racks or regions seen within one write quorum and rejects the bookie
+         * that would complete the quorum without adding a second rack or region.
+         */
+        protected class RackQuorumCoverageSet implements CoverageSet {
+            HashSet<String> racksOrRegionsInQuorum = new HashSet<String>();
+            int seenBookies = 0;
+
+            /**
+             * @param candidate the bookie being considered
+             * @return {@code true} unless the candidate is the last member of the write quorum
+             *         and the quorum would not contain a location other than the candidate's
+             */
+            @Override
+            public boolean apply(BookieNode candidate) {
+                // If we don't have sufficient members in the write quorum; then we cant enforce
+                // rack/region diversity
+                final boolean diversityEnforceable = writeQuorumSize >= 2;
+                final boolean completesQuorum = (seenBookies + 1 == writeQuorumSize);
+                if (!diversityEnforceable || !completesQuorum) {
+                    return true;
+                }
+
+                // Locations other than the candidate's own must already be present in the quorum.
+                final String candidateLocation = candidate.getNetworkLocation(distanceFromLeaves);
+                final int sharedWithCandidate = racksOrRegionsInQuorum.contains(candidateLocation) ? 1 : 0;
+                return racksOrRegionsInQuorum.size() > sharedWithCandidate;
+            }
+
+            /**
+             * Counts the bookie and records its rack or region.
+             *
+             * @param candidate the bookie added to the quorum
+             */
+            @Override
+            public void addBookie(BookieNode candidate) {
+                seenBookies = seenBookies + 1;
+                final String location = candidate.getNetworkLocation(distanceFromLeaves);
+                racksOrRegionsInQuorum.add(location);
+            }
+
+            /**
+             * @return a new set carrying a copy of the seen locations and the same bookie count
+             */
+            @Override
+            public RackQuorumCoverageSet duplicate() {
+                final RackQuorumCoverageSet copy = new RackQuorumCoverageSet();
+                copy.seenBookies = this.seenBookies;
+                copy.racksOrRegionsInQuorum = Sets.newHashSet(this.racksOrRegionsInQuorum);
+                return copy;
+            }
+        }
+
+        protected class RackOrRegionDurabilityCoverageSet implements CoverageSet {
+            HashMap<String, Integer> allocationToRacksOrRegions = new HashMap<String, Integer>();
+
+            RackOrRegionDurabilityCoverageSet() { // Seeds an allocation count of 0 for each entry of the enclosing racksOrRegions.
+                for (String rackOrRegion: racksOrRegions) { // NOTE(review): a null racksOrRegions would throw NullPointerException here - confirm callers never pair null with minRacksOrRegionsForDurability > 0
+                    allocationToRacksOrRegions.put(rackOrRegion, 0);
+                }
+            }
+
+            @Override
+            public RackOrRegionDurabilityCoverageSet duplicate() { // Returns a new set holding a copy of this set's allocation counts.
+                RackOrRegionDurabilityCoverageSet ret = new RackOrRegionDurabilityCoverageSet();
+                ret.allocationToRacksOrRegions = Maps.newHashMap(this.allocationToRacksOrRegions); // replaces the zero-seeded map built by the constructor
+                return ret;
+            }
+
+            private boolean checkSumOfSubsetWithinLimit(final Set<String> includedRacksOrRegions, // Recursively checks that no pick of up to subsetSize remaining racks/regions has a total allocation above maxAllowedSum; this first argument is only used in trace logs.
+                            final Set<String> remainingRacksOrRegions, // racks/regions still available to pick from
+                            int subsetSize, // how many more racks/regions may be picked
+                            int maxAllowedSum) { // allocation budget left for the remaining picks
+                if (remainingRacksOrRegions.isEmpty() || (subsetSize <= 0)) { // nothing more to pick: pass only if the budget is not overdrawn
+                    if (maxAllowedSum < 0) {
+                        LOG.trace("CHECK FAILED: RacksOrRegions Included {} Remaining {}, subsetSize {}, maxAllowedSum {}", new Object[]{
+                            includedRacksOrRegions, remainingRacksOrRegions, subsetSize, maxAllowedSum
+                        });
+                    }
+                    return (maxAllowedSum >= 0);
+                }
+
+                for(String rackOrRegion: remainingRacksOrRegions) { // try each remaining rack/region as the next pick; the first failing combination fails the whole check
+                    Integer currentAllocation = allocationToRacksOrRegions.get(rackOrRegion);
+                    if (currentAllocation == null) { // not tracked yet: record it with a count of 0 (this mutates the map)
+                        allocationToRacksOrRegions.put(rackOrRegion, 0);
+                        currentAllocation = 0;
+                    }
+
+                    if (currentAllocation > maxAllowedSum) { // this pick alone already exceeds the remaining budget
+                        LOG.trace("CHECK FAILED: RacksOrRegions Included {} Candidate {}, subsetSize {}, maxAllowedSum {}", new Object[]{
+                            includedRacksOrRegions, rackOrRegion, subsetSize, maxAllowedSum
+                        });
+                        return false;
+                    } else {
+                        Set<String> remainingElements = new HashSet<String>(remainingRacksOrRegions); // fresh copies, so the sets being iterated and logged at this level stay untouched
+                        Set<String> includedElements = new HashSet<String>(includedRacksOrRegions);
+                        includedElements.add(rackOrRegion);
+                        remainingElements.remove(rackOrRegion);
+                        if (!checkSumOfSubsetWithinLimit(includedElements,
+                            remainingElements,
+                            subsetSize - 1,
+                            maxAllowedSum - currentAllocation)) { // budget shrinks by what this pick already holds
+                            return false;
+                        }
+                    }
+                }
+
+                return true; // every combination explored stayed within the budget
+            }
+
+            @Override
+            public boolean apply(BookieNode candidate) { // Decides whether counting candidate in its rack/region keeps allocations within the limit computed below.
+                if (minRacksOrRegionsForDurability <= 1) { // no multi-rack/region requirement to enforce
+                    return true;
+                }
+
+                String candidateRackOrRegion = candidate.getNetworkLocation(distanceFromLeaves);
+                candidateRackOrRegion = candidateRackOrRegion.startsWith(NodeBase.PATH_SEPARATOR_STR) ? candidateRackOrRegion.substring(1) : candidateRackOrRegion; // drops the first character when the location starts with the path separator
+                final Set<String> remainingRacksOrRegions = new HashSet<String>(racksOrRegions);
+                remainingRacksOrRegions.remove(candidateRackOrRegion);
+                final Set<String> includedRacksOrRegions = new HashSet<String>();
+                includedRacksOrRegions.add(candidateRackOrRegion);
+
+                // If minRacksOrRegionsForDurability are required for durability, we must ensure that
+                // no subset of (minRacksOrRegionsForDurability - 1) regions reaches ackQuorumSize bookies.
+                // Only candidateRackOrRegion changes if we accept this bookie, so we only need to
+                // check the subsets that contain candidateRackOrRegion.
+                Integer currentAllocation = allocationToRacksOrRegions.get(candidateRackOrRegion);
+                if (currentAllocation == null) { // location was not seeded by the constructor: log it, then start tracking it at 0
+                    LOG.info("Detected a region that was not initialized {}", candidateRackOrRegion);
+                    if (candidateRackOrRegion.equals(NetworkTopology.DEFAULT_REGION)) {
+                        LOG.error("Failed to resolve network location {}", candidate);
+                    } else if (!racksOrRegions.contains(candidateRackOrRegion)) {
+                        LOG.error("Unknown region detected {}", candidateRackOrRegion);
+                    }
+                    allocationToRacksOrRegions.put(candidateRackOrRegion, 0);
+                    currentAllocation = 0;
+                }
+
+                int inclusiveLimit = (ackQuorumSize - 1) - (currentAllocation + 1); // budget left for the other racks/regions once the candidate is counted in its own
+                return checkSumOfSubsetWithinLimit(includedRacksOrRegions,
+                        remainingRacksOrRegions, minRacksOrRegionsForDurability - 2, inclusiveLimit); // -2: the candidate's own rack/region is already one member of each (min - 1)-sized subset
+            }
+
+            @Override
+            public void addBookie(BookieNode candidate) { // Increments the allocation count of the candidate's rack/region.
+                String candidateRackOrRegion = candidate.getNetworkLocation(distanceFromLeaves);
+                candidateRackOrRegion = candidateRackOrRegion.startsWith(NodeBase.PATH_SEPARATOR_STR) ? candidateRackOrRegion.substring(1) : candidateRackOrRegion; // same key normalization as apply(): drop the leading path separator
+                int oldCount = 0; // used when the rack/region has no entry yet
+                if (null != allocationToRacksOrRegions.get(candidateRackOrRegion)) {
+                    oldCount = allocationToRacksOrRegions.get(candidateRackOrRegion);
+                }
+                allocationToRacksOrRegions.put(candidateRackOrRegion, oldCount + 1);
+            }
+        }
+
+
+
+        final int distanceFromLeaves;
+        final int ensembleSize;
+        final int writeQuorumSize;
+        final int ackQuorumSize;
+        final int minRacksOrRegionsForDurability;
+        final ArrayList<BookieNode> chosenNodes;
+        final Set<String> racksOrRegions;
+        private final CoverageSet[] quorums;
+        final Predicate<BookieNode> parentPredicate;
+        final Ensemble<BookieNode> parentEnsemble;
+
+        protected RRTopologyAwareCoverageEnsemble(RRTopologyAwareCoverageEnsemble that) { // Copy constructor: copies sizes, chosen nodes, coverage sets and racksOrRegions; parent references are shared.
+            this.distanceFromLeaves = that.distanceFromLeaves;
+            this.ensembleSize = that.ensembleSize;
+            this.writeQuorumSize = that.writeQuorumSize;
+            this.ackQuorumSize = that.ackQuorumSize;
+            this.chosenNodes = Lists.newArrayList(that.chosenNodes); // new list holding the same BookieNode references
+            this.quorums = new CoverageSet[that.quorums.length];
+            for (int i = 0; i < that.quorums.length; i++) {
+                if (null != that.quorums[i]) {
+                    this.quorums[i] = that.quorums[i].duplicate(); // NOTE(review): duplicate() runs inside that's inner instance, so the copy appears to keep `that` as its enclosing instance - confirm this is intended
+                } else {
+                    this.quorums[i] = null;
+                }
+            }
+            this.parentPredicate = that.parentPredicate; // shared reference, not copied
+            this.parentEnsemble = that.parentEnsemble; // shared reference, not copied
+            if (null != that.racksOrRegions) {
+                this.racksOrRegions = new HashSet<String>(that.racksOrRegions);
+            } else {
+                this.racksOrRegions = null;
+            }
+            this.minRacksOrRegionsForDurability = that.minRacksOrRegionsForDurability;
+        }
+
+        protected RRTopologyAwareCoverageEnsemble(int ensembleSize, // Convenience constructor: no parent ensemble or parent predicate.
+                                                  int writeQuorumSize,
+                                                  int ackQuorumSize,
+                                                  int distanceFromLeaves,
+                                                  Set<String> racksOrRegions,
+                                                  int minRacksOrRegionsForDurability) {
+            this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, null, null, // nulls: parentEnsemble, parentPredicate
+                 racksOrRegions, minRacksOrRegionsForDurability);
+        }
+
+        protected RRTopologyAwareCoverageEnsemble(int ensembleSize, // Convenience constructor: chains to a parent, with no racksOrRegions and a durability minimum of 0.
+                                                  int writeQuorumSize,
+                                                  int ackQuorumSize,
+                                                  int distanceFromLeaves,
+                                                  Ensemble<BookieNode> parentEnsemble,
+                                                  Predicate<BookieNode> parentPredicate) {
+            this(ensembleSize, writeQuorumSize, ackQuorumSize, distanceFromLeaves, parentEnsemble, parentPredicate,
+                 null, 0); // racksOrRegions = null, minRacksOrRegionsForDurability = 0
+        }
+
+        protected RRTopologyAwareCoverageEnsemble(int ensembleSize, // Primary constructor: stores all settings and allocates an empty coverage-set array.
+                                                  int writeQuorumSize,
+                                                  int ackQuorumSize,
+                                                  int distanceFromLeaves,
+                                                  Ensemble<BookieNode> parentEnsemble,
+                                                  Predicate<BookieNode> parentPredicate,
+                                                  Set<String> racksOrRegions,
+                                                  int minRacksOrRegionsForDurability) {
+            this.ensembleSize = ensembleSize;
+            this.writeQuorumSize = writeQuorumSize;
+            this.ackQuorumSize = ackQuorumSize;
+            this.distanceFromLeaves = distanceFromLeaves;
+            this.chosenNodes = new ArrayList<BookieNode>(ensembleSize);
+            if (minRacksOrRegionsForDurability > 0) { // array element type follows the same condition apply()/addNode() use when creating entries
+                this.quorums = new RackOrRegionDurabilityCoverageSet[ensembleSize];
+            } else {
+                this.quorums = new RackQuorumCoverageSet[ensembleSize];
+            }
+            this.parentEnsemble = parentEnsemble;
+            this.parentPredicate = parentPredicate;
+            this.racksOrRegions = racksOrRegions; // stored by reference, not copied
+            this.minRacksOrRegionsForDurability = minRacksOrRegionsForDurability;
+        }
+
+        @Override
+        public boolean apply(BookieNode candidate, Ensemble<BookieNode> ensemble) { // Returns true if candidate may take the next ensemble position without breaking any affected coverage set.
+            if (ensemble != this) { // only candidates evaluated against this very instance are accepted
+                return false;
+            }
+
+            // An ensemble cannot contain the same node twice
+            if (chosenNodes.contains(candidate)) {
+                return false;
+            }
+
+            // When every bookie is in every write quorum, a single durability coverage set (slot 0) is enough
+            if ((ensembleSize == writeQuorumSize) && (minRacksOrRegionsForDurability > 0)) {
+                if (null == quorums[0]) { // coverage sets are created lazily
+                    quorums[0] = new RackOrRegionDurabilityCoverageSet();
+                }
+                if (!quorums[0].apply(candidate)) {
+                    return false;
+                }
+            } else {
+                int candidatePos = chosenNodes.size(); // position the candidate would occupy
+                int startPos = candidatePos - writeQuorumSize + 1; // may be negative; wrapped below
+                for (int i = startPos; i <= candidatePos; i++) { // visits writeQuorumSize consecutive slots ending at candidatePos
+                    int idx = (i + ensembleSize) % ensembleSize; // wraps negative positions around the ensemble
+                    if (null == quorums[idx]) {
+                        if (minRacksOrRegionsForDurability > 0) {
+                            quorums[idx] = new RackOrRegionDurabilityCoverageSet();
+                        } else {
+                            quorums[idx] = new RackQuorumCoverageSet();
+                        }
+                    }
+                    if (!quorums[idx].apply(candidate)) {
+                        return false;
+                    }
+                }
+            }
+
+            return ((null == parentPredicate) || parentPredicate.apply(candidate, parentEnsemble)); // finally defer to the parent predicate, if one was supplied
+        }
+
+        @Override
+        public boolean addNode(BookieNode node) { // Records node at the next ensemble position and updates every affected coverage set.
+            // An ensemble cannot contain the same node twice
+            if (chosenNodes.contains(node)) {
+                return false;
+            }
+
+            if ((ensembleSize == writeQuorumSize) && (minRacksOrRegionsForDurability > 0)) { // single shared coverage set in slot 0, as in apply()
+                if (null == quorums[0]) {
+                    quorums[0] = new RackOrRegionDurabilityCoverageSet();
+                }
+                quorums[0].addBookie(node);
+            } else {
+                int candidatePos = chosenNodes.size(); // position the node will occupy
+                int startPos = candidatePos - writeQuorumSize + 1; // may be negative; wrapped below
+                for (int i = startPos; i <= candidatePos; i++) {
+                    int idx = (i + ensembleSize) % ensembleSize; // wraps negative positions around the ensemble
+                    if (null == quorums[idx]) {
+                        if (minRacksOrRegionsForDurability > 0) {
+                            quorums[idx] = new RackOrRegionDurabilityCoverageSet();
+                        } else {
+                            quorums[idx] = new RackQuorumCoverageSet();
+                        }
+                    }
+                    quorums[idx].addBookie(node);
+                }
+            }
+            chosenNodes.add(node); // done before consulting the parent, so the node stays recorded here even if the parent returns false
+
+            return ((null == parentEnsemble) || parentEnsemble.addNode(node));
+        }
+
+        @Override
+        public ArrayList<BookieSocketAddress> toList() { // Returns the addresses of the chosen nodes, in the order they were added.
+            ArrayList<BookieSocketAddress> addresses = new ArrayList<BookieSocketAddress>(ensembleSize);
+            for (BookieNode bn : chosenNodes) {
+                addresses.add(bn.getAddr());
+            }
+            return addresses;
+        }
+
+        /**
+         * Checks that no bookie address is repeated and, when minRacksOrRegionsForDurability is
+         * non-zero, that the chosen nodes span at least that many distinct network locations.
+         * @return true if the ensemble is valid; false otherwise
+         */
+        @Override
+        public boolean validate() {
+            HashSet<BookieSocketAddress> addresses = new HashSet<BookieSocketAddress>(ensembleSize);
+            HashSet<String> racksOrRegions = new HashSet<String>(); // local set; shadows the racksOrRegions field
+            for (BookieNode bn : chosenNodes) {
+                if (addresses.contains(bn.getAddr())) { // duplicate address makes the ensemble invalid
+                    return false;
+                }
+                addresses.add(bn.getAddr());
+                racksOrRegions.add(bn.getNetworkLocation(distanceFromLeaves));
+            }
+
+            return ((minRacksOrRegionsForDurability == 0) ||
+                    (racksOrRegions.size() >= minRacksOrRegionsForDurability));
+        }
+
+        @Override
+        public String toString() { // Delegates to the chosenNodes list's string form.
+            return chosenNodes.toString();
+        }
+    }
+
+    @Override
+    public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) { // No reordering here: ensemble and bookieFailureHistory are ignored.
+        return writeSet; // the caller's list itself is returned, not a copy
+    }
+
+    @Override
+    public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) { // Starts from the reorderReadSequence result, then appends the ensemble indices it lacks.
+        List<Integer> retList = new ArrayList<Integer>(reorderReadSequence(ensemble, writeSet, bookieFailureHistory)); // copied, so the list returned by reorderReadSequence is not modified
+        if (retList.size() < ensemble.size()) {
+            for (int i = 0; i < ensemble.size(); i++) {
+                if (!retList.contains(i)) { // append missing indices in ascending order
+                    retList.add(i);
+                }
+            }
+        }
+        return retList;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index 57c3790..8e76bb7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -66,6 +66,8 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String TIMEOUT_TASK_INTERVAL_MILLIS = "timeoutTaskIntervalMillis";
     protected final static String PCBC_TIMEOUT_TIMER_TICK_DURATION_MS = "pcbcTimeoutTimerTickDurationMs";
     protected final static String PCBC_TIMEOUT_TIMER_NUM_TICKS = "pcbcTimeoutTimerNumTicks";
+    protected final static String TIMEOUT_TIMER_TICK_DURATION_MS = "timeoutTimerTickDurationMs";
+    protected final static String TIMEOUT_TIMER_NUM_TICKS = "timeoutTimerNumTicks";
 
     // Bookie health check settings
     protected final static String BOOKIE_HEALTH_CHECK_ENABLED = "bookieHealthCheckEnabled";
@@ -78,6 +80,7 @@ public class ClientConfiguration extends AbstractConfiguration {
 
     // Ensemble Placement Policy
     protected final static String ENSEMBLE_PLACEMENT_POLICY = "ensemblePlacementPolicy";
+    protected final static String NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS = "networkTopologyStabilizePeriodSeconds";
 
     // Stats
     protected final static String ENABLE_TASK_EXECUTION_STATS = "enableTaskExecutionStats";
@@ -329,6 +332,48 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get the tick duration, in milliseconds, used for the timeout timer.
+     *
+     * @return tick duration in milliseconds
+     */
+    public long getTimeoutTimerTickDurationMs() {
+        return getLong(TIMEOUT_TIMER_TICK_DURATION_MS, 100); // 100 is supplied to getLong as the default
+    }
+
+    /**
+     * Set the tick duration, in milliseconds, used for the timeout timer.
+     *
+     * @param tickDuration
+     *          tick duration in milliseconds.
+     * @return this client configuration.
+     */
+    public ClientConfiguration setTimeoutTimerTickDurationMs(long tickDuration) {
+        setProperty(TIMEOUT_TIMER_TICK_DURATION_MS, tickDuration);
+        return this;
+    }
+
+    /**
+     * Get the number of ticks used for the timeout timer.
+     *
+     * @return number of ticks used for the timeout timer.
+     */
+    public int getTimeoutTimerNumTicks() {
+        return getInt(TIMEOUT_TIMER_NUM_TICKS, 1024); // 1024 is supplied to getInt as the default
+    }
+
+    /**
+     * Set the number of ticks used for the timeout timer.
+     *
+     * @param numTicks
+     *          number of ticks used for the timeout timer.
+     * @return this client configuration.
+     */
+    public ClientConfiguration setTimeoutTimerNumTicks(int numTicks) {
+        setProperty(TIMEOUT_TIMER_NUM_TICKS, numTicks);
+        return this;
+    }
+
+    /**
      * Get client netty connect timeout in millis.
      *
      * @return client netty connect timeout in millis.
@@ -666,10 +711,10 @@ public class ClientConfiguration extends AbstractConfiguration {
      * @return ensemble placement policy class.
      */
     public Class<? extends EnsemblePlacementPolicy> getEnsemblePlacementPolicy()
-        throws ConfigurationException {
+            throws ConfigurationException {
         return ReflectionUtils.getClass(this, ENSEMBLE_PLACEMENT_POLICY,
-                                        RackawareEnsemblePlacementPolicy.class,
-                                        EnsemblePlacementPolicy.class,
+                RackawareEnsemblePlacementPolicy.class,
+                EnsemblePlacementPolicy.class,
                                         defaultLoader);
     }
 
@@ -685,6 +730,27 @@ public class ClientConfiguration extends AbstractConfiguration {
     }
 
     /**
+     * Get the network topology stabilize period in seconds. If it is zero, this feature is turned off.
+     *
+     * @return network topology stabilize period in seconds.
+     */
+    public int getNetworkTopologyStabilizePeriodSeconds() {
+        return getInt(NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS, 0); // 0 is supplied to getInt as the default
+    }
+
+    /**
+     * Set the network topology stabilize period in seconds.
+     *
+     * @see #getNetworkTopologyStabilizePeriodSeconds()
+     * @param seconds stabilize period in seconds
+     * @return this client configuration.
+     */
+    public ClientConfiguration setNetworkTopologyStabilizePeriodSeconds(int seconds) {
+        setProperty(NETWORK_TOPOLOGY_STABILIZE_PERIOD_SECONDS, seconds);
+        return this;
+    }
+
+    /**
      * Whether to enable recording task execution stats.
      *
      * @return flag to enable/disable recording task execution stats.

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
index cba1f7e..99ed038 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/AbstractDNSToSwitchMapping.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-// This code has been copied from hadoop-common 0.23.1
+
 package org.apache.bookkeeper.net;
 
 import java.util.HashSet;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
index eb0f6f3..8947abf 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/BookieSocketAddress.java
@@ -63,7 +63,7 @@ public class BookieSocketAddress {
     }
 
     // Public getters
-    public String getHostname() {
+    public String getHostName() {
         return hostname;
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
index 96acbc2..d7ff251 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/CachedDNSToSwitchMapping.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-// This code has been copied from hadoop-common 0.23.1
 package org.apache.bookkeeper.net;
 
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java
index a5dce93..d09e422 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNS.java
@@ -69,6 +69,10 @@ public class DNS {
         // This is formed by reversing the IP numbers and appending in-addr.arpa
         //
         String[] parts = hostIp.getHostAddress().split("\\.");
+        if(parts.length !=4) {
+            //Not proper address. May be IPv6
+            throw new NamingException("IPV6");
+        }
         String reverseIP = parts[3] + "." + parts[2] + "." + parts[1] + "."
                 + parts[0] + ".in-addr.arpa";
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/bbd1eb8d/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
index 35f9a36..6156993 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/net/DNSToSwitchMapping.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-// This code has been copied from hadoop-common 0.23.1
 package org.apache.bookkeeper.net;
 
 import java.util.List;


Mime
View raw message