bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject [3/3] bookkeeper git commit: BOOKKEEPER-950: Ledger placement policy to accommodate different storage capacity of bookies
Date Tue, 28 Mar 2017 20:35:35 GMT
BOOKKEEPER-950: Ledger placement policy to accommodate different storage capacity of bookies

This change introduces disk-weight-based ledger placement. Currently, free disk space is the only supported
weight for a bookie. This change also introduces a new protocol message between the BookKeeper client and server
called GET_BOOKIE_INFO, which the client uses to retrieve free disk space information from
all the bookies. The existing placement policies, DefaultEnsemblePlacementPolicy and RackawareEnsemblePlacementPolicy,
have been enhanced to make use of the weight while selecting bookies. New test cases have been added to
test rack-aware placement with weights, and a new test class has been added to test the weight-based selection
algorithm in a standalone fashion.

Author: Rithin <rithin.shetty@salesforce.com>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>

Closes #93 from rithin-shetty/weightBasedPlacementDec13


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/0583175d
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/0583175d
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/0583175d

Branch: refs/heads/master
Commit: 0583175de72446d00088611000310b000e8e61df
Parents: 1e4ccaf
Author: Rithin <rithin.shetty@salesforce.com>
Authored: Tue Mar 28 13:35:30 2017 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Tue Mar 28 13:35:30 2017 -0700

----------------------------------------------------------------------
 .../bookie/BookKeeperServerStats.java           |    1 +
 .../org/apache/bookkeeper/bookie/Bookie.java    |    8 +
 .../apache/bookkeeper/bookie/BookieShell.java   |   79 +
 .../bookkeeper/bookie/LedgerDirsManager.java    |   28 +
 .../LocalBookieEnsemblePlacementPolicy.java     |   12 +-
 .../apache/bookkeeper/client/BookKeeper.java    |   38 +-
 .../client/BookKeeperClientStats.java           |    2 +
 .../bookkeeper/client/BookieInfoReader.java     |  261 +
 .../apache/bookkeeper/client/BookieWatcher.java |    4 +
 .../client/DefaultEnsemblePlacementPolicy.java  |  112 +-
 .../client/EnsemblePlacementPolicy.java         |   10 +
 .../RackawareEnsemblePlacementPolicy.java       |   15 +-
 .../RackawareEnsemblePlacementPolicyImpl.java   |  177 +-
 .../RegionAwareEnsemblePlacementPolicy.java     |    7 +-
 .../client/WeightedRandomSelection.java         |  156 +
 .../bookkeeper/conf/ClientConfiguration.java    |   84 +-
 .../apache/bookkeeper/proto/BookieClient.java   |   39 +-
 .../proto/BookieRequestProcessor.java           |   16 +-
 .../proto/BookkeeperInternalCallbacks.java      |    5 +
 .../bookkeeper/proto/BookkeeperProtocol.java    | 6022 +++++++++++-------
 .../proto/GetBookieInfoProcessorV3.java         |   90 +
 .../proto/PerChannelBookieClient.java           |  164 +-
 .../src/main/proto/BookkeeperProtocol.proto     |   18 +
 ...perDiskSpaceWeightedLedgerPlacementTest.java |  452 ++
 .../client/TestGetBookieInfoTimeout.java        |  141 +
 .../TestRackawareEnsemblePlacementPolicy.java   |  248 +
 .../client/TestWeightedRandomSelection.java     |  280 +
 .../bookkeeper/test/BookieClientTest.java       |   50 +
 28 files changed, 6210 insertions(+), 2309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
