bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From reddycha...@apache.org
Subject [bookkeeper] branch master updated: BOOKKEEPER-1033: Handle DirsPartitionDuplication
Date Fri, 07 Jul 2017 06:20:08 GMT
This is an automated email from the ASF dual-hosted git repository.

reddycharan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 9aff6d5  BOOKKEEPER-1033: Handle DirsPartitionDuplication
9aff6d5 is described below

commit 9aff6d52e01b50f25f5dbe1086ff10012ca25c48
Author: cguttapalem <cguttapalem@salesforce.com>
AuthorDate: Thu Jul 6 23:19:57 2017 -0700

    BOOKKEEPER-1033: Handle DirsPartitionDuplication
    
    - Provide configuration for allowDirsPartitionDuplication
    - while calculating total disk metrics account Partition Duplication
    
    Author: cguttapalem <cguttapalem@salesforce.com>
    
    Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <guosijie@gmail.com>
    
    This closes #189 from reddycharan/dirspartitionduplication and squashes the following commits:
    
    99fc6fc1 [cguttapalem] BOOKKEEPER-1033: Handle DirsPartitionDuplication
    1d453975 [cguttapalem] BOOKKEEPER-1033: Handle DirsPartitionDuplication
---
 bookkeeper-server/conf/bk_server.conf              |   3 +
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  80 ++++++++++++---
 .../apache/bookkeeper/bookie/BookieException.java  |  20 ++++
 .../org/apache/bookkeeper/bookie/BookieShell.java  |   7 +-
 .../bookkeeper/bookie/LedgerDirsManager.java       |  32 +++---
 .../bookkeeper/bookie/LedgerDirsMonitor.java       |  56 ++++++-----
 .../bookkeeper/bookie/ReadOnlyEntryLogger.java     |   4 +-
 .../bookkeeper/conf/ServerConfiguration.java       |  36 ++++++-
 .../bookkeeper/proto/GetBookieInfoProcessorV3.java |  24 +++--
 .../org/apache/bookkeeper/util/DiskChecker.java    |  54 ++++++++--
 .../bookie/BookieInitializationTest.java           | 112 ++++++++++++++++++++-
 .../apache/bookkeeper/bookie/CompactionTest.java   |  13 ++-
 .../apache/bookkeeper/bookie/CreateNewLogTest.java |   7 +-
 .../org/apache/bookkeeper/bookie/EntryLogTest.java |   4 +-
 .../bookkeeper/bookie/IndexPersistenceMgrTest.java |   3 +-
 .../bookkeeper/bookie/TestLedgerDirsManager.java   |  12 ++-
 16 files changed, 375 insertions(+), 92 deletions(-)

diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index 65883da..6068ff3 100644
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -321,3 +321,6 @@ zkTimeout=10000
 # Minimum safe Usable size to be available in index directory for Bookie to create Index File while replaying 
 # journal at the time of Bookie Start in Readonly Mode (in bytes)
 # minUsableSizeForIndexFileCreation=1073741824
+
+# Configure the Bookie to allow/disallow multiple ledger/index/journal directories in the same filesystem disk partition
+# allowMultipleDirsUnderSameDiskPartition=false
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index d607b53..769bda1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -41,10 +41,14 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Observable;
 import java.util.Observer;
 import java.util.Set;
@@ -57,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -82,6 +87,7 @@ import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
@@ -372,6 +378,12 @@ public class Bookie extends BookieCriticalThread {
                 }
                 masterCookie.writeToZooKeeper(zk, conf, zkCookie != null ? zkCookie.getVersion() : Version.NEW);
             }
+            
+            List<File> ledgerDirs = ledgerDirsManager.getAllLedgerDirs();
+            checkIfDirsOnSameDiskPartition(ledgerDirs);
+            List<File> indexDirs = indexDirsManager.getAllLedgerDirs();
+            checkIfDirsOnSameDiskPartition(indexDirs);
+            checkIfDirsOnSameDiskPartition(journalDirectories);
         } catch (KeeperException ke) {
             LOG.error("Couldn't access cookie in zookeeper", ke);
             throw new BookieException.InvalidCookieException(ke);
@@ -387,6 +399,52 @@ public class Bookie extends BookieCriticalThread {
         }
     }
 
+    /**
+     * Checks if multiple directories are in same diskpartition/filesystem/device.
+     * If ALLOW_MULTIPLEDIRS_UNDER_SAME_DISKPARTITION config parameter is not enabled, and
+     * if it is found that there are multiple directories in the same DiskPartition then
+     * it will throw DiskPartitionDuplicationException.
+     * 
+     * @param dirs dirs to validate
+     * 
+     * @throws DiskPartitionDuplicationException if a FileStore lookup fails, or if duplication is found and not allowed
+     */
+    private void checkIfDirsOnSameDiskPartition(List<File> dirs) throws DiskPartitionDuplicationException {
+        boolean allowDiskPartitionDuplication = conf.isAllowMultipleDirsUnderSameDiskPartition();
+        final MutableBoolean isDuplicationFoundAndNotAllowed = new MutableBoolean(false);
+        Map<FileStore, List<File>> fileStoreDirsMap = new HashMap<FileStore, List<File>>();
+        for (File dir : dirs) {
+            FileStore fileStore;
+            try {
+                fileStore = Files.getFileStore(dir.toPath());
+            } catch (IOException e) {
+                LOG.error("Got IOException while trying to FileStore of {}", dir);
+                throw new BookieException.DiskPartitionDuplicationException(e);
+            }
+            if (fileStoreDirsMap.containsKey(fileStore)) {
+                fileStoreDirsMap.get(fileStore).add(dir);
+            } else {
+                List<File> dirsList = new ArrayList<File>();
+                dirsList.add(dir);
+                fileStoreDirsMap.put(fileStore, dirsList);
+            }
+        }
+
+        fileStoreDirsMap.forEach((fileStore, dirsList) -> {
+            if (dirsList.size() > 1) {
+                if (allowDiskPartitionDuplication) {
+                    LOG.warn("Dirs: {} are in same DiskPartition/FileSystem: {}", dirsList, fileStore);
+                } else {
+                    LOG.error("Dirs: {} are in same DiskPartition/FileSystem: {}", dirsList, fileStore);
+                    isDuplicationFoundAndNotAllowed.setValue(true);
+                }
+            }
+        });
+        if (isDuplicationFoundAndNotAllowed.getValue()) {
+            throw new BookieException.DiskPartitionDuplicationException();
+        }
+    }
+    
     public static void checkEnvironmentWithStorageExpansion(ServerConfiguration conf,
             ZooKeeper zk, List<File> journalDirectories, List<File> allLedgerDirs) throws BookieException, IOException {
         try {
@@ -556,12 +614,12 @@ public class Bookie extends BookieCriticalThread {
         return indexDirsManager;
     }
 
-    public long getTotalDiskSpace() {
-        return getLedgerDirsManager().getTotalDiskSpace();
+    public long getTotalDiskSpace() throws IOException {
+        return getLedgerDirsManager().getTotalDiskSpace(ledgerDirsManager.getAllLedgerDirs());
     }
 
-    public long getTotalFreeSpace() {
-        return getLedgerDirsManager().getTotalFreeSpace();
+    public long getTotalFreeSpace() throws IOException {
+        return getLedgerDirsManager().getTotalFreeSpace(ledgerDirsManager.getAllLedgerDirs());
     }
 
     public static File getCurrentDirectory(File dir) {
@@ -593,15 +651,15 @@ public class Bookie extends BookieCriticalThread {
         for (File journalDirectory : conf.getJournalDirs()) {
             this.journalDirectories.add(getCurrentDirectory(journalDirectory));
         }
-
-        this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+        DiskChecker diskChecker = new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold());
+        this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), diskChecker,
                 statsLogger.scope(LD_LEDGER_SCOPE));
 
         File[] idxDirs = conf.getIndexDirs();
         if (null == idxDirs) {
             this.indexDirsManager = this.ledgerDirsManager;
         } else {
-            this.indexDirsManager = new LedgerDirsManager(conf, idxDirs,
+            this.indexDirsManager = new LedgerDirsManager(conf, idxDirs, diskChecker,
                     statsLogger.scope(LD_INDEX_SCOPE));
         }
 
@@ -615,9 +673,7 @@ public class Bookie extends BookieCriticalThread {
         // Initialise ledgerDirMonitor. This would look through all the
         // configured directories. When disk errors or all the ledger
         // directories are full, would throws exception and fail bookie startup.
-        this.ledgerMonitor = new LedgerDirsMonitor(conf, 
-                                    new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), 
-                                    ledgerDirsManager);
+        this.ledgerMonitor = new LedgerDirsMonitor(conf, diskChecker, ledgerDirsManager);
         try {
             this.ledgerMonitor.init();
         } catch (NoWritableLedgerDirException nle) {
@@ -632,9 +688,7 @@ public class Bookie extends BookieCriticalThread {
         if (null == idxDirs) {
             this.idxMonitor = this.ledgerMonitor;
         } else {
-            this.idxMonitor = new LedgerDirsMonitor(conf, 
-                                        new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), 
-                                        indexDirsManager);
+            this.idxMonitor = new LedgerDirsMonitor(conf, diskChecker, indexDirsManager);
             try {
                 this.idxMonitor.init();
             } catch (NoWritableLedgerDirException nle) {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
index b3ee280..a7793a3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieException.java
@@ -50,6 +50,8 @@ public abstract class BookieException extends Exception {
             return new InvalidCookieException();
         case Code.UpgradeException:
             return new UpgradeException();
+        case Code.DiskPartitionDuplicationException:
+            return new DiskPartitionDuplicationException();
         default:
             return new BookieIllegalOpException();
         }
@@ -64,6 +66,7 @@ public abstract class BookieException extends Exception {
 
         int InvalidCookieException = -102;
         int UpgradeException = -103;
+        int DiskPartitionDuplicationException = -104;
     }
 
     public void setCode(int code) {
@@ -92,6 +95,9 @@ public abstract class BookieException extends Exception {
         case Code.UpgradeException:
             err = "Error performing an upgrade operation ";
             break;
+        case Code.DiskPartitionDuplicationException:
+            err = "Disk Partition Duplication is not allowed";
+            break;
         default:
             err = "Invalid operation";
             break;
@@ -154,4 +160,18 @@ public abstract class BookieException extends Exception {
             super(Code.UpgradeException, reason);
         }
     }
+    
+    public static class DiskPartitionDuplicationException extends BookieException {
+        public DiskPartitionDuplicationException() {
+            super(Code.DiskPartitionDuplicationException);
+        }
+
+        public DiskPartitionDuplicationException(Throwable cause) {
+            super(Code.DiskPartitionDuplicationException, cause);
+        }
+
+        public DiskPartitionDuplicationException(String reason) {
+            super(Code.DiskPartitionDuplicationException, reason);
+        }
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
index 42d4de4..3469593 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieShell.java
@@ -68,6 +68,10 @@ import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
 import org.apache.bookkeeper.net.BookieSocketAddress;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.replication.AuditorElector;
+import org.apache.bookkeeper.replication.BookieLedgerIndexer;
+import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
+import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.EntryFormatter;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
@@ -2219,7 +2223,8 @@ public class BookieShell implements Tool {
         if (null == journals) {
             journals = Lists.newArrayListWithCapacity(bkConf.getJournalDirs().length);
             for (File journalDir : bkConf.getJournalDirs()) {
-                journals.add(new Journal(journalDir, bkConf, new LedgerDirsManager(bkConf, bkConf.getLedgerDirs())));
+                journals.add(new Journal(journalDir, bkConf, new LedgerDirsManager(bkConf, bkConf.getLedgerDirs(),
+                    new DiskChecker(bkConf.getDiskUsageThreshold(), bkConf.getDiskUsageWarnThreshold()))));
             }
         }
         return journals;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
index 66782ac..23726ca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
@@ -35,6 +35,7 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,12 +59,14 @@ public class LedgerDirsManager {
     private boolean forceGCAllowWhenNoSpace;
     private long minUsableSizeForIndexFileCreation;
 
-    public LedgerDirsManager(ServerConfiguration conf, File[] dirs) {
-        this(conf, dirs, NullStatsLogger.INSTANCE);
+    private final DiskChecker diskChecker;
+
+    public LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker diskChecker) {
+        this(conf, dirs, diskChecker, NullStatsLogger.INSTANCE);
     }
 
     @VisibleForTesting
-    LedgerDirsManager(ServerConfiguration conf, File[] dirs, StatsLogger statsLogger) {
+    LedgerDirsManager(ServerConfiguration conf, File[] dirs, DiskChecker diskChecker, StatsLogger statsLogger) {
         this.ledgerDirectories = Arrays.asList(Bookie
                 .getCurrentDirectories(dirs));
         this.writableLedgerDirectories = new ArrayList<File>(ledgerDirectories);
@@ -88,7 +91,10 @@ public class LedgerDirsManager {
                 }
             });
         }
+
+        this.diskChecker = diskChecker;
         statsLogger.registerGauge(LD_WRITABLE_DIRS, new Gauge<Number>() {
+
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -121,13 +127,10 @@ public class LedgerDirsManager {
      * in all of the ledger directories put together.
      *
      * @return totalDiskSpace in bytes
+     * @throws IOException 
      */
-    public long getTotalFreeSpace() {
-        long totalFreeSpace = 0;
-        for (File dir: this.ledgerDirectories) {
-            totalFreeSpace += dir.getFreeSpace();
-        }
-        return totalFreeSpace;
+    public long getTotalFreeSpace(List<File> dirs) throws IOException {
+        return diskChecker.getTotalFreeSpace(dirs);
     }
 
     /**
@@ -135,15 +138,12 @@ public class LedgerDirsManager {
      * in all of the ledger directories put together.
      *
      * @return freeDiskSpace in bytes
+     * @throws IOException 
      */
-    public long getTotalDiskSpace() {
-        long totalDiskSpace = 0;
-        for (File dir: this.ledgerDirectories) {
-            totalDiskSpace += dir.getTotalSpace();
-        }
-        return totalDiskSpace;
+    public long getTotalDiskSpace(List<File> dirs) throws IOException {
+        return diskChecker.getTotalDiskSpace(dirs);
     }
-
+    
     /**
      * Get disk usages map
      * @return ConcurrentMap<File, Float> diskUsages
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
index 0220adf..81f0f64 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
@@ -106,32 +107,41 @@ class LedgerDirsMonitor extends BookieThread {
             // - Check if the total disk usage is below DiskLowWaterMarkUsageThreshold.
             // - If So, walk through the entire list of fullfilledDirs and add them back to writableLedgerDirs list if
             // their usage is < conf.getDiskUsageThreshold.
-            if (hasWritableLedgerDirs || (totalDiskUsage = diskChecker.getTotalDiskUsage(ldm.getAllLedgerDirs())) < conf
-                    .getDiskLowWaterMarkUsageThreshold()) {
-                // Check all full-filled disk space usage
-                for (File dir : fullfilledDirs) {
-                    try {
-                        diskUsages.put(dir, diskChecker.checkDir(dir));
-                        ldm.addToWritableDirs(dir, true);
-                    } catch (DiskErrorException e) {
-                        // Notify disk failure to all the listeners
-                        for (LedgerDirsListener listener : ldm.getListeners()) {
-                            listener.diskFailed(dir);
+            try {
+                if (hasWritableLedgerDirs
+                        || (totalDiskUsage = diskChecker.getTotalDiskUsage(ldm.getAllLedgerDirs())) < conf
+                                .getDiskLowWaterMarkUsageThreshold()) {
+                    // Check all full-filled disk space usage
+                    for (File dir : fullfilledDirs) {
+                        try {
+                            diskUsages.put(dir, diskChecker.checkDir(dir));
+                            ldm.addToWritableDirs(dir, true);
+                        } catch (DiskErrorException e) {
+                            // Notify disk failure to all the listeners
+                            for (LedgerDirsListener listener : ldm.getListeners()) {
+                                listener.diskFailed(dir);
+                            }
+                        } catch (DiskWarnThresholdException e) {
+                            diskUsages.put(dir, e.getUsage());
+                            // the full-filled dir become writable but still
+                            // above
+                            // warn threshold
+                            ldm.addToWritableDirs(dir, false);
+                        } catch (DiskOutOfSpaceException e) {
+                            // the full-filled dir is still full-filled
+                            diskUsages.put(dir, e.getUsage());
                         }
-                    } catch (DiskWarnThresholdException e) {
-                        diskUsages.put(dir, e.getUsage());
-                        // the full-filled dir become writable but still above
-                        // warn threshold
-                        ldm.addToWritableDirs(dir, false);
-                    } catch (DiskOutOfSpaceException e) {
-                        // the full-filled dir is still full-filled
-                        diskUsages.put(dir, e.getUsage());
                     }
+                } else {
+                    LOG.debug(
+                            "Current TotalDiskUsage: {} is greater than LWMThreshold: {}. So not adding any filledDir to WritableDirsList",
+                            totalDiskUsage, conf.getDiskLowWaterMarkUsageThreshold());
+                }
+            } catch (IOException ioe) {
+                LOG.error("Got IOException while monitoring Dirs", ioe);
+                for (LedgerDirsListener listener : ldm.getListeners()) {
+                    listener.fatalError();
                 }
-            } else {
-                LOG.debug(
-                        "Current TotalDiskUsage: {} is greater than LWMThreshold: {}. So not adding any filledDir to WritableDirsList",
-                        totalDiskUsage, conf.getDiskLowWaterMarkUsageThreshold());
             }
             try {
                 Thread.sleep(interval);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
index c0af6c3..14f6562 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/ReadOnlyEntryLogger.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.DiskChecker;
 
 /**
  * Read Only Entry Logger
@@ -32,7 +33,8 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 public class ReadOnlyEntryLogger extends EntryLogger {
 
     public ReadOnlyEntryLogger(ServerConfiguration conf) throws IOException {
-        super(conf, new LedgerDirsManager(conf, conf.getLedgerDirs()));
+        super(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
     }
 
     @Override
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index e5c96e8..e08fab0 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -21,17 +21,18 @@ import java.io.File;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-import com.google.common.annotations.Beta;
-
+import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
+import org.apache.bookkeeper.bookie.LedgerStorage;
+import org.apache.bookkeeper.bookie.SortedLedgerStorage;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.bookkeeper.util.BookKeeperConstants;
 import org.apache.bookkeeper.util.ReflectionUtils;
 import org.apache.commons.configuration.ConfigurationException;
-import org.apache.bookkeeper.bookie.InterleavedLedgerStorage;
-import org.apache.bookkeeper.bookie.SortedLedgerStorage;
 import org.apache.commons.lang.StringUtils;
 
+import com.google.common.annotations.Beta;
+
 /**
  * Configuration manages server-side settings
  */
@@ -150,6 +151,8 @@ public class ServerConfiguration extends AbstractConfiguration {
     
     protected final static String MIN_USABLESIZE_FOR_INDEXFILE_CREATION = "minUsableSizeForIndexFileCreation";
 
+    protected final static String ALLOW_MULTIPLEDIRS_UNDER_SAME_DISKPARTITION = "allowMultipleDirsUnderSameDiskPartition";
+
     /**
      * Construct a default configuration object
      */
@@ -2006,7 +2009,7 @@ public class ServerConfiguration extends AbstractConfiguration {
         super.setNettyMaxFrameSizeBytes(maxSize);
         return this;
     }
-    
+
     /**
      * Gets the minimum safe Usable size to be available in index directory for Bookie to create Index File while replaying 
      * journal at the time of Bookie Start in Readonly Mode (in bytes)
@@ -2028,4 +2031,27 @@ public class ServerConfiguration extends AbstractConfiguration {
         this.setProperty(MIN_USABLESIZE_FOR_INDEXFILE_CREATION, Long.toString(minUsableSizeForIndexFileCreation));
         return this;
     }
+
+    /**
+     * Returns whether it is allowed to have multiple ledger/index/journal
+     * directories in the same filesystem disk partition.
+     *
+     * @return true if allowed; defaults to true when the property is not set
+     */
+    public boolean isAllowMultipleDirsUnderSameDiskPartition() {
+        return this.getBoolean(ALLOW_MULTIPLEDIRS_UNDER_SAME_DISKPARTITION, true);
+    }
+
+    /**
+     * Configure the Bookie to allow/disallow multiple ledger/index/journal
+     * directories in the same filesystem disk partition.
+     *
+     * @param allow true to allow, false to disallow
+     * 
+     * @return server configuration object.
+     */
+    public ServerConfiguration setAllowMultipleDirsUnderSameDiskPartition(boolean allow) {
+        this.setProperty(ALLOW_MULTIPLEDIRS_UNDER_SAME_DISKPARTITION, allow);
+        return this;
+    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
index 6b825d9..21a780c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/GetBookieInfoProcessorV3.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.proto;
 
+import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.proto.BookkeeperProtocol.GetBookieInfoRequest;
@@ -60,18 +61,21 @@ public class GetBookieInfoProcessorV3 extends PacketProcessorBaseV3 implements R
         }
         StatusCode status = StatusCode.EOK;
         long freeDiskSpace = 0L, totalDiskSpace = 0L;
-        if ((requested & GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
-            freeDiskSpace = requestProcessor.bookie.getTotalFreeSpace();
-            getBookieInfoResponse.setFreeDiskSpace(freeDiskSpace);
-        }
-        if ((requested & GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) {
-            totalDiskSpace = requestProcessor.bookie.getTotalDiskSpace();
-            getBookieInfoResponse.setTotalDiskCapacity(totalDiskSpace);
-        }
-
-        if (LOG.isDebugEnabled()) {
+        try {
+            if ((requested & GetBookieInfoRequest.Flags.FREE_DISK_SPACE_VALUE) != 0) {
+                freeDiskSpace = requestProcessor.bookie.getTotalFreeSpace();
+                getBookieInfoResponse.setFreeDiskSpace(freeDiskSpace);
+            }
+            if ((requested & GetBookieInfoRequest.Flags.TOTAL_DISK_CAPACITY_VALUE) != 0) {
+                totalDiskSpace = requestProcessor.bookie.getTotalDiskSpace();
+                getBookieInfoResponse.setTotalDiskCapacity(totalDiskSpace);
+            }
             LOG.debug("FreeDiskSpace info is " + freeDiskSpace + " totalDiskSpace is: " + totalDiskSpace);
+        } catch (IOException e) {
+            status = StatusCode.EIO;
+            LOG.error("IOException while getting  freespace/totalspace", e);
         }
+
         getBookieInfoResponse.setStatus(status);
         requestProcessor.getBookieInfoStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos),
                 TimeUnit.NANOSECONDS);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
index 0ed949e..26610fd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
@@ -20,7 +20,11 @@ package org.apache.bookkeeper.util;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -155,25 +159,59 @@ public class DiskChecker {
         }
     }
 
+
+    /**
+     * Calculate the total amount of free (usable) space available in the
+     * given directories, counting each underlying FileStore only once.
+     *
+     * @return totalFreeSpace in bytes
+     * @throws IOException 
+     */
+    public long getTotalFreeSpace(List<File> dirs) throws IOException {
+        long totalFreeSpace = 0;
+        Set<FileStore> dirsFileStore = new HashSet<FileStore>();
+        for (File dir : dirs) {
+            FileStore fileStore = Files.getFileStore(dir.toPath());
+            if (dirsFileStore.add(fileStore)) {
+                totalFreeSpace += fileStore.getUsableSpace();
+            }
+        }
+        return totalFreeSpace;
+    }
+
+    /**
+     * Calculate the total amount of disk space in the given directories,
+     * counting each underlying FileStore only once.
+     *
+     * @return totalDiskSpace in bytes
+     * @throws IOException 
+     */
+    public long getTotalDiskSpace(List<File> dirs) throws IOException {
+        long totalDiskSpace = 0;
+        Set<FileStore> dirsFileStore = new HashSet<FileStore>();
+        for (File dir : dirs) {
+            FileStore fileStore = Files.getFileStore(dir.toPath());
+            if (dirsFileStore.add(fileStore)) {
+                totalDiskSpace += fileStore.getTotalSpace();
+            }
+        }
+        return totalDiskSpace;
+    }
+    
     /**
      * calculates and returns the disk usage factor in the provided list of dirs
      * 
      * @param dirs
      *            list of directories
      * @return disk usage factor in the provided list of dirs
+     * @throws IOException 
      */
-    public float getTotalDiskUsage(List<File> dirs) {
+    public float getTotalDiskUsage(List<File> dirs) throws IOException {
         if (dirs == null || dirs.isEmpty()) {
             throw new IllegalArgumentException(
                     "list argument of getTotalDiskUsage is not supposed to be null or empty");
         }
-        long totalUsableSpace = 0;
-        long totalSpace = 0;
-        for (File dir : dirs) {
-            totalUsableSpace += dir.getUsableSpace();
-            totalSpace += dir.getTotalSpace();
-        }
-        float free = (float) totalUsableSpace / (float) totalSpace;
+        float free = (float) getTotalFreeSpace(dirs) / (float) getTotalDiskSpace(dirs);
         float used = 1f - free;
         return used;
     }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index 278f08a..41e58ab 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.net.BindException;
 import java.net.InetAddress;
 
+import org.apache.bookkeeper.bookie.BookieException.DiskPartitionDuplicationException;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -42,6 +43,7 @@ import org.apache.bookkeeper.replication.ReplicationException.UnavailableExcepti
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.test.PortManager;
 import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
 import org.apache.zookeeper.KeeperException;
@@ -265,7 +267,7 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         File tmpDir = createTempDir("bookie", "test");
 
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
-        int port = 12555;
+        int port = PortManager.nextFreePort();
         conf.setZkServers(null).setBookiePort(port).setJournalDirName(
                 tmpDir.getPath()).setLedgerDirNames(
                 new String[] { tmpDir.getPath() });
@@ -515,7 +517,8 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         try {
             // LedgerDirsManager#init() is used in Bookie instantiation.
             // Simulating disk errors by directly calling #init
-            LedgerDirsManager ldm = new LedgerDirsManager(conf, conf.getLedgerDirs());
+            LedgerDirsManager ldm = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                    new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
             LedgerDirsMonitor ledgerMonitor = new LedgerDirsMonitor(conf, 
                     new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), ldm);
             ledgerMonitor.init();
@@ -525,6 +528,111 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase {
         }
     }
 
+    /**
+     * if ALLOW_MULTIPLEDIRS_UNDER_SAME_DISKPARTITION is disabled then Bookie initialization
+     * will fail if multiple ledger/index/journal dirs are in the same partition/filesystem.
+     */
+    @Test(timeout = 2000000)
+    public void testAllowDiskPartitionDuplicationDisabled() throws Exception {
+        File tmpDir1 = createTempDir("bookie", "test");
+        File tmpDir2 = createTempDir("bookie", "test");
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        int port = PortManager.nextFreePort();
+        // multiple ledgerdirs in same diskpartition
+        conf.setZkServers(zkUtil.getZooKeeperConnectString()).setZkTimeout(5000).setBookiePort(port)
+        .setJournalDirName(tmpDir1.getPath())
+        .setLedgerDirNames(new String[] { tmpDir1.getPath(), tmpDir2.getPath() })
+        .setIndexDirName(new String[] { tmpDir1.getPath()});;
+        conf.setAllowMultipleDirsUnderSameDiskPartition(false);
+        BookieServer bs1 = null;
+        try {
+            bs1 = new BookieServer(conf);
+            Assert.fail("Bookkeeper should not have started since AllowMultipleDirsUnderSameDiskPartition is not enabled");
+        } catch (DiskPartitionDuplicationException dpde) {
+            // Expected
+        } finally {
+            if (bs1 != null) {
+                bs1.shutdown();
+            }
+        }
+
+        tmpDir1 = createTempDir("bookie", "test");
+        tmpDir2 = createTempDir("bookie", "test");
+        port = PortManager.nextFreePort();
+        // multiple indexdirs in same diskpartition
+        conf.setZkServers(zkUtil.getZooKeeperConnectString()).setZkTimeout(5000).setBookiePort(port)
+        .setJournalDirName(tmpDir1.getPath())
+        .setLedgerDirNames(new String[] { tmpDir1.getPath() })
+        .setIndexDirName(new String[] { tmpDir1.getPath(), tmpDir2.getPath() });
+        conf.setAllowMultipleDirsUnderSameDiskPartition(false);
+        bs1 = null;
+        try {
+            bs1 = new BookieServer(conf);
+            Assert.fail("Bookkeeper should not have started since AllowMultipleDirsUnderSameDiskPartition is not enabled");
+        } catch (DiskPartitionDuplicationException dpde) {
+            // Expected
+        } finally {
+            if (bs1 != null) {
+                bs1.shutdown();
+            }
+        }
+
+        tmpDir1 = createTempDir("bookie", "test");
+        tmpDir2 = createTempDir("bookie", "test");
+        port = PortManager.nextFreePort();
+        // multiple journaldirs in same diskpartition
+        conf.setZkServers(zkUtil.getZooKeeperConnectString()).setZkTimeout(5000).setBookiePort(port)
+        .setJournalDirsName(new String[] { tmpDir1.getPath(), tmpDir2.getPath() })
+        .setLedgerDirNames(new String[] { tmpDir1.getPath() })
+        .setIndexDirName(new String[] { tmpDir1.getPath()});
+        conf.setAllowMultipleDirsUnderSameDiskPartition(false);
+        bs1 = null;
+        try {
+            bs1 = new BookieServer(conf);
+            Assert.fail(
+                    "Bookkeeper should not have started since AllowMultipleDirsUnderSameDiskPartition is not enabled");
+        } catch (DiskPartitionDuplicationException dpde) {
+            // Expected
+        } finally {
+            if (bs1 != null) {
+                bs1.shutdown();
+            }
+        }
+    }
+
+    /**
+     * if ALLOW_MULTIPLEDIRS_UNDER_SAME_DISKPARTITION is enabled then Bookie initialization
+     * should succeed even if there are multiple ledger/index/journal dirs in the same diskpartition/filesystem.
+     */
+    @Test(timeout = 2000000)
+    public void testAllowDiskPartitionDuplicationAllowed() throws Exception {
+        File tmpDir1 = createTempDir("bookie", "test");
+        File tmpDir2 = createTempDir("bookie", "test");
+        File tmpDir3 = createTempDir("bookie", "test");
+        File tmpDir4 = createTempDir("bookie", "test");
+        File tmpDir5 = createTempDir("bookie", "test");
+        File tmpDir6 = createTempDir("bookie", "test");
+
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        int port = 12555;
+        conf.setZkServers(zkUtil.getZooKeeperConnectString()).setZkTimeout(5000).setBookiePort(port)
+                .setJournalDirsName(new String[] { tmpDir1.getPath(), tmpDir2.getPath() })
+                .setLedgerDirNames(new String[] { tmpDir3.getPath(), tmpDir4.getPath() })
+                .setIndexDirName(new String[] { tmpDir5.getPath(), tmpDir6.getPath() });
+        conf.setAllowMultipleDirsUnderSameDiskPartition(true);
+        BookieServer bs1 = null;
+        try {
+            bs1 = new BookieServer(conf);          
+        } catch (DiskPartitionDuplicationException dpde) {
+            Assert.fail("Bookkeeper should have started since AllowMultipleDirsUnderSameDiskPartition is enabled");
+        } finally {
+            if (bs1 != null) {
+                bs1.shutdown();
+            }
+        }
+    }
+    
     private void createNewZKClient() throws Exception {
         // create a zookeeper client
         LOG.debug("Instantiate ZK Client");
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 0f22415..602764b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -50,6 +50,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.TestUtils;
 import org.apache.bookkeeper.versioning.Version;
@@ -200,7 +201,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         conf.setGcWaitTime(60000);
         conf.setMinorCompactionInterval(120000);
         conf.setMajorCompactionInterval(240000);
-        LedgerDirsManager dirManager = new LedgerDirsManager(conf, conf.getLedgerDirs());
+        LedgerDirsManager dirManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         CheckpointSource cp = new CheckpointSource() {
             @Override
             public Checkpoint newCheckpoint() {
@@ -530,7 +532,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
             };
         final byte[] KEY = "foobar".getBytes();
         File log0 = new File(curDir, "0.log");
-        LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs());
+        LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         assertFalse("Log shouldnt exist", log0.exists());
         InterleavedLedgerStorage storage = new InterleavedLedgerStorage();
         storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE);
@@ -647,7 +650,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         Bookie.checkDirectoryStructure(curDir);
         conf.setLedgerDirNames(new String[] { tmpDir.toString() });
 
-        LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs());
+        LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         final Set<Long> ledgers = Collections
                 .newSetFromMap(new ConcurrentHashMap<Long, Boolean>());
         LedgerManager manager = getLedgerManager(ledgers);
@@ -690,7 +694,8 @@ public abstract class CompactionTest extends BookKeeperClusterTestCase {
         conf.setGcWaitTime(500);
         conf.setMinorCompactionInterval(1);
         conf.setMajorCompactionInterval(2);
-        LedgerDirsManager dirManager = new LedgerDirsManager(conf, conf.getLedgerDirs());
+        LedgerDirsManager dirManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         CheckpointSource cp = new CheckpointSource() {
             @Override
             public Checkpoint newCheckpoint() {
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
index 42167a3..0f8dc86 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CreateNewLogTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.junit.Assert;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.util.DiskChecker;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -82,7 +83,8 @@ public class CreateNewLogTest {
         // Creating a new configuration with a number of 
         // ledger directories.
         conf.setLedgerDirNames(ledgerDirs);
-        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs());
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
         
         // Extracted from createNewLog()
@@ -106,7 +108,8 @@ public class CreateNewLogTest {
         // Creating a new configuration with a number of ledger directories.
         conf.setLedgerDirNames(ledgerDirs);
         conf.setIsForceGCAllowWhenNoSpace(true);
-        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs());
+        LedgerDirsManager ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         EntryLogger el = new EntryLogger(conf, ledgerDirsManager);
 
         // Extracted from createNewLog()
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 7eac1a2..a8c0596 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -33,6 +33,7 @@ import java.util.List;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
@@ -173,7 +174,8 @@ public class EntryLogTest {
         conf.setLedgerDirNames(new String[] { tmpDir.toString() });
         EntryLogger entryLogger = null;
         try {
-            entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs()));
+            entryLogger = new EntryLogger(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                    new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
             fail("Expecting FileNotFoundException");
         } catch (FileNotFoundException e) {
             assertEquals("Entry log directory does not exist", e
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index 4e36ba1..15a13ee 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -65,7 +65,8 @@ public class IndexPersistenceMgrTest {
         conf.setJournalDirName(journalDir.getPath());
         conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
 
-        ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs());
+        ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         ledgerMonitor = new LedgerDirsMonitor(conf, 
                 new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), ledgerDirsManager);
         ledgerMonitor.init();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
index 6b96ca1..09869a1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
@@ -79,7 +79,8 @@ public class TestLedgerDirsManager {
         conf.setIsForceGCAllowWhenNoSpace(true);
 
         mockDiskChecker = new MockDiskChecker(threshold, warnThreshold);
-        dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE);
+        dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         ledgerMonitor = new LedgerDirsMonitor(conf, 
                 mockDiskChecker, dirsManager);
         ledgerMonitor.init();
@@ -184,7 +185,8 @@ public class TestLedgerDirsManager {
         conf.setDiskUsageWarnThreshold(warn);
 
         mockDiskChecker = new MockDiskChecker(nospace, warnThreshold);
-        dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE);
+        dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, dirsManager);
         ledgerMonitor.init();
         final MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener();
@@ -230,7 +232,6 @@ public class TestLedgerDirsManager {
 
         final float nospace = 0.90f;
         final float lwm = 0.80f;
-        final float warn = 0.99f;
         HashMap<File, Float> usageMap;
 
         File tmpDir1 = createTempDir("bkTest", ".dir");
@@ -243,11 +244,12 @@ public class TestLedgerDirsManager {
 
         conf.setDiskUsageThreshold(nospace);
         conf.setDiskLowWaterMarkUsageThreshold(lwm);
-        conf.setDiskUsageWarnThreshold(warn);
+        conf.setDiskUsageWarnThreshold(nospace);
         conf.setLedgerDirNames(new String[] { tmpDir1.toString(), tmpDir2.toString() });
 
         mockDiskChecker = new MockDiskChecker(nospace, warnThreshold);
-        dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE);
+        dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
         ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, dirsManager);
         usageMap = new HashMap<File, Float>();
         usageMap.put(curDir1, 0.1f);

-- 
To stop receiving notification emails like this one, please contact
commits@bookkeeper.apache.org.

Mime
View raw message