index 9f1dbbb..99a2db1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookKeeperServerStats.java
@@ -37,6 +37,7 @@ public interface BookKeeperServerStats {
     public final static String READ_ENTRY_FENCE_READ = "READ_ENTRY_FENCE_READ";
     public final static String WRITE_LAC = "WRITE_LAC";
     public final static String READ_LAC = "READ_LAC";
+    public final static String GET_BOOKIE_INFO = "GET_BOOKIE_INFO";
 
     // Bookie Operations
     public final static String BOOKIE_ADD_ENTRY_BYTES = "BOOKIE_ADD_ENTRY_BYTES";

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index b3e0ed3..0338db7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -622,6 +622,14 @@ public class Bookie extends BookieCriticalThread {
         return indexDirsManager;
     }
 
+    public long getTotalDiskSpace() {
+        return getLedgerDirsManager().getTotalDiskSpace();
+    }
+
+    public long getTotalFreeSpace() {
+        return getLedgerDirsManager().getTotalFreeSpace();
+    }
+
     public static File getCurrentDirectory(File dir) {
         return new File(dir, BookKeeperConstants.CURRENT_DIR);
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 9afc5d7..1400fcb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -24,12 +24,14 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
+import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.attribute.BasicFileAttributes;
 import java.nio.file.attribute.FileTime;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -41,6 +43,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
@@ -48,6 +51,8 @@ import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.Journal;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookieInfoReader;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
@@ -63,7 +68,10 @@ import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
 import org.apache.bookkeeper.meta.LedgerManagerFactory;
 import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookkeeperProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
 import org.apache.bookkeeper.replication.AuditorElector;
 import org.apache.bookkeeper.util.EntryFormatter;
 import org.apache.bookkeeper.util.IOUtils;
@@ -78,6 +86,7 @@ import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.MissingArgumentException;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.CompositeConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -126,6 +135,7 @@ public class BookieShell implements Tool {
     static final String CMD_EXPANDSTORAGE = "expandstorage";
     static final String CMD_UPDATELEDGER = "updateledgers";
     static final String CMD_DELETELEDGER = "deleteledger";
+    static final String CMD_BOOKIEINFO = "bookieinfo";
     static final String CMD_HELP = "help";
 
     final ServerConfiguration bkConf = new ServerConfiguration();
@@ -1752,6 +1762,74 @@ public class BookieShell implements Tool {
         }
     }
 
+    /*
+     * Command to retrieve bookie information like free disk space, etc from all
+     * the bookies in the cluster.
+     */
+    class BookieInfoCmd extends MyCommand {
+        Options lOpts = new Options();
+
+        BookieInfoCmd() {
+            super(CMD_BOOKIEINFO);
+        }
+
+        @Override
+        String getDescription() {
+            return "Retrieve bookie info such as free and total disk space";
+        }
+
+        @Override
+        String getUsage() {
+            return "bookieinfo";
+        }
+
+        @Override
+        Options getOptions() {
+            return lOpts;
+        }
+
+        String getReadable(long val) {
+            String unit[] = {"", "KB", "MB", "GB", "TB" };
+            int cnt = 0;
+            double d = val;
+            while (d >= 1000 && cnt < unit.length-1) {
+                d = d/1000;
+                cnt++;
+            }
+            DecimalFormat df = new DecimalFormat("#.###");
+            df.setRoundingMode(RoundingMode.DOWN);
+            return cnt > 0 ? "(" + df.format(d) + unit[cnt] + ")" : unit[cnt];
+        }
+
+        @Override
+        public int runCmd(CommandLine cmdLine) throws Exception {
+            ClientConfiguration clientConf = new ClientConfiguration(bkConf);
+            clientConf.setDiskWeightBasedPlacementEnabled(true);
+            BookKeeper bk = new BookKeeper(clientConf);
+
+            Map<BookieSocketAddress, BookieInfo> map = bk.getBookieInfo();
+            if (map.size() == 0) {
+                System.out.println("Failed to retrieve bookie information from any of the bookies");
+                bk.close();
+                return 0;
+            }
+
+            System.out.println("Free disk space info:");
+            long totalFree = 0, total=0;
+            for (Map.Entry<BookieSocketAddress, BookieInfo> e : map.entrySet()) {
+                BookieInfo bInfo = e.getValue();
+                System.out.println(e.getKey() + ":\tFree: " + bInfo.getFreeDiskSpace() +  getReadable(bInfo.getFreeDiskSpace()) +
+                        "\tTotal: " + bInfo.getTotalDiskSpace() +  getReadable(bInfo.getTotalDiskSpace()));
+                totalFree += bInfo.getFreeDiskSpace();
+                total += bInfo.getTotalDiskSpace();
+            }
+            System.out.println("Total free disk space in the cluster:\t" + totalFree + getReadable(totalFree));
+            System.out.println("Total disk capacity in the cluster:\t" + total + getReadable(total));
+            bk.close();
+            return 0;
+        }
+    }
+
     /**
      * A facility for reporting update ledger progress.
      */
@@ -1782,6 +1860,7 @@ public class BookieShell implements Tool {
         commands.put(CMD_EXPANDSTORAGE, new ExpandStorageCmd());
         commands.put(CMD_UPDATELEDGER, new UpdateLedgerCmd());
         commands.put(CMD_DELETELEDGER, new DeleteLedgerCmd());
+        commands.put(CMD_BOOKIEINFO, new BookieInfoCmd());
         commands.put(CMD_HELP, new HelpCmd());
     }
 

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
index 52836f7..8f65c6b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
@@ -119,6 +119,34 @@ public class LedgerDirsManager {
     }
 
     /**
+     * Calculate the total amount of free space available
+     * in all of the ledger directories put together.
+     *
+     * @return total free space in bytes
+     */
+    public long getTotalFreeSpace() {
+        long totalFreeSpace = 0;
+        for (File dir: this.ledgerDirectories) {
+            totalFreeSpace += dir.getFreeSpace();
+        }
+        return totalFreeSpace;
+    }
+
+    /**
+     * Calculate the total disk capacity
+     * of all of the ledger directories put together.
+     *
+     * @return total disk space in bytes
+     */
+    public long getTotalDiskSpace() {
+        long totalDiskSpace = 0;
+        for (File dir: this.ledgerDirectories) {
+            totalDiskSpace += dir.getTotalSpace();
+        }
+        return totalDiskSpace;
+    }
+
+    /**
      * Get only writable ledger dirs.
      */
     public List<File> getWritableLedgerDirs()

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
index e7bfe94..00ac0d0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LocalBookieEnsemblePlacementPolicy.java
@@ -23,20 +23,22 @@ import java.net.UnknownHostException;
 import java.util.*;
 
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
-import org.jboss.netty.util.HashedWheelTimer;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.configuration.Configuration;
+import com.google.common.collect.Lists;
+import org.jboss.netty.util.HashedWheelTimer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
 
 /**
  * Special ensemble placement policy that always return local bookie. Only works with ledgers with ensemble=1.
@@ -100,4 +102,8 @@ public class LocalBookieEnsemblePlacementPolicy implements EnsemblePlacementPoli
         return Lists.newArrayList(bookieAddress);
     }
 
+    @Override
+    public void updateBookieInfo(Map<BookieSocketAddress, BookieInfo> bookieToFreeSpaceMap) {
+        return;
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
index 409a138..a42db17 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java
@@ -36,6 +36,7 @@ import org.apache.bookkeeper.client.AsyncCallback.CreateCallback;
 import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
 import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.feature.SettableFeatureProvider;
@@ -65,6 +66,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
@@ -114,6 +116,7 @@ public class BookKeeper implements AutoCloseable {
     final HashedWheelTimer requestTimer;
     final boolean ownTimer;
     final FeatureProvider featureProvider;
+    ScheduledExecutorService bookieInfoScheduler;
 
     // Ledger manager responsible for how to store ledger meta data
     final LedgerManagerFactory ledgerManagerFactory;
@@ -122,6 +125,7 @@ public class BookKeeper implements AutoCloseable {
 
     // Ensemble Placement Policy
     final EnsemblePlacementPolicy placementPolicy;
+    BookieInfoReader bookieInfoReader;
 
     final ClientConfiguration conf;
     final int explicitLacInterval;
@@ -363,7 +367,19 @@ public class BookKeeper implements AutoCloseable {
         // initialize bookie client
         this.bookieClient = new BookieClient(conf, this.channelFactory, this.mainWorkerPool, statsLogger);
         this.bookieWatcher = new BookieWatcher(conf, this.scheduler, this.placementPolicy, this);
-        this.bookieWatcher.readBookiesBlocking();
+        if (conf.getDiskWeightBasedPlacementEnabled()) {
+            LOG.info("Weighted ledger placement enabled");
+            ThreadFactoryBuilder tFBuilder = new ThreadFactoryBuilder()
+                    .setNameFormat("BKClientMetaDataPollScheduler-%d");
+            this.bookieInfoScheduler = Executors.newSingleThreadScheduledExecutor(tFBuilder.build());
+            this.bookieInfoReader = new BookieInfoReader(this, conf, this.bookieInfoScheduler);
+            this.bookieWatcher.readBookiesBlocking();
+            this.bookieInfoReader.start();
+        } else {
+            LOG.info("Weighted ledger placement is not enabled");
+            this.bookieInfoReader = new BookieInfoReader(this, conf, null);
+            this.bookieWatcher.readBookiesBlocking();
+        }
 
         // initialize ledger manager
         this.ledgerManagerFactory = LedgerManagerFactory.newLedgerManagerFactory(conf, this.zk);
@@ -467,6 +483,20 @@ public class BookKeeper implements AutoCloseable {
     }
 
     /**
+     * Retrieves BookieInfo from all the bookies in the cluster. It sends requests
+     * to all the bookies in parallel and returns the info from the bookies that responded.
+     * If there was an error in reading from any bookie, nothing will be returned for
+     * that bookie in the map.
+     * @return map
+     *             A map of BookieSocketAddress to its BookieInfo
+     * @throws BKException
+     * @throws InterruptedException
+     */
+    public Map<BookieSocketAddress, BookieInfo> getBookieInfo() throws BKException, InterruptedException {
+        return bookieInfoReader.getBookieInfo();
+    }
+
+    /**
      * Creates a new ledger asynchronously. To create a ledger, we need to specify
      * the ensemble size, the quorum size, the digest type, a password, a callback
      * implementation, and an optional control object. The ensemble size is how
@@ -1138,6 +1168,12 @@ public class BookKeeper implements AutoCloseable {
         if (!mainWorkerPool.awaitTermination(10, TimeUnit.SECONDS)) {
             LOG.warn("The mainWorkerPool did not shutdown cleanly");
         }
+        if (this.bookieInfoScheduler != null) {
+            this.bookieInfoScheduler.shutdown();
+            if (!bookieInfoScheduler.awaitTermination(10, TimeUnit.SECONDS)) {
+                LOG.warn("The bookieInfoScheduler did not shutdown cleanly");
+            }
+        }
 
         if (ownTimer) {
             requestTimer.stop();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
index a020425..dc193a7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeperClientStats.java
@@ -34,6 +34,7 @@ public interface BookKeeperClientStats {
     public final static String ENSEMBLE_CHANGES = "NUM_ENSEMBLE_CHANGE";
     public final static String LAC_UPDATE_HITS = "LAC_UPDATE_HITS";
     public final static String LAC_UPDATE_MISSES = "LAC_UPDATE_MISSES";
+    public final static String GET_BOOKIE_INFO_OP = "GET_BOOKIE_INFO";
 
     // per channel stats
     public final static String CHANNEL_SCOPE = "per_channel_bookie_client";
@@ -46,4 +47,5 @@ public interface BookKeeperClientStats {
     public final static String CHANNEL_TIMEOUT_WRITE_LAC = "TIMEOUT_WRITE_LAC";
     public final static String CHANNEL_READ_LAC_OP = "READ_LAC";
     public final static String CHANNEL_TIMEOUT_READ_LAC = "TIMEOUT_READ_LAC";
+    public final static String TIMEOUT_GET_BOOKIE_INFO = "TIMEOUT_GET_BOOKIE_INFO";
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
new file mode 100644
index 0000000..7ef1b76
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieInfoReader.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.net.BookieSocketAddress;
+import org.apache.bookkeeper.proto.BookieClient;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.proto.BookkeeperProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BookieInfoReader {
+    private static final Logger LOG = LoggerFactory.getLogger(BookieInfoReader.class);
+    private final ScheduledExecutorService scheduler;
+    private final BookKeeper bk;
+    private final ClientConfiguration conf;
+    private ConcurrentMap<BookieSocketAddress, BookieInfo> bookieInfoMap = new ConcurrentHashMap<BookieSocketAddress, BookieInfo>();
+    private Collection<BookieSocketAddress> bookies;
+    private final AtomicInteger totalSent = new AtomicInteger();
+    private final AtomicInteger completedCnt = new AtomicInteger();
+    private final AtomicBoolean instanceRunning = new AtomicBoolean();
+    private final AtomicBoolean isQueued = new AtomicBoolean();
+    private final AtomicBoolean refreshBookieList = new AtomicBoolean();
+
+    public static class BookieInfo implements WeightedObject {
+        private final long freeDiskSpace;
+        private final long totalDiskSpace;
+        public BookieInfo() {
+            this(0L, 0L);
+        }
+        public BookieInfo(long totalDiskSpace, long freeDiskSpace) {
+            this.totalDiskSpace = totalDiskSpace;
+            this.freeDiskSpace = freeDiskSpace;
+        }
+        public long getFreeDiskSpace() {
+            return freeDiskSpace;
+        }
+        public long getTotalDiskSpace() {
+            return totalDiskSpace;
+        }
+        @Override
+        public long getWeight() {
+            return freeDiskSpace;
+        }
+        public String toString() {
+            return "FreeDiskSpace: " + this.freeDiskSpace + " TotalDiskCapacity: " + this.totalDiskSpace;
+        }
+    }
+
+    BookieInfoReader(BookKeeper bk,
+                          ClientConfiguration conf,
+                          ScheduledExecutorService scheduler) {
+        this.bk = bk;
+        this.conf = conf;
+        this.scheduler = scheduler;
+    }
+    void start() {
+        scheduler.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                LOG.debug("Running periodic BookieInfo scan");
+                getReadWriteBookieInfo(null);
+            }
+        }, 0, conf.getGetBookieInfoIntervalSeconds(), TimeUnit.SECONDS);
+    }
+    void submitTask(final Collection<BookieSocketAddress> newBookies) {
+        scheduler.submit(new Runnable() {
+            @Override
+            public void run() {
+                getReadWriteBookieInfo(newBookies);
+            }
+        });
+    }
+    void availableBookiesChanged(Set<BookieSocketAddress> newBookies) {
+        LOG.info("Scheduling bookie info read due to changes in available bookies.");
+        submitTask(newBookies);
+    }
+
+    /*
+     * This routine is responsible for issuing bookieInfoGet messages to all the read write bookies.
+     * instanceRunning will be true until we have sent the bookieInfoGet requests to
+     * all the readwrite bookies and have processed all the callbacks. Only then is it reset to
+     * false. At that time, if any pending tasks are queued, they are scheduled by the
+     * last callback processing task. isQueued variable is used to indicate the pending
+     * tasks. refreshBookieList is used to indicate that we need to explicitly
+     * retrieve the bookie list from zk because we don't remember the bookie list for
+     * queued ops.
+     */
+    @SuppressWarnings("unchecked")
+    void getReadWriteBookieInfo(Collection<BookieSocketAddress> newBookiesList) {
+        if (instanceRunning.get() == false) {
+            instanceRunning.compareAndSet(false, true);
+        } else {
+            isQueued.set(true);
+            if (newBookiesList != null) {
+                refreshBookieList.set(true);
+            }
+            LOG.debug("Exiting due to running instance");
+            return;
+        }
+        Collection<BookieSocketAddress> deadBookies = null, joinedBookies=null;
+        if (newBookiesList == null) {
+            try {
+                if (this.bookies == null) {
+                    joinedBookies = this.bookies = bk.bookieWatcher.getBookies();
+                } else if (refreshBookieList.get()) {
+                    LOG.debug("Refreshing bookie list");
+                    newBookiesList = bk.bookieWatcher.getBookies();
+                    refreshBookieList.set(false);
+                } else {
+                    // the bookie list is already up to date, just retrieve their info
+                    joinedBookies = this.bookies;
+                }
+            } catch (BKException e) {
+                LOG.error("Unable to get the available bookies ", e);
+                onExit();
+                return;
+            }
+        }
+        if (newBookiesList != null) {
+            if (this.bookies != null) {
+                joinedBookies = CollectionUtils.subtract(newBookiesList, this.bookies);
+                deadBookies = CollectionUtils.subtract(this.bookies, newBookiesList);
+                for (BookieSocketAddress b : deadBookies) {
+                    bookieInfoMap.remove(b);
+                    this.bookies.remove(b);
+                }
+                for (BookieSocketAddress b : joinedBookies) {
+                    this.bookies.add(b);
+                }
+            } else {
+                joinedBookies = this.bookies = newBookiesList;
+            }
+        }
+
+        BookieClient bkc = bk.getBookieClient();
+        final long requested = BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE |
+                               BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
+        totalSent.set(0);
+        completedCnt.set(0);
+        LOG.debug("Getting bookie info for: {}", joinedBookies);
+        for (BookieSocketAddress b : joinedBookies) {
+            bkc.getBookieInfo(b, requested,
+                    new GetBookieInfoCallback() {
+                        void processReadInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
+                            BookieSocketAddress b = (BookieSocketAddress) ctx;
+                            if (rc != BKException.Code.OK) {
+                                LOG.error("Reading bookie info from bookie {} failed due to error: {}.", b, rc);
+                                // if there was data earlier, don't overwrite it
+                                // create a new one only if the key was missing
+                                bookieInfoMap.putIfAbsent(b, new BookieInfo());
+                            } else {
+                                LOG.debug("Bookie Info for bookie {} is {}", b, bInfo);
+                                bookieInfoMap.put(b, bInfo);
+                            }
+                            if (completedCnt.incrementAndGet() == totalSent.get()) {
+                                bk.placementPolicy.updateBookieInfo(bookieInfoMap);
+                                onExit();
+                            }
+                        }
+                        @Override
+                        public void getBookieInfoComplete(final int rc, final BookieInfo bInfo, final Object ctx) {
+                            scheduler.submit(
+                                new Runnable() {
+                                    @Override
+                                    public void run() {
+                                        processReadInfoComplete(rc, bInfo, ctx);
+                                    }
+                                });
+                        }
+                    }, b);
+            totalSent.incrementAndGet();
+        }
+        if (totalSent.get() == 0) {
+            if (deadBookies != null) {
+                // if no new bookies joined but some existing bookies went away
+                // we need to inform the placementPolicy
+                bk.placementPolicy.updateBookieInfo(bookieInfoMap);
+            }
+            onExit();
+        }
+    }
+
+    void onExit() {
+        if (isQueued.get()) {
+            LOG.debug("Scheduling a queued task");
+            submitTask(null);
+        }
+        isQueued.set(false);
+        instanceRunning.set(false);
+    }
+
+    Map<BookieSocketAddress, BookieInfo> getBookieInfo() throws BKException, InterruptedException {
+        BookieClient bkc = bk.getBookieClient();
+        final AtomicInteger totalSent = new AtomicInteger();
+        final AtomicInteger totalCompleted = new AtomicInteger();
+        final ConcurrentMap<BookieSocketAddress, BookieInfo> map = new ConcurrentHashMap<BookieSocketAddress, BookieInfo>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        long requested = BookkeeperProtocol.GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE |
+                         BookkeeperProtocol.GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE;
+
+        Collection<BookieSocketAddress> bookies;
+        bookies = bk.bookieWatcher.getBookies();
+        bookies.addAll(bk.bookieWatcher.getReadOnlyBookies());
+
+        totalSent.set(bookies.size());
+        for (BookieSocketAddress b : bookies) {
+            bkc.getBookieInfo(b, requested, new GetBookieInfoCallback() {
+                        @Override
+                        public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) {
+                            BookieSocketAddress b = (BookieSocketAddress) ctx;
+                            if (rc != BKException.Code.OK) {
+                                LOG.error("Reading bookie info from bookie {} failed due to error: {}.", b, rc);
+                            } else {
+                                LOG.debug("Free disk space on bookie {} is {}.", b, bInfo.getFreeDiskSpace());
+                                map.put(b, bInfo);
+                            }
+                            if (totalCompleted.incrementAndGet() == totalSent.get()) {
+                                latch.countDown();
+                            }
+                        }
+                    }, b);
+        }
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            LOG.error("Received InterruptedException ", e);
+            throw e;
+        }
+        return map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
index cec6920..04499eb 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookieWatcher.java
@@ -184,6 +184,10 @@ class BookieWatcher implements Watcher, ChildrenCallback {
         synchronized (this) {
             Set<BookieSocketAddress> readonlyBookies = readOnlyBookieWatcher.getReadOnlyBookies();
             placementPolicy.onClusterChanged(newBookieAddrs, readonlyBookies);
+            if (bk.conf.getDiskWeightBasedPlacementEnabled()) {
+                // start collecting bookieInfo for the newly joined bookies, if any
+                bk.bookieInfoReader.availableBookiesChanged(newBookieAddrs);
+            }
         }
 
         // we don't need to close clients here, because:

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
index 5a2c1f2..2b13a29 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DefaultEnsemblePlacementPolicy.java
@@ -24,25 +24,40 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.base.Optional;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.commons.collections.CollectionUtils;
 import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Default Ensemble Placement Policy, which picks bookies randomly
  */
 public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
-
+    static final Logger LOG = LoggerFactory.getLogger(DefaultEnsemblePlacementPolicy.class);
     static final Set<BookieSocketAddress> EMPTY_SET = new HashSet<BookieSocketAddress>();
 
+    private boolean isWeighted;
+    private int maxWeightMultiple;
     private Set<BookieSocketAddress> knownBookies = new HashSet<BookieSocketAddress>();
+    private Map<BookieSocketAddress, WeightedObject> bookieInfoMap = new java.util.HashMap<BookieSocketAddress, WeightedObject>(); // never null: onClusterChanged/updateBookieInfo dereference it unchecked
+    private WeightedRandomSelection<BookieSocketAddress> weightedSelection;
+    private final ReentrantReadWriteLock rwLock;
+
+    DefaultEnsemblePlacementPolicy() {
+        rwLock = new ReentrantReadWriteLock();
+    }
 
     @Override
     public ArrayList<BookieSocketAddress> newEnsemble(int ensembleSize, int quorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Set<BookieSocketAddress> excludeBookies) throws BKNotEnoughBookiesException {
@@ -51,18 +66,43 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
             return newBookies;
         }
         List<BookieSocketAddress> allBookies;
-        synchronized (this) {
+        rwLock.readLock().lock();
+        try {
             allBookies = new ArrayList<BookieSocketAddress>(knownBookies);
+        } finally {
+            rwLock.readLock().unlock();
         }
-        Collections.shuffle(allBookies);
-        for (BookieSocketAddress bookie : allBookies) {
-            if (excludeBookies.contains(bookie)) {
-                continue;
+
+        if (isWeighted) {
+            // hold the readlock while selecting bookies. We don't want the list of bookies
+            // changing while we are creating the ensemble
+            rwLock.readLock().lock();
+            try {
+                if (CollectionUtils.subtract(allBookies, excludeBookies).size() < ensembleSize) {
+                    throw new BKNotEnoughBookiesException();
+                }
+                while (ensembleSize > 0) {
+                    BookieSocketAddress b = weightedSelection.getNextRandom();
+                    if (!newBookies.contains(b) && !excludeBookies.contains(b)) { // skip duplicates and excluded bookies
+                        newBookies.add(b);
+                        --ensembleSize;
+                    }
+                }
+                return newBookies; // ensemble is complete: must not fall through to the trailing BKNotEnoughBookiesException
+            } finally {
+                rwLock.readLock().unlock();
             }
-            newBookies.add(bookie);
-            --ensembleSize;
-            if (ensembleSize == 0) {
-                return newBookies;
+        } else {
+            Collections.shuffle(allBookies);
+            for (BookieSocketAddress bookie : allBookies) {
+                if (excludeBookies.contains(bookie)) {
+                    continue;
+                }
+                newBookies.add(bookie);
+                --ensembleSize;
+                if (ensembleSize == 0) {
+                    return newBookies;
+                }
             }
         }
         throw new BKNotEnoughBookiesException();
@@ -76,15 +116,33 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
     }
 
     @Override
-    public synchronized Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
+    public Set<BookieSocketAddress> onClusterChanged(Set<BookieSocketAddress> writableBookies,
             Set<BookieSocketAddress> readOnlyBookies) {
-        HashSet<BookieSocketAddress> deadBookies;
-        deadBookies = new HashSet<BookieSocketAddress>(knownBookies);
-        deadBookies.removeAll(writableBookies);
-        // readonly bookies should not be treated as dead bookies
-        deadBookies.removeAll(readOnlyBookies);
-        knownBookies = writableBookies;
-        return deadBookies;
+        rwLock.writeLock().lock();
+        try {
+            HashSet<BookieSocketAddress> deadBookies;
+            deadBookies = new HashSet<BookieSocketAddress>(knownBookies);
+            deadBookies.removeAll(writableBookies);
+            // readonly bookies should not be treated as dead bookies
+            deadBookies.removeAll(readOnlyBookies);
+            if (this.isWeighted) {
+                for (BookieSocketAddress b : deadBookies) {
+                    this.bookieInfoMap.remove(b);
+                }
+                @SuppressWarnings("unchecked")
+                Collection<BookieSocketAddress> newBookies = CollectionUtils.subtract(writableBookies, knownBookies);
+                for (BookieSocketAddress b : newBookies) {
+                    this.bookieInfoMap.put(b, new BookieInfo());
+                }
+                if (deadBookies.size() > 0 || newBookies.size() > 0) {
+                    this.weightedSelection.updateMap(this.bookieInfoMap);
+                }
+            }
+            knownBookies = writableBookies;
+            return deadBookies;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
     }
 
     @Override
@@ -111,10 +169,28 @@ public class DefaultEnsemblePlacementPolicy implements EnsemblePlacementPolicy {
                                               HashedWheelTimer timer,
                                               FeatureProvider featureProvider,
                                               StatsLogger statsLogger) {
+        this.isWeighted = conf.getDiskWeightBasedPlacementEnabled();
+        if (this.isWeighted) {
+            this.maxWeightMultiple = conf.getBookieMaxWeightMultipleForWeightBasedPlacement();
+            this.weightedSelection = new WeightedRandomSelection<BookieSocketAddress>(this.maxWeightMultiple);
+        }
         return this;
     }
 
     @Override
+    public void updateBookieInfo(Map<BookieSocketAddress, BookieInfo> bookieInfoMap) {
+        rwLock.writeLock().lock();
+        try {
+            if (isWeighted) { // weightedSelection is only created by initialize() when weighted placement is enabled
+                this.bookieInfoMap.putAll(bookieInfoMap); // merge the reported infos over the current entries
+                this.weightedSelection.updateMap(this.bookieInfoMap);
+            }
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
     public void uninitalize() {
         // do nothing
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
index 4a0f307..d2e16e8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/EnsemblePlacementPolicy.java
@@ -26,6 +26,7 @@ import java.util.Set;
 import com.google.common.base.Optional;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.net.BookieSocketAddress;
@@ -135,4 +136,13 @@ public interface EnsemblePlacementPolicy {
      */
     public List<Integer> reorderReadLACSequence(ArrayList<BookieSocketAddress> ensemble,
                                                 List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory);
+
+    /**
+     * Send the bookie info details.
+     * 
+     * @param bookieInfoMap
+     *          A map that has the bookie to BookieInfo
+     */
+    default public void updateBookieInfo(Map<BookieSocketAddress, BookieInfo> bookieInfoMap) {
+    }
 }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
index c306ca0..7272447 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicy.java
@@ -22,8 +22,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.net.DNSToSwitchMapping;
 import org.apache.bookkeeper.net.Node;
@@ -32,7 +30,6 @@ import org.jboss.netty.util.HashedWheelTimer;
 
 public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicyImpl
         implements ITopologyAwareEnsemblePlacementPolicy<TopologyAwareEnsemblePlacementPolicy.BookieNode> {
-
     RackawareEnsemblePlacementPolicyImpl slave = null;
 
     RackawareEnsemblePlacementPolicy() {
@@ -48,13 +45,17 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
                                                           HashedWheelTimer timer,
                                                           boolean reorderReadsRandom,
                                                           int stabilizePeriodSeconds,
+                                                          boolean isWeighted,
+                                                          int maxWeightMultiple,
                                                           StatsLogger statsLogger) {
         if (stabilizePeriodSeconds > 0) {
-            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, statsLogger);
+            super.initialize(dnsResolver, timer, reorderReadsRandom, 0, isWeighted, maxWeightMultiple, statsLogger);
             slave = new RackawareEnsemblePlacementPolicyImpl(enforceDurability);
-            slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, statsLogger);
+            slave.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, isWeighted,
+                    maxWeightMultiple, statsLogger);
         } else {
-            super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, statsLogger);
+            super.initialize(dnsResolver, timer, reorderReadsRandom, stabilizePeriodSeconds, isWeighted,
+                    maxWeightMultiple, statsLogger);
             slave = null;
         }
         return this;
@@ -96,7 +97,7 @@ public class RackawareEnsemblePlacementPolicy extends RackawareEnsemblePlacement
     public BookieSocketAddress replaceBookie(
         int ensembleSize, int writeQuorumSize, int ackQuorumSize, java.util.Map<String, byte[]> customMetadata, Collection<BookieSocketAddress> currentEnsemble, BookieSocketAddress bookieToReplace, Set<BookieSocketAddress> excludeBookies)
             throws BKException.BKNotEnoughBookiesException {
-        try {
+       try {
             return super.replaceBookie(ensembleSize, writeQuorumSize, ackQuorumSize, customMetadata,
                     currentEnsemble, bookieToReplace, excludeBookies);
         } catch (BKException.BKNotEnoughBookiesException bnebe) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
index 79ff0da..8d56f7a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RackawareEnsemblePlacementPolicyImpl.java
@@ -24,12 +24,15 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.apache.bookkeeper.client.WeightedRandomSelection.WeightedObject;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.Configurable;
 import org.apache.bookkeeper.feature.FeatureProvider;
@@ -44,6 +47,7 @@ import org.apache.bookkeeper.net.ScriptBasedMapping;
 import org.apache.bookkeeper.net.StabilizeNetworkTopology;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.collections.CollectionUtils;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,6 +64,10 @@ import com.google.common.collect.Sets;
 class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacementPolicy {
 
     static final Logger LOG = LoggerFactory.getLogger(RackawareEnsemblePlacementPolicyImpl.class);
+    boolean isWeighted;
+    int maxWeightMultiple;
+    private Map<BookieNode, WeightedObject> bookieInfoMap = new HashMap<BookieNode, WeightedObject>();
+    private WeightedRandomSelection<BookieNode> weightedSelection;
 
     public static final String REPP_DNS_RESOLVER_CLASS = "reppDnsResolverClass";
     public static final String REPP_RANDOM_READ_REORDERING = "ensembleRandomReadReordering";
@@ -123,6 +131,8 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
                                                               HashedWheelTimer timer,
                                                               boolean reorderReadsRandom,
                                                               int stabilizePeriodSeconds,
+                                                              boolean isWeighted,
+                                                              int maxWeightMultiple,
                                                               StatsLogger statsLogger) {
         this.statsLogger = statsLogger;
         this.reorderReadsRandom = reorderReadsRandom;
@@ -148,6 +158,15 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
         LOG.info("Initialize rackaware ensemble placement policy @ {} @ {} : {}.",
             new Object[] { localNode, null == localNode ? "Unknown" : localNode.getNetworkLocation(),
                 dnsResolver.getClass().getName() });
+
+        this.isWeighted = isWeighted;
+        if (this.isWeighted) {
+            this.maxWeightMultiple = maxWeightMultiple;
+            this.weightedSelection = new WeightedRandomSelection<BookieNode>(this.maxWeightMultiple);
+            LOG.info("Weight based placement with max multiple of " + this.maxWeightMultiple);
+        } else {
+            LOG.info("Not weighted");
+        }
         return this;
     }
 
@@ -177,6 +196,8 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
                 timer,
                 conf.getBoolean(REPP_RANDOM_READ_REORDERING, false),
                 conf.getNetworkTopologyStabilizePeriodSeconds(),
+                conf.getDiskWeightBasedPlacementEnabled(),
+                conf.getBookieMaxWeightMultipleForWeightBasedPlacement(),
                 statsLogger);
     }
 
@@ -209,7 +230,9 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
             }
             handleBookiesThatLeft(leftBookies);
             handleBookiesThatJoined(joinedBookies);
-
+            if (this.isWeighted && (leftBookies.size() > 0 || joinedBookies.size() > 0)) {
+                this.weightedSelection.updateMap(this.bookieInfoMap);
+            }
             if (!readOnlyBookies.isEmpty()) {
                 this.readOnlyBookies = ImmutableSet.copyOf(readOnlyBookies);
             }
@@ -226,6 +249,9 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
             BookieNode node = knownBookies.remove(addr);
             if(null != node) {
                 topology.remove(node);
+                if (this.isWeighted) {
+                    this.bookieInfoMap.remove(node);
+                }
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Cluster changed : bookie {} left from cluster.", addr);
                 }
@@ -240,6 +266,9 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
             BookieNode node = createBookieNode(addr);
             topology.add(node);
             knownBookies.put(addr, node);
+            if (this.isWeighted) {
+                this.bookieInfoMap.putIfAbsent(node, new BookieInfo());
+            }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Cluster changed : bookie {} joined the cluster.", addr);
             }
@@ -387,6 +416,32 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
     }
 
     @Override
+    public void updateBookieInfo(Map<BookieSocketAddress, BookieInfo> bookieInfoMap) {
+        if (!isWeighted) {
+            LOG.info("bookieFreeDiskInfo callback called even without weighted placement policy being used.");
+            return;
+        }
+        // Read knownBookies and publish the rebuilt map under one write-lock hold, so the map
+        // cannot be built from a stale snapshot (assumes cluster changes also hold rwLock).
+        rwLock.writeLock().lock();
+        try {
+            // create a new map to reflect the new mapping
+            Map<BookieNode, WeightedObject> map = new HashMap<BookieNode, WeightedObject>();
+            for (BookieNode bookie : knownBookies.values()) {
+                if (bookieInfoMap.containsKey(bookie.getAddr())) {
+                    map.put(bookie, bookieInfoMap.get(bookie.getAddr()));
+                } else {
+                    map.put(bookie, new BookieInfo());
+                }
+            }
+            this.bookieInfoMap = map;
+            this.weightedSelection.updateMap(this.bookieInfoMap);
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    @Override
     public BookieNode selectFromNetworkLocation(
             String networkLoc,
             Set<Node> excludeBookies,
@@ -409,6 +464,31 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
         return "~" + node.getNetworkLocation();
     }
 
+    private WeightedRandomSelection<BookieNode> prepareForWeightedSelection(List<Node> leaves) {
+        // create a map of bookieNode->freeDiskSpace for this rack. The assumption is that
+        // the number of nodes in a rack is of the order of 40, so it shouldn't be too bad
+        // to build it every time during a ledger creation
+        Map<BookieNode, WeightedObject> rackMap = new HashMap<BookieNode, WeightedObject>();
+        for (Node n : leaves) {
+            if (!(n instanceof BookieNode)) {
+                continue;
+            }
+            BookieNode bookie = (BookieNode) n;
+            if (this.bookieInfoMap.containsKey(bookie)) {
+                rackMap.put(bookie, this.bookieInfoMap.get(bookie));
+            } else {
+                rackMap.put(bookie, new BookieInfo());
+            }
+        }
+        if (rackMap.size() == 0) {
+            return null;
+        }
+
+        WeightedRandomSelection<BookieNode> wRSelection = new WeightedRandomSelection<BookieNode>(this.maxWeightMultiple);
+        wRSelection.updateMap(rackMap);
+        return wRSelection;
+    }
+
     /**
      * Choose random node under a given network path.
      *
@@ -424,9 +504,38 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
      */
     protected BookieNode selectRandomFromRack(String netPath, Set<Node> excludeBookies, Predicate<BookieNode> predicate,
             Ensemble<BookieNode> ensemble) throws BKNotEnoughBookiesException {
+        WeightedRandomSelection<BookieNode> wRSelection = null;
         List<Node> leaves = new ArrayList<Node>(topology.getLeaves(netPath));
-        Collections.shuffle(leaves);
-        for (Node n : leaves) {
+        if (!this.isWeighted) {
+            Collections.shuffle(leaves);
+        } else {
+            if (CollectionUtils.subtract(leaves, excludeBookies).size() < 1) {
+                throw new BKNotEnoughBookiesException();
+            }
+            wRSelection = prepareForWeightedSelection(leaves);
+            if (wRSelection == null) {
+                throw new BKNotEnoughBookiesException();
+            }
+        }
+
+        Iterator<Node> it = leaves.iterator();
+        Set<Node> bookiesSeenSoFar = new HashSet<Node>();
+        while (true) {
+            Node n;
+            if (isWeighted) {
+                if (bookiesSeenSoFar.size() == leaves.size()) {
+                    // Don't loop infinitely.
+                    break;
+                }
+                n = wRSelection.getNextRandom();
+                bookiesSeenSoFar.add(n);
+            } else {
+                if (it.hasNext()) {
+                    n = it.next();
+                } else {
+                    break;
+                }
+            }
             if (excludeBookies.contains(n)) {
                 continue;
             }
@@ -461,7 +570,7 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
                                             Predicate<BookieNode> predicate,
                                             Ensemble<BookieNode> ensemble)
             throws BKNotEnoughBookiesException {
-        return selectRandomInternal(new ArrayList<BookieNode>(knownBookies.values()),  numBookies, excludeBookies, predicate, ensemble);
+        return selectRandomInternal(null,  numBookies, excludeBookies, predicate, ensemble);
     }
 
     protected List<BookieNode> selectRandomInternal(List<BookieNode> bookiesToSelectFrom,
@@ -470,9 +579,56 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
                                                     Predicate<BookieNode> predicate,
                                                     Ensemble<BookieNode> ensemble)
         throws BKNotEnoughBookiesException {
-        Collections.shuffle(bookiesToSelectFrom);
+        WeightedRandomSelection<BookieNode> wRSelection = null;
+        if (bookiesToSelectFrom == null) {
+            // If the list is null, we need to select from the entire knownBookies set
+            wRSelection = this.weightedSelection;
+            bookiesToSelectFrom = new ArrayList<BookieNode>(knownBookies.values());
+        }
+        if (isWeighted) {
+            if (CollectionUtils.subtract(bookiesToSelectFrom, excludeBookies).size() < numBookies) {
+                throw new BKNotEnoughBookiesException();
+            }
+            if (wRSelection == null) {
+                Map<BookieNode, WeightedObject> rackMap = new HashMap<BookieNode, WeightedObject>();
+                for (BookieNode n : bookiesToSelectFrom) {
+                    if (excludeBookies.contains(n)) {
+                        continue;
+                    }
+                    if (this.bookieInfoMap.containsKey(n)) {
+                        rackMap.put(n, this.bookieInfoMap.get(n));
+                    } else {
+                        rackMap.put(n, new BookieInfo());
+                    }
+                }
+                wRSelection = new WeightedRandomSelection<BookieNode>(this.maxWeightMultiple);
+                wRSelection.updateMap(rackMap);
+            }
+        } else {
+            Collections.shuffle(bookiesToSelectFrom);
+        }
+
+        BookieNode bookie;
         List<BookieNode> newBookies = new ArrayList<BookieNode>(numBookies);
-        for (BookieNode bookie : bookiesToSelectFrom) {
+        Iterator<BookieNode> it = bookiesToSelectFrom.iterator();
+        Set<BookieNode> bookiesSeenSoFar = new HashSet<BookieNode>();
+        while (numBookies > 0) {
+            if (isWeighted) {
+                if (bookiesSeenSoFar.size() == bookiesToSelectFrom.size()) {
+                    // If we have gone through the whole available list of bookies,
+                    // and yet haven't been able to satisfy the ensemble request, bail out.
+                    // We don't want to loop infinitely.
+                    break;
+                }
+                bookie = wRSelection.getNextRandom();
+                bookiesSeenSoFar.add(bookie);
+            } else {
+                if (it.hasNext()) {
+                    bookie = it.next();
+                } else {
+                    break;
+                }
+            }
             if (excludeBookies.contains(bookie)) {
                 continue;
             }
@@ -489,10 +645,9 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
                 newBookies.add(bookie);
                 --numBookies;
             }
-
-            if (numBookies == 0) {
-                return newBookies;
-            }
+        }
+        if (numBookies == 0) {
+            return newBookies;
         }
         if (LOG.isDebugEnabled()) {
             LOG.debug("Failed to find {} bookies : excludeBookies {}, allBookies {}.", new Object[] {
@@ -501,8 +656,6 @@ class RackawareEnsemblePlacementPolicyImpl extends TopologyAwareEnsemblePlacemen
         throw new BKNotEnoughBookiesException();
     }
 
-
-
     @Override
     public List<Integer> reorderReadSequence(ArrayList<BookieSocketAddress> ensemble, List<Integer> writeSet, Map<BookieSocketAddress, Long> bookieFailureHistory) {
         int ensembleSize = ensemble.size();

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index 265499c..ed9985f 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -125,7 +125,8 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
             String region = getLocalRegion(node);
             if (null == perRegionPlacement.get(region)) {
                 perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy()
-                        .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, statsLogger));
+                        .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, 
+                                this.isWeighted, this.maxWeightMultiple, statsLogger));
             }
 
             Set<BookieSocketAddress> regionSet = perRegionClusterChange.get(region);
@@ -160,7 +161,6 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
         super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger);
         myRegion = getLocalRegion(localNode);
         enableValidation = conf.getBoolean(REPP_ENABLE_VALIDATION, true);
-
         // We have to statically provide regions we want the writes to go through and how many regions
         // are required for durability. This decision cannot be driven by the active bookies as the
         // current topology will not be indicative of constraints that must be enforced for durability
@@ -171,7 +171,8 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
             String[] regions = regionsString.split(";");
             for (String region: regions) {
                 perRegionPlacement.put(region, new RackawareEnsemblePlacementPolicy(true)
-                        .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds, statsLogger));
+                        .initialize(dnsResolver, timer, this.reorderReadsRandom, this.stabilizePeriodSeconds,
+                                this.isWeighted, this.maxWeightMultiple, statsLogger));
             }
             minRegionsForDurability = conf.getInt(REPP_MINIMUM_REGIONS_FOR_DURABILITY, MINIMUM_REGIONS_FOR_DURABILITY_DEFAULT);
             if (minRegionsForDurability > 0) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
new file mode 100644
index 0000000..02fac1a
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/WeightedRandomSelection.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Weighted random selection over a set of items.
+ *
+ * <p>{@link #updateMap(Map)} converts each item's weight into a selection probability
+ * (its weight divided by the total weight), optionally caps that probability at
+ * {@code maxProbabilityMultiplier} times the median probability, and lays the items out
+ * on a cumulative scale. {@link #getNextRandom()} draws a uniformly distributed point on
+ * that scale and returns the item owning it.
+ *
+ * <p>The published selection state is swapped under the write lock and read under the
+ * read lock of {@code rwLock}.
+ *
+ * @param <T> type of the items being selected
+ */
+public class WeightedRandomSelection<T> {
+    static final Logger LOG = LoggerFactory.getLogger(WeightedRandomSelection.class);
+
+    /** Anything that can report a weight used for the selection. */
+    interface WeightedObject {
+        long getWeight();
+    }
+    // Upper bound of the cumulative scale; null until updateMap() has been called once.
+    Double randomMax;
+    // Cap factor applied to the median probability; values <= 0 disable capping.
+    int maxProbabilityMultiplier;
+    Map<T, WeightedObject> map;
+    // Start of each item's interval on the cumulative scale -> item.
+    TreeMap<Double, T> cummulativeMap = new TreeMap<Double, T>();
+    ReadWriteLock rwLock = new ReentrantReadWriteLock(true);
+
+    /** Creates a selection without any cap on the per-item probability. */
+    WeightedRandomSelection() {
+        maxProbabilityMultiplier = -1;
+    }
+
+    /**
+     * Creates a selection whose per-item probability is capped.
+     *
+     * @param maxMultiplier cap, expressed as a multiple of the median probability
+     */
+    WeightedRandomSelection(int maxMultiplier) {
+        this.maxProbabilityMultiplier = maxMultiplier;
+    }
+
+    /**
+     * Changes the cap factor. The new value is only read by the next
+     * {@link #updateMap(Map)} call.
+     *
+     * @param max cap, expressed as a multiple of the median probability
+     */
+    public void setMaxProbabilityMultiplier(int max) {
+        this.maxProbabilityMultiplier = max;
+    }
+
+    /**
+     * Rebuilds the selection state from the given items and their weights.
+     *
+     * <p>Items with a weight that is not positive are given the probability of the
+     * smallest non-zero weight. If the weights sum to zero every item is treated as
+     * having weight 1. An empty map results in an empty selection, for which
+     * {@link #getNextRandom()} returns {@code null}.
+     *
+     * @param map items and their weights; the reference is retained, not copied
+     */
+    void updateMap(Map<T, WeightedObject> map) {
+        if (map.isEmpty()) {
+            // Nothing to weigh; bail out before the divisions below turn into x/0.
+            publish(map, new TreeMap<Double, T>(), 0.0);
+            return;
+        }
+
+        List<WeightedObject> values = new ArrayList<WeightedObject>(map.values());
+        Collections.sort(values, new Comparator<WeightedObject>() {
+            @Override
+            public int compare(WeightedObject o1, WeightedObject o2) {
+                // Compare directly; the sign of (w1 - w2) is wrong when the subtraction overflows.
+                long w1 = o1.getWeight();
+                long w2 = o2.getWeight();
+                if (w1 < w2) {
+                    return -1;
+                } else if (w1 > w2) {
+                    return 1;
+                } else {
+                    return 0;
+                }
+            }
+        });
+
+        // Sum of all weights (denominator of every probability) and smallest non-zero weight.
+        long totalWeight = 0L;
+        long min = Long.MAX_VALUE;
+        for (WeightedObject value : values) {
+            long weight = value.getWeight();
+            totalWeight += weight;
+            if (weight != 0 && min > weight) {
+                min = weight;
+            }
+        }
+
+        double median;
+        if (totalWeight == 0) {
+            // all the values are zeros; assign a value of 1 to all and the totalWeight equal
+            // to the size of the values
+            min = 1L;
+            median = 1;
+            totalWeight = values.size();
+        } else {
+            int mid = values.size() / 2;
+            if ((values.size() % 2) == 1) {
+                median = values.get(mid).getWeight();
+            } else {
+                median = (double) (values.get(mid - 1).getWeight() + values.get(mid).getWeight()) / 2;
+            }
+        }
+
+        double medianWeight = median / (double) totalWeight;
+        double minWeight = (double) min / totalWeight;
+        boolean debug = LOG.isDebugEnabled();
+
+        if (debug) {
+            LOG.debug("Updating weights map. MedianWeight: " + medianWeight + " MinWeight: " + minWeight);
+        }
+
+        // Probabilities above this value are capped; a non-positive value means "no cap".
+        double maxWeight = maxProbabilityMultiplier * medianWeight;
+        Map<T, Double> weightMap = new HashMap<T, Double>();
+        for (Map.Entry<T, WeightedObject> e : map.entrySet()) {
+            double weightedProbability;
+            if (e.getValue().getWeight() > 0) {
+                weightedProbability = (double) e.getValue().getWeight() / (double) totalWeight;
+            } else {
+                weightedProbability = minWeight;
+            }
+            if (maxWeight > 0 && weightedProbability > maxWeight) {
+                weightedProbability = maxWeight;
+                if (debug) {
+                    LOG.debug("Capping the probability to " + weightedProbability + " for " + e.getKey() + " Value: " + e.getValue());
+                }
+            }
+            weightMap.put(e.getKey(), weightedProbability);
+        }
+
+        // Lay the items out back to back on a cumulative scale: each item is stored under
+        // the start of its interval, and the interval's length is the item's probability.
+        TreeMap<Double, T> tmpCummulativeMap = new TreeMap<Double, T>();
+        double key = 0.0;
+        for (Map.Entry<T, Double> e : weightMap.entrySet()) {
+            tmpCummulativeMap.put(key, e.getKey());
+            if (debug) {
+                LOG.debug("Key: " + e.getKey() + " Value: " + e.getValue()
+                        + " AssignedKey: " + key + " AssignedWeight: " + e.getValue());
+            }
+            key += e.getValue();
+        }
+
+        publish(map, tmpCummulativeMap, key);
+    }
+
+    /** Swaps in freshly computed selection state under the write lock. */
+    private void publish(Map<T, WeightedObject> newMap, TreeMap<Double, T> newCummulativeMap, double newRandomMax) {
+        rwLock.writeLock().lock();
+        try {
+            this.map = newMap;
+            cummulativeMap = newCummulativeMap;
+            randomMax = newRandomMax;
+        } finally {
+            rwLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Picks one item at random according to the weights last passed to
+     * {@link #updateMap(Map)}.
+     *
+     * @return the selected item, or {@code null} when there is nothing to select from
+     *         (no update has happened yet, or the last update was an empty map)
+     */
+    T getNextRandom() {
+        rwLock.readLock().lock();
+        try {
+            if (randomMax == null || cummulativeMap.isEmpty()) {
+                return null;
+            }
+            // pick a random number between 0 and randomMax
+            double randomNum = randomMax * Math.random();
+            // the owning item is stored under the greatest interval start <= randomNum
+            Double key = cummulativeMap.floorKey(randomNum);
+            return (key == null) ? null : cummulativeMap.get(key);
+        } finally {
+            rwLock.readLock().unlock();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
index b8554d4..ee137c0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java
@@ -78,7 +78,13 @@ public class ClientConfiguration extends AbstractConfiguration {
     protected final static String BOOKIE_ERROR_THRESHOLD_PER_INTERVAL = "bookieErrorThresholdPerInterval";
     protected final static String BOOKIE_QUARANTINE_TIME_SECONDS = "bookieQuarantineTimeSeconds";
 
-    // Number Worker Threads
+    // Disk weight based placement and bookie info polling
+    protected final static String DISK_WEIGHT_BASED_PLACEMENT_ENABLED = "diskWeightBasedPlacementEnabled";
+    protected final static String GET_BOOKIE_INFO_INTERVAL_SECONDS = "getBookieInfoIntervalSeconds";
+    protected final static String BOOKIE_MAX_MULTIPLE_FOR_WEIGHTED_PLACEMENT = "bookieMaxMultipleForWeightBasedPlacement";
+    protected final static String GET_BOOKIE_INFO_TIMEOUT_SECS = "getBookieInfoTimeoutSecs";
+
+    // Number Worker Threads
     protected final static String NUM_WORKER_THREADS = "numWorkerThreads";
 
     // Ensemble Placement Policy
@@ -939,6 +945,82 @@ public class ClientConfiguration extends AbstractConfiguration {
         super.setNettyMaxFrameSizeBytes(maxSize);
         return this;
     }
+ 
+    /**
+     * Returns the interval, in seconds, between successive bookie get info calls.
+     *
+     * @return the configured interval in seconds, or 24 hours' worth of seconds when unset
+     */
+    public int getGetBookieInfoIntervalSeconds() {
+        final int oneDayInSeconds = 24 * 60 * 60;
+        return getInt(GET_BOOKIE_INFO_INTERVAL_SECONDS, oneDayInSeconds);
+    }
+
+    /**
+     * Tells whether the disk weight based placement policy is enabled.
+     *
+     * @return the configured flag, or {@code false} when unset
+     */
+    public boolean getDiskWeightBasedPlacementEnabled() {
+        final boolean disabledByDefault = false;
+        return getBoolean(DISK_WEIGHT_BASED_PLACEMENT_ENABLED, disabledByDefault);
+    }
+
+    /**
+     * Returns the max multiple to apply to nodes that carry a very high weight.
+     *
+     * @return the configured max multiple, or 3 when unset
+     */
+    public int getBookieMaxWeightMultipleForWeightBasedPlacement() {
+        final int defaultMultiple = 3;
+        return getInt(BOOKIE_MAX_MULTIPLE_FOR_WEIGHTED_PLACEMENT, defaultMultiple);
+    }
+
+    /**
+     * Return the timeout value, in seconds, for the getBookieInfo request.
+     *
+     * @return the configured timeout in seconds, or 5 when unset
+     */
+    public int getBookieInfoTimeout() {
+        // Primitive accessor, like the other getters of this group; avoids an Integer round trip.
+        return getInt(GET_BOOKIE_INFO_TIMEOUT_SECS, 5);
+    }
+
+    /**
+     * Turns disk weight based placement on or off.
+     *
+     * @param isEnabled {@code true} to enable disk weight based placement, {@code false} to disable it
+     * @return this client configuration
+     */
+    public ClientConfiguration setDiskWeightBasedPlacementEnabled(boolean isEnabled) {
+        final Boolean flag = Boolean.valueOf(isEnabled);
+        setProperty(DISK_WEIGHT_BASED_PLACEMENT_ENABLED, flag);
+        return this;
+    }
+
+    /**
+     * Sets the interval between successive polls for bookie get info. The value is
+     * converted to seconds before it is stored.
+     *
+     * @param pollInterval length of the interval, expressed in {@code unit}
+     * @param unit time unit of {@code pollInterval}
+     * @return this client configuration
+     */
+    public ClientConfiguration setGetBookieInfoIntervalSeconds(int pollInterval, TimeUnit unit) {
+        final Long intervalSeconds = Long.valueOf(unit.toSeconds(pollInterval));
+        setProperty(GET_BOOKIE_INFO_INTERVAL_SECONDS, intervalSeconds);
+        return this;
+    }
+
+    /**
+     * Sets the max multiple to apply to nodes that carry a very high weight.
+     *
+     * @param multiple the max multiple to store
+     * @return this client configuration
+     */
+    public ClientConfiguration setBookieMaxWeightMultipleForWeightBasedPlacement(int multiple) {
+        final Integer maxMultiple = Integer.valueOf(multiple);
+        setProperty(BOOKIE_MAX_MULTIPLE_FOR_WEIGHTED_PLACEMENT, maxMultiple);
+        return this;
+    }
+
+    /**
+     * Sets the timeout, in seconds, of the GET_BOOKIE_INFO request.
+     *
+     * @param timeoutSecs timeout in seconds
+     * @return this client configuration
+     */
+    public ClientConfiguration setGetBookieInfoTimeout(int timeoutSecs) {
+        final Integer timeout = Integer.valueOf(timeoutSecs);
+        setProperty(GET_BOOKIE_INFO_TIMEOUT_SECS, timeout);
+        return this;
+    }
 
     /**
      * Set the client role

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 4a742da..ce85aef 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -35,14 +35,16 @@ import org.apache.bookkeeper.auth.AuthProviderFactoryFactory;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GetBookieInfoCallback;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadLacCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteLacCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback;
-import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.OrderedSafeExecutor;
 import org.apache.bookkeeper.util.SafeRunnable;
@@ -350,6 +352,39 @@ public class BookieClient implements PerChannelBookieClientFactory {
         }
     }
 
+    /**
+     * Sends a GET_BOOKIE_INFO request to the given bookie and reports the outcome
+     * through {@code cb}.
+     *
+     * <p>When no client pool can be looked up for {@code addr}, or no channel can be
+     * obtained from it, the callback is completed with an error code and an empty
+     * {@link BookieInfo}. The whole call runs under the read side of {@code closeLock}.
+     *
+     * @param addr address of the bookie to query
+     * @param requested passed through unchanged to {@code PerChannelBookieClient#getBookieInfo}
+     * @param cb callback to complete with the result
+     * @param ctx opaque context handed back to the callback
+     */
+    public void getBookieInfo(final BookieSocketAddress addr, final long requested, final GetBookieInfoCallback cb, final Object ctx) {
+        closeLock.readLock().lock();
+        try {
+            final PerChannelBookieClientPool pool = lookupClient(addr, BookkeeperProtocol.OperationType.GET_BOOKIE_INFO);
+            if (null == pool) {
+                cb.getBookieInfoComplete(getRc(BKException.Code.BookieHandleNotAvailableException), new BookieInfo(), ctx);
+                return;
+            }
+            final GenericCallback<PerChannelBookieClient> onChannelObtained = new GenericCallback<PerChannelBookieClient>() {
+                @Override
+                public void operationComplete(final int rc, PerChannelBookieClient pcbc) {
+                    if (rc == BKException.Code.OK) {
+                        // Channel is available: hand the request over to it.
+                        pcbc.getBookieInfo(requested, cb, ctx);
+                        return;
+                    }
+                    // Obtaining the channel failed: report the error from the executor.
+                    try {
+                        executor.submit(new SafeRunnable() {
+                            @Override
+                            public void safeRun() {
+                                cb.getBookieInfoComplete(rc, new BookieInfo(), ctx);
+                            }
+                        });
+                    } catch (RejectedExecutionException re) {
+                        // The executor no longer accepts work; complete the callback inline.
+                        cb.getBookieInfoComplete(getRc(BKException.Code.InterruptedException),
+                                new BookieInfo(), ctx);
+                    }
+                }
+            };
+            pool.obtain(onChannelObtained);
+        } finally {
+            closeLock.readLock().unlock();
+        }
+    }
+
     public boolean isClosed() {
         return closed;
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 210bc72..38f40f8 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -43,7 +43,7 @@ import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_ENTRY_REQUEST;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.WRITE_LAC;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.READ_LAC;
-
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.GET_BOOKIE_INFO;
 
 public class BookieRequestProcessor implements RequestProcessor {
 
@@ -78,6 +78,7 @@ public class BookieRequestProcessor implements RequestProcessor {
     final OpStatsLogger readEntryStats;
     final OpStatsLogger writeLacStats;
     final OpStatsLogger readLacStats;
+    final OpStatsLogger getBookieInfoStats;
 
     public BookieRequestProcessor(ServerConfiguration serverCfg, Bookie bookie,
                                   StatsLogger statsLogger) {
@@ -93,6 +94,7 @@ public class BookieRequestProcessor implements RequestProcessor {
         this.readRequestStats = statsLogger.getOpStatsLogger(READ_ENTRY_REQUEST);
         this.writeLacStats = statsLogger.getOpStatsLogger(WRITE_LAC);
         this.readLacStats = statsLogger.getOpStatsLogger(READ_LAC);
+        this.getBookieInfoStats = statsLogger.getOpStatsLogger(GET_BOOKIE_INFO);
     }
 
     @Override
@@ -148,6 +150,9 @@ public class BookieRequestProcessor implements RequestProcessor {
                 case READ_LAC:
                     processReadLacRequestV3(r,c);
                     break;
+                case GET_BOOKIE_INFO:
+                    processGetBookieInfoRequestV3(r,c);
+                    break;
                 default:
                     LOG.info("Unknown operation type {}", header.getOperation());
                     BookkeeperProtocol.Response.Builder response =
@@ -216,6 +221,15 @@ public class BookieRequestProcessor implements RequestProcessor {
         }
     }
 
+    /**
+     * Handles a GET_BOOKIE_INFO request: submitted to the read thread pool when one
+     * is configured, otherwise run inline on the calling thread.
+     */
+    private void processGetBookieInfoRequestV3(final BookkeeperProtocol.Request r, final Channel c) {
+        final GetBookieInfoProcessorV3 processor = new GetBookieInfoProcessorV3(r, c, this);
+        if (readThreadPool != null) {
+            readThreadPool.submit(processor);
+            return;
+        }
+        processor.run();
+    }
+
     private void processAddRequest(final BookieProtocol.Request r, final Channel c) {
         WriteEntryProcessor write = new WriteEntryProcessor(r, c, this);
         if (null == writeThreadPool) {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/0583175d/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
index 261c93d..b2d6d82 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookkeeperInternalCallbacks.java
@@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.client.BookieInfoReader.BookieInfo;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.zookeeper.AsyncCallback;
 import org.jboss.netty.buffer.ChannelBuffer;
@@ -88,6 +89,10 @@ public class BookkeeperInternalCallbacks {
         void readEntryComplete(int rc, long ledgerId, long entryId, ChannelBuffer buffer, Object ctx);
     }
 
+    /**
+     * Callback invoked when a get bookie info request completes.
+     */
+    public interface GetBookieInfoCallback {
+        /**
+         * @param rc result code of the request
+         * @param bInfo bookie info delivered with the result
+         * @param ctx context object handed back to the callback
+         */
+        void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx);
+    }
+
     /**
      * This is a multi callback object that waits for all of
      * the multiple async operations to complete. If any fail, then we invoke


Mime
View raw message