bookkeeper-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From si...@apache.org
Subject bookkeeper git commit: BOOKKEEPER-944: LowWaterMark Storage Threshold
Date Mon, 05 Jun 2017 18:30:56 GMT
Repository: bookkeeper
Updated Branches:
  refs/heads/master 5f945f8a0 -> efd8ec269


BOOKKEEPER-944: LowWaterMark Storage Threshold

BOOKKEEPER-944: LowWaterMark Storage Threshold

LowWaterMark Storage Threshold and code refactoring

    - Current implementation toggles READONLY status of the bookie as soon as a directory
usage falls below the disk storage threshold.
      Added LowWaterMark parameter that limits such switches.
    	1. Bookie transition from RW to RONLY only when all the ledger dirs usage > HWM (storage
threshold)
    	2. Bookie transition from RONLY to  RW only when total system disk usage (ledger/index
disks) capacity is < LWM
    	3. When bookie is in RW mode all disks which are < HWM (storage threshold) are RW
    - refactored code and separated LedgerDirsMonitor from LedgerDirsManager, to remove circular
dependency
      between LedgerDirsManager and LedgerDirsMonitor and also it improves testability by
making them separate classes.
      It becomes easier to do functional/unit level testing and fault-injection testing at
LedgerDirsMonitor class level.
    - relevant testcases

Author: Andrey Yegorov <ayegorovsalesforce.com>
Co-Author: Charan Reddy Guttapalem <cguttapalemsalesforce.com>

Author: Charan Reddy Guttapalem <cguttapalem@salesforce.com>

Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org>

Closes #108 from reddycharan/lwmhwm


Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/efd8ec26
Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/efd8ec26
Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/efd8ec26

Branch: refs/heads/master
Commit: efd8ec26926d2e75f51d6d576543a0dcc116b83f
Parents: 5f945f8
Author: Charan Reddy Guttapalem <cguttapalem@salesforce.com>
Authored: Mon Jun 5 11:30:52 2017 -0700
Committer: Sijie Guo <sijie@apache.org>
Committed: Mon Jun 5 11:30:52 2017 -0700

----------------------------------------------------------------------
 bookkeeper-server/conf/bk_server.conf           |   8 +
 .../org/apache/bookkeeper/bookie/Bookie.java    |  41 ++--
 .../bookkeeper/bookie/LedgerDirsManager.java    | 161 +++-----------
 .../bookkeeper/bookie/LedgerDirsMonitor.java    | 191 +++++++++++++++++
 .../bookkeeper/conf/ServerConfiguration.java    |  30 +++
 .../org/apache/bookkeeper/util/DiskChecker.java |  27 ++-
 .../bookie/BookieInitializationTest.java        |  14 +-
 .../bookie/IndexPersistenceMgrTest.java         |   8 +-
 .../bookie/TestLedgerDirsManager.java           | 214 +++++++++++++++++--
 9 files changed, 526 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/conf/bk_server.conf
----------------------------------------------------------------------
diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index c7fd2ca..7f52902 100644
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -257,6 +257,14 @@ zkTimeout=10000
 #Valid values should be in between 0 and 1 (exclusive). 
 #diskUsageThreshold=0.95
 
+#Set the disk free space low water mark threshold. Disk is considered full when 
+#usage threshold is exceeded. Disk returns back to non-full state when usage is 
+#below low water mark threshold. This prevents it from going back and forth 
+#between these states frequently when concurrent writes and compaction are 
+#happening. This also prevent bookie from switching frequently between 
+#read-only and read-writes states in the same cases.
+#diskUsageLwmThreshold=0.90
+
 #Disk check interval in milli seconds, interval to check the ledger dirs usage.
 #Default is 10000
 #diskCheckInterval=10000

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 5a3856f..f32626a 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -62,6 +62,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.BookKeeperConstants;
+import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.IOUtils;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
@@ -124,6 +125,9 @@ public class Bookie extends BookieCriticalThread {
 
     private final LedgerDirsManager ledgerDirsManager;
     private LedgerDirsManager indexDirsManager;
+    
+    private final LedgerDirsMonitor ledgerMonitor;
+    private final LedgerDirsMonitor idxMonitor;
 
     // ZooKeeper client instance for the Bookie
     ZooKeeper zk;
@@ -657,6 +661,7 @@ public class Bookie extends BookieCriticalThread {
 
         this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(),
                 statsLogger.scope(LD_LEDGER_SCOPE));
+
         File[] idxDirs = conf.getIndexDirs();
         if (null == idxDirs) {
             this.indexDirsManager = this.ledgerDirsManager;
@@ -672,10 +677,28 @@ public class Bookie extends BookieCriticalThread {
         LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName());
         ledgerManager = ledgerManagerFactory.newLedgerManager();
 
-        // Initialise ledgerDirManager. This would look through all the
+        // Initialise ledgerDirMonitor. This would look through all the
         // configured directories. When disk errors or all the ledger
         // directories are full, would throws exception and fail bookie startup.
-        this.ledgerDirsManager.init();
+        this.ledgerMonitor = new LedgerDirsMonitor(conf, 
+                                    new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()),

+                                    ledgerDirsManager);
+        this.ledgerMonitor.init();
+        
+        if (null == idxDirs) {
+            this.idxMonitor = this.ledgerMonitor;
+        } else {
+            this.idxMonitor = new LedgerDirsMonitor(conf, 
+                                        new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()),

+                                        indexDirsManager);
+            this.idxMonitor.init();
+        }
+
+        // ZK ephemeral node for this Bookie.
+        String myID = getMyId();
+        zkBookieRegPath = this.bookieRegistrationPath + myID;
+        zkBookieReadOnlyPath = this.bookieReadonlyRegistrationPath + "/" + myID;
+
         // instantiate the journals
         journals = Lists.newArrayList();
         for(int i=0 ;i<journalDirectories.size();i++) {
@@ -692,14 +715,8 @@ public class Bookie extends BookieCriticalThread {
         ledgerStorage.initialize(conf, ledgerManager, ledgerDirsManager, indexDirsManager,
checkpointSource, statsLogger);
         syncThread = new SyncThread(conf, getLedgerDirsListener(),
                                     ledgerStorage, checkpointSource);
-
         handles = new HandleFactoryImpl(ledgerStorage);
 
-        // ZK ephemeral node for this Bookie.
-        String myID = getMyId();
-        zkBookieRegPath = this.bookieRegistrationPath + myID;
-        zkBookieReadOnlyPath = this.bookieReadonlyRegistrationPath + "/" + myID;
-
         // Expose Stats
         writeBytes = statsLogger.getCounter(WRITE_BYTES);
         readBytes = statsLogger.getCounter(READ_BYTES);
@@ -797,9 +814,9 @@ public class Bookie extends BookieCriticalThread {
                     journalDirectories.stream().map(File::getName).collect(Collectors.joining(",
")));
         }
         //Start DiskChecker thread
-        ledgerDirsManager.start();
+        ledgerMonitor.start();
         if (indexDirsManager != ledgerDirsManager) {
-            indexDirsManager.start();
+            idxMonitor.start();
         }
         // replay journals
         try {
@@ -1274,9 +1291,9 @@ public class Bookie extends BookieCriticalThread {
                 }
 
                 //Shutdown disk checker
-                ledgerDirsManager.shutdown();
+                ledgerMonitor.shutdown();
                 if (indexDirsManager != ledgerDirsManager) {
-                    indexDirsManager.shutdown();
+                    idxMonitor.shutdown();
                 }
 
                 // Shutdown the ZK client

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
index 8f65c6b..da56950 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsManager.java
@@ -20,6 +20,8 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_WRITABLE_DIRS;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,17 +35,11 @@ import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.util.DiskChecker;
-import org.apache.bookkeeper.util.DiskChecker.DiskErrorException;
-import org.apache.bookkeeper.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.bookkeeper.util.DiskChecker.DiskWarnThresholdException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import static org.apache.bookkeeper.bookie.BookKeeperServerStats.LD_WRITABLE_DIRS;
-
 /**
  * This class manages ledger directories used by the bookie.
  */
@@ -54,9 +50,7 @@ public class LedgerDirsManager {
     private volatile List<File> filledDirs;
     private final List<File> ledgerDirectories;
     private volatile List<File> writableLedgerDirectories;
-    private final DiskChecker diskChecker;
     private final List<LedgerDirsListener> listeners;
-    private final LedgerDirsMonitor monitor;
     private final Random rand = new Random();
     private final ConcurrentMap<File, Float> diskUsages =
             new ConcurrentHashMap<File, Float>();
@@ -67,19 +61,13 @@ public class LedgerDirsManager {
         this(conf, dirs, NullStatsLogger.INSTANCE);
     }
 
-    LedgerDirsManager(ServerConfiguration conf, File[] dirs, StatsLogger statsLogger) {
-        this(conf, dirs, statsLogger, new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()));
-    }
-
     @VisibleForTesting
-    LedgerDirsManager(ServerConfiguration conf, File[] dirs, StatsLogger statsLogger, DiskChecker
diskChecker) {
+    LedgerDirsManager(ServerConfiguration conf, File[] dirs, StatsLogger statsLogger) {
         this.ledgerDirectories = Arrays.asList(Bookie
                 .getCurrentDirectories(dirs));
         this.writableLedgerDirectories = new ArrayList<File>(ledgerDirectories);
         this.filledDirs = new ArrayList<File>();
         this.listeners = new ArrayList<LedgerDirsListener>();
-        this.diskChecker = diskChecker;
-        this.monitor = new LedgerDirsMonitor(conf.getDiskCheckInterval());
         this.forceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace();
         this.entryLogSize = conf.getEntryLogSizeLimit();
         for (File dir : dirs) {
@@ -117,6 +105,14 @@ public class LedgerDirsManager {
     public List<File> getAllLedgerDirs() {
         return ledgerDirectories;
     }
+    
+    /**
+     * Get all dir listeners
+     * @return List<LedgerDirsListener> listeners
+     */
+    public List<LedgerDirsListener> getListeners() {
+        return listeners;
+    }
 
     /**
      * Calculate the total amount of free space available
@@ -147,6 +143,14 @@ public class LedgerDirsManager {
     }
 
     /**
+     * Get disk usages map
+     * @return ConcurrentMap<File, Float> diskUsages
+     */
+    public ConcurrentMap<File, Float> getDiskUsages() {
+        return diskUsages;
+    }
+    
+    /**
      * Get only writable ledger dirs.
      */
     public List<File> getWritableLedgerDirs()
@@ -161,6 +165,13 @@ public class LedgerDirsManager {
         return writableLedgerDirectories;
     }
 
+    /**
+     * returns true if the writableLedgerDirs list has entries
+     */
+    public boolean hasWritableLedgerDirs() {
+        return !writableLedgerDirectories.isEmpty();
+    }
+
     public List<File> getWritableLedgerDirsForNewLog()
         throws NoWritableLedgerDirException {
 
@@ -308,126 +319,6 @@ public class LedgerDirsManager {
     }
 
     /**
-     * Sweep through all the directories to check disk errors or disk full.
-     *
-     * @throws DiskErrorException
-     *             If disk having errors
-     * @throws NoWritableLedgerDirException
-     *             If all the configured ledger directories are full or having
-     *             less space than threshold
-     */
-    public void init() throws DiskErrorException, NoWritableLedgerDirException {
-        monitor.checkDirs(writableLedgerDirectories);
-    }
-
-    // start the daemon for disk monitoring
-    public void start() {
-        monitor.setDaemon(true);
-        monitor.start();
-    }
-
-    // shutdown disk monitoring daemon
-    public void shutdown() {
-        LOG.info("Shutting down LedgerDirsMonitor");
-        monitor.interrupt();
-        try {
-            monitor.join();
-        } catch (InterruptedException e) {
-            // Ignore
-        }
-    }
-
-    /**
-     * Thread to monitor the disk space periodically.
-     */
-    private class LedgerDirsMonitor extends BookieThread {
-        private final int interval;
-
-        public LedgerDirsMonitor(int interval) {
-            super("LedgerDirsMonitorThread");
-            this.interval = interval;
-        }
-
-        @Override
-        public void run() {
-            while (true) {
-                try {
-                    List<File> writableDirs = getWritableLedgerDirs();
-                    // Check all writable dirs disk space usage.
-                    for (File dir : writableDirs) {
-                        try {
-                            diskUsages.put(dir, diskChecker.checkDir(dir));
-                        } catch (DiskErrorException e) {
-                            LOG.error("Ledger directory {} failed on disk checking : ", dir,
e);
-                            // Notify disk failure to all listeners
-                            for (LedgerDirsListener listener : listeners) {
-                                listener.diskFailed(dir);
-                            }
-                        } catch (DiskWarnThresholdException e) {
-                            LOG.warn("Ledger directory {} is almost full.", dir);
-                            diskUsages.put(dir, e.getUsage());
-                            for (LedgerDirsListener listener : listeners) {
-                                listener.diskAlmostFull(dir);
-                            }
-                        } catch (DiskOutOfSpaceException e) {
-                            LOG.error("Ledger directory {} is out-of-space.", dir);
-                            diskUsages.put(dir, e.getUsage());
-                            // Notify disk full to all listeners
-                            addToFilledDirs(dir);
-                        }
-                    }
-                } catch (NoWritableLedgerDirException e) {
-                    for (LedgerDirsListener listener : listeners) {
-                        listener.allDisksFull();
-                    }
-                }
-
-                List<File> fullfilledDirs = new ArrayList<File>(getFullFilledLedgerDirs());
-                // Check all full-filled disk space usage
-                for (File dir : fullfilledDirs) {
-                    try {
-                        diskUsages.put(dir, diskChecker.checkDir(dir));
-                        addToWritableDirs(dir, true);
-                    } catch (DiskErrorException e) {
-                        // Notify disk failure to all the listeners
-                        for (LedgerDirsListener listener : listeners) {
-                            listener.diskFailed(dir);
-                        }
-                    } catch (DiskWarnThresholdException e) {
-                        diskUsages.put(dir, e.getUsage());
-                        // the full-filled dir become writable but still above warn threshold
-                        addToWritableDirs(dir, false);
-                    } catch (DiskOutOfSpaceException e) {
-                        // the full-filled dir is still full-filled
-                        diskUsages.put(dir, e.getUsage());
-                    }
-                }
-                try {
-                    Thread.sleep(interval);
-                } catch (InterruptedException e) {
-                    LOG.info("LedgerDirsMonitor thread is interrupted");
-                    break;
-                }
-            }
-            LOG.info("LedgerDirsMonitorThread exited!");
-        }
-
-        private void checkDirs(List<File> writableDirs)
-                throws DiskErrorException, NoWritableLedgerDirException {
-            for (File dir : writableDirs) {
-                try {
-                    diskChecker.checkDir(dir);
-                } catch (DiskWarnThresholdException e) {
-                    // nop
-                } catch (DiskOutOfSpaceException e) {
-                    addToFilledDirs(dir);
-                }
-            }
-            getWritableLedgerDirs();
-        }
-    }
-
-    /**
      * Indicates All configured ledger directories are full.
      */
     public static class NoWritableLedgerDirException extends IOException {

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
new file mode 100644
index 0000000..0220adf
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java
@@ -0,0 +1,191 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.util.DiskChecker;
+import org.apache.bookkeeper.util.DiskChecker.DiskErrorException;
+import org.apache.bookkeeper.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.bookkeeper.util.DiskChecker.DiskWarnThresholdException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thread to monitor the disk space periodically.
+ */
+class LedgerDirsMonitor extends BookieThread {
+    private final static Logger LOG = LoggerFactory.getLogger(LedgerDirsMonitor.class);
+    
+    private final int interval;
+    private final ServerConfiguration conf;
+    private final ConcurrentMap<File, Float> diskUsages;
+    private final DiskChecker diskChecker;
+    private final LedgerDirsManager ldm;
+
+    public LedgerDirsMonitor(final ServerConfiguration conf, 
+            final DiskChecker diskChecker,
+            final LedgerDirsManager ldm) {
+        super("LedgerDirsMonitorThread");
+        this.interval = conf.getDiskCheckInterval();
+        this.conf = conf;
+        this.diskChecker = diskChecker;
+        this.diskUsages = ldm.getDiskUsages();
+        this.ldm = ldm;
+    }
+
+    @Override
+    public void run() {
+        while (true) {
+            try {
+                List<File> writableDirs = ldm.getWritableLedgerDirs();
+                // Check all writable dirs disk space usage.
+                for (File dir : writableDirs) {
+                    try {
+                        diskUsages.put(dir, diskChecker.checkDir(dir));
+                    } catch (DiskErrorException e) {
+                        LOG.error("Ledger directory {} failed on disk checking : ", dir,
e);
+                        // Notify disk failure to all listeners
+                        for (LedgerDirsListener listener : ldm.getListeners()) {
+                            listener.diskFailed(dir);
+                        }
+                    } catch (DiskWarnThresholdException e) {
+                        LOG.warn("Ledger directory {} is almost full.", dir);
+                        diskUsages.put(dir, e.getUsage());
+                        for (LedgerDirsListener listener : ldm.getListeners()) {
+                            listener.diskAlmostFull(dir);
+                        }
+                    } catch (DiskOutOfSpaceException e) {
+                        LOG.error("Ledger directory {} is out-of-space.", dir);
+                        diskUsages.put(dir, e.getUsage());
+                        // Notify disk full to all listeners
+                        ldm.addToFilledDirs(dir);
+                    }
+                }
+                // Let's get NoWritableLedgerDirException without waiting for the next iteration
+                // in case we are out of writable dirs
+                // otherwise for the duration of {interval} we end up in the state where

+                // bookie cannot get writable dir but considered to be writable 
+                ldm.getWritableLedgerDirs();
+            } catch (NoWritableLedgerDirException e) {
+                for (LedgerDirsListener listener : ldm.getListeners()) {
+                    listener.allDisksFull();
+                }
+            }
+
+            List<File> fullfilledDirs = new ArrayList<File>(ldm.getFullFilledLedgerDirs());
+            boolean hasWritableLedgerDirs = ldm.hasWritableLedgerDirs();
+            float totalDiskUsage = 0;
+
+            // When bookie is in READONLY mode .i.e there are no writableLedgerDirs:
+            // - Check if the total disk usage is below DiskLowWaterMarkUsageThreshold.
+            // - If So, walk through the entire list of fullfilledDirs and add them back
to writableLedgerDirs list if
+            // their usage is < conf.getDiskUsageThreshold.
+            if (hasWritableLedgerDirs || (totalDiskUsage = diskChecker.getTotalDiskUsage(ldm.getAllLedgerDirs()))
< conf
+                    .getDiskLowWaterMarkUsageThreshold()) {
+                // Check all full-filled disk space usage
+                for (File dir : fullfilledDirs) {
+                    try {
+                        diskUsages.put(dir, diskChecker.checkDir(dir));
+                        ldm.addToWritableDirs(dir, true);
+                    } catch (DiskErrorException e) {
+                        // Notify disk failure to all the listeners
+                        for (LedgerDirsListener listener : ldm.getListeners()) {
+                            listener.diskFailed(dir);
+                        }
+                    } catch (DiskWarnThresholdException e) {
+                        diskUsages.put(dir, e.getUsage());
+                        // the full-filled dir become writable but still above
+                        // warn threshold
+                        ldm.addToWritableDirs(dir, false);
+                    } catch (DiskOutOfSpaceException e) {
+                        // the full-filled dir is still full-filled
+                        diskUsages.put(dir, e.getUsage());
+                    }
+                }
+            } else {
+                LOG.debug(
+                        "Current TotalDiskUsage: {} is greater than LWMThreshold: {}. So
not adding any filledDir to WritableDirsList",
+                        totalDiskUsage, conf.getDiskLowWaterMarkUsageThreshold());
+            }
+            try {
+                Thread.sleep(interval);
+            } catch (InterruptedException e) {
+                LOG.info("LedgerDirsMonitor thread is interrupted");
+                break;
+            }
+        }
+        LOG.info("LedgerDirsMonitorThread exited!");
+    }
+
+    /**
+     * Sweep through all the directories to check disk errors or disk full.
+     *
+     * @throws DiskErrorException
+     *             If disk having errors
+     * @throws NoWritableLedgerDirException
+     *             If all the configured ledger directories are full or having
+     *             less space than threshold
+     */
+    public void init() throws DiskErrorException, NoWritableLedgerDirException {
+        checkDirs(ldm.getWritableLedgerDirs());
+    }
+
+    // start the daemon for disk monitoring
+    @Override
+    public void start() {
+        this.setDaemon(true);
+        super.start();
+    }
+
+    // shutdown disk monitoring daemon
+    public void shutdown() {
+        LOG.info("Shutting down LedgerDirsMonitor");
+        this.interrupt();
+        try {
+            this.join();
+        } catch (InterruptedException e) {
+            // Ignore
+        }
+    }
+
+    public void checkDirs(List<File> writableDirs)
+            throws DiskErrorException, NoWritableLedgerDirException {
+        for (File dir : writableDirs) {
+            try {
+                diskChecker.checkDir(dir);
+            } catch (DiskWarnThresholdException e) {
+                // noop
+            } catch (DiskOutOfSpaceException e) {
+                ldm.addToFilledDirs(dir);
+            }
+        }
+        ldm.getWritableLedgerDirs();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index d018409..a3844d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -105,6 +105,7 @@ public class ServerConfiguration extends AbstractConfiguration {
     //Disk utilization
     protected final static String DISK_USAGE_THRESHOLD = "diskUsageThreshold";
     protected final static String DISK_USAGE_WARN_THRESHOLD = "diskUsageWarnThreshold";
+    protected final static String DISK_USAGE_LWM_THRESHOLD = "diskUsageLwmThreshold";
     protected final static String DISK_CHECK_INTERVAL = "diskCheckInterval";
     protected final static String AUDITOR_PERIODIC_CHECK_INTERVAL = "auditorPeriodicCheckInterval";
     protected final static String AUDITOR_PERIODIC_BOOKIE_CHECK_INTERVAL = "auditorPeriodicBookieCheckInterval";
@@ -1406,6 +1407,35 @@ public class ServerConfiguration extends AbstractConfiguration {
         return getFloat(DISK_USAGE_THRESHOLD, 0.95f);
     }
 
+    
+    /**
+     * Set the disk free space low water mark threshold. 
+     * Disk is considered full when usage threshold is exceeded.
+     * Disk returns back to non-full state when usage is below low water mark threshold.
+     * This prevents it from going back and forth between these states frequently 
+     * when concurrent writes and compaction are happening. This also prevent bookie from

+     * switching frequently between read-only and read-writes states in the same cases. 

+     *
+     * @param threshold threshold to declare a disk full
+     *
+     * @return ServerConfiguration
+     */
+    public ServerConfiguration setDiskLowWaterMarkUsageThreshold(float threshold) {
+        setProperty(DISK_USAGE_LWM_THRESHOLD, threshold);
+        return this;
+    }
+
+    /**
+     * Returns disk free space low water mark threshold. By default it is the 
+     * same as usage threshold (for backwards-compatibility).
+     *
+     * @return the percentage below which a disk will NOT be considered full
+     */
+    public float getDiskLowWaterMarkUsageThreshold() {
+        return getFloat(DISK_USAGE_LWM_THRESHOLD, getDiskUsageThreshold());
+    }
+
+    
     /**
      * Set the disk checker interval to monitor ledger disk space
      *

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
index b1251a6..0ed949e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java
@@ -20,6 +20,7 @@ package org.apache.bookkeeper.util;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
@@ -155,6 +156,29 @@ public class DiskChecker {
     }
 
     /**
+     * calculates and returns the disk usage factor in the provided list of dirs
+     * 
+     * @param dirs
+     *            list of directories
+     * @return disk usage factor in the provided list of dirs
+     */
+    public float getTotalDiskUsage(List<File> dirs) {
+        if (dirs == null || dirs.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "list argument of getTotalDiskUsage is not supposed to be null or empty");
+        }
+        long totalUsableSpace = 0;
+        long totalSpace = 0;
+        for (File dir : dirs) {
+            totalUsableSpace += dir.getUsableSpace();
+            totalSpace += dir.getTotalSpace();
+        }
+        float free = (float) totalUsableSpace / (float) totalSpace;
+        float used = 1f - free;
+        return used;
+    }
+
+    /**
      * Create the directory if it doesn't exist and
      *
      * @param dir
@@ -191,9 +215,8 @@ public class DiskChecker {
      *
      * @param diskSpaceThreshold
      */
-    @VisibleForTesting
     void setDiskSpaceThreshold(float diskSpaceThreshold, float diskUsageWarnThreshold) {
-        validateThreshold(diskSpaceThreshold, diskSpaceThreshold);
+        validateThreshold(diskSpaceThreshold, diskUsageWarnThreshold);
         this.diskUsageThreshold = diskSpaceThreshold;
         this.diskUsageWarnThreshold = diskUsageWarnThreshold;
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
index e2b3c47..35a9c02 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java
@@ -26,18 +26,19 @@ import java.io.File;
 import java.io.IOException;
 import java.net.BindException;
 import java.net.InetAddress;
-import org.junit.Assert;
 
-import org.apache.bookkeeper.conf.TestBKConfiguration;
+import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.client.BookKeeperAdmin;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.KeeperException;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -362,7 +363,10 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase
{
             // LedgerDirsManager#init() is used in Bookie instantiation.
             // Simulating disk errors by directly calling #init
             LedgerDirsManager ldm = new LedgerDirsManager(conf, conf.getLedgerDirs());
-            ldm.init();
+            LedgerDirsMonitor ledgerMonitor = new LedgerDirsMonitor(conf, 
+                    new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()),
ldm);
+            ledgerMonitor.init();
+            fail("should throw exception");
         } catch (Exception e) {
             // expected
         }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index d97343c..4e36ba1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -22,6 +22,7 @@ package org.apache.bookkeeper.bookie;
 
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.util.DiskChecker;
 import org.apache.bookkeeper.util.SnapshotMap;
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
@@ -45,6 +46,7 @@ public class IndexPersistenceMgrTest {
     ServerConfiguration conf;
     File journalDir, ledgerDir;
     LedgerDirsManager ledgerDirsManager;
+    LedgerDirsMonitor ledgerMonitor;
 
     @Before
     public void setUp() throws Exception {
@@ -64,11 +66,15 @@ public class IndexPersistenceMgrTest {
         conf.setLedgerDirNames(new String[] { ledgerDir.getPath() });
 
         ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs());
+        ledgerMonitor = new LedgerDirsMonitor(conf, 
+                new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()),
ledgerDirsManager);
+        ledgerMonitor.init();
     }
 
     @After
     public void tearDown() throws Exception {
-        ledgerDirsManager.shutdown();
+        //TODO: it is being shut down but never started. why?
+        ledgerMonitor.shutdown();
         FileUtils.deleteDirectory(journalDir);
         FileUtils.deleteDirectory(ledgerDir);
     }

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
----------------------------------------------------------------------
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
index b72cfc7..6b96ca1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java
@@ -28,7 +28,9 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
@@ -47,9 +49,10 @@ import org.slf4j.LoggerFactory;
 public class TestLedgerDirsManager {
     private final static Logger LOG = LoggerFactory.getLogger(TestLedgerDirsManager.class);
 
-    ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+    ServerConfiguration conf;
     File curDir;
     LedgerDirsManager dirsManager;
+    LedgerDirsMonitor ledgerMonitor;
     MockDiskChecker mockDiskChecker;
     int diskCheckInterval = 1000;
     float threshold = 0.5f;
@@ -69,19 +72,22 @@ public class TestLedgerDirsManager {
         curDir = Bookie.getCurrentDirectory(tmpDir);
         Bookie.checkDirectoryStructure(curDir);
 
-        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf = TestBKConfiguration.newServerConfiguration();
         conf.setLedgerDirNames(new String[] { tmpDir.toString() });
+        conf.setDiskLowWaterMarkUsageThreshold(conf.getDiskUsageThreshold());
         conf.setDiskCheckInterval(diskCheckInterval);
         conf.setIsForceGCAllowWhenNoSpace(true);
 
         mockDiskChecker = new MockDiskChecker(threshold, warnThreshold);
-        dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE,
mockDiskChecker);
-        dirsManager.init();
+        dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE);
+        ledgerMonitor = new LedgerDirsMonitor(conf, 
+                mockDiskChecker, dirsManager);
+        ledgerMonitor.init();
     }
 
     @After
     public void tearDown() throws Exception {
-        dirsManager.shutdown();
+        ledgerMonitor.shutdown();
         for (File dir : tempDirs) {
             FileUtils.deleteDirectory(dir);
         }
@@ -146,7 +152,7 @@ public class TestLedgerDirsManager {
 
         MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener();
         dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
-        dirsManager.start();
+        ledgerMonitor.start();
 
         assertFalse(mockLedgerDirsListener.readOnly);
         mockDiskChecker.setUsage(threshold + 0.05f);
@@ -161,9 +167,166 @@ public class TestLedgerDirsManager {
         assertFalse(mockLedgerDirsListener.readOnly);
     }
 
+    @Test(timeout = 60000)
+    public void testLedgerDirsMonitorHandlingLowWaterMark() throws Exception {
+
+        ledgerMonitor.shutdown();
+
+        final float warn = 0.90f;
+        final float nospace = 0.98f;
+        final float lwm = (warn + nospace) / 2;
+        final float lwm2warn = (warn + lwm) / 2;
+        final float lwm2nospace = (lwm + nospace) / 2;
+        final float nospaceExceeded = nospace + 0.005f;
+
+        conf.setDiskUsageThreshold(nospace);
+        conf.setDiskLowWaterMarkUsageThreshold(lwm);
+        conf.setDiskUsageWarnThreshold(warn);
+
+        mockDiskChecker = new MockDiskChecker(nospace, warnThreshold);
+        dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE);
+        ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, dirsManager);
+        ledgerMonitor.init();
+        final MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener();
+        dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
+        ledgerMonitor.start();
+
+        Thread.sleep((diskCheckInterval * 2) + 100);
+        assertFalse(mockLedgerDirsListener.readOnly);
+
+        // go above LWM but below threshold
+        // should still be writable
+        mockDiskChecker.setUsage(lwm2nospace);
+        Thread.sleep((diskCheckInterval * 2) + 100);
+        assertFalse(mockLedgerDirsListener.readOnly);
+
+        // exceed the threshold, should go to readonly
+        mockDiskChecker.setUsage(nospaceExceeded);
+        Thread.sleep(diskCheckInterval + 100);
+        assertTrue(mockLedgerDirsListener.readOnly);
+
+        // drop below threshold but above LWM
+        // should stay read-only
+        mockDiskChecker.setUsage(lwm2nospace);
+        Thread.sleep((diskCheckInterval * 2) + 100);
+        assertTrue(mockLedgerDirsListener.readOnly);
+
+        // drop below LWM
+        // should become writable
+        mockDiskChecker.setUsage(lwm2warn);
+        Thread.sleep((diskCheckInterval * 2) + 100);
+        assertFalse(mockLedgerDirsListener.readOnly);
+
+        // go above LWM but below threshold
+        // should still be writable
+        mockDiskChecker.setUsage(lwm2nospace);
+        Thread.sleep((diskCheckInterval * 2) + 100);
+        assertFalse(mockLedgerDirsListener.readOnly);
+    }
+
+    @Test(timeout = 60000)
+    public void testLedgerDirsMonitorHandlingWithMultipleLedgerDirectories() throws Exception
{
+        ledgerMonitor.shutdown();
+
+        final float nospace = 0.90f;
+        final float lwm = 0.80f;
+        final float warn = 0.99f;
+        HashMap<File, Float> usageMap;
+
+        File tmpDir1 = createTempDir("bkTest", ".dir");
+        File curDir1 = Bookie.getCurrentDirectory(tmpDir1);
+        Bookie.checkDirectoryStructure(curDir1);
+
+        File tmpDir2 = createTempDir("bkTest", ".dir");
+        File curDir2 = Bookie.getCurrentDirectory(tmpDir2);
+        Bookie.checkDirectoryStructure(curDir2);
+
+        conf.setDiskUsageThreshold(nospace);
+        conf.setDiskLowWaterMarkUsageThreshold(lwm);
+        conf.setDiskUsageWarnThreshold(warn);
+        conf.setLedgerDirNames(new String[] { tmpDir1.toString(), tmpDir2.toString() });
+
+        mockDiskChecker = new MockDiskChecker(nospace, warnThreshold);
+        dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE);
+        ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, dirsManager);
+        usageMap = new HashMap<File, Float>();
+        usageMap.put(curDir1, 0.1f);
+        usageMap.put(curDir2, 0.1f);
+        mockDiskChecker.setUsageMap(usageMap);
+        ledgerMonitor.init();
+        final MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener();
+        dirsManager.addLedgerDirsListener(mockLedgerDirsListener);
+        ledgerMonitor.start();
+
+        Thread.sleep((diskCheckInterval * 2) + 100);
+        assertFalse(mockLedgerDirsListener.readOnly);
+
+        // go above LWM but below threshold
+        // should still be writable
+        setUsageAndThenVerify(curDir1, lwm + 0.05f, curDir2, lwm + 0.05f, mockDiskChecker,
mockLedgerDirsListener,
+                false);
+
+        // one dir usagespace above storagethreshold, another dir below storagethreshold
+        // should still be writable
+        setUsageAndThenVerify(curDir1, nospace + 0.02f, curDir2, nospace - 0.05f, mockDiskChecker,
+                mockLedgerDirsListener, false);
+
+        // should remain readonly
+        setUsageAndThenVerify(curDir1, nospace + 0.05f, curDir2, nospace + 0.02f, mockDiskChecker,
+                mockLedgerDirsListener, true);
+
+        // bring the disk usages to less than the threshold,
+        // but more than the LWM.
+        // should still be readonly
+        setUsageAndThenVerify(curDir1, nospace - 0.05f, curDir2, nospace - 0.05f, mockDiskChecker,
+                mockLedgerDirsListener, true);
+
+        // bring one dir diskusage to less than lwm,
+        // the other dir to be more than lwm, but the
+        // overall diskusage to be more than lwm
+        // should still be readonly
+        setUsageAndThenVerify(curDir1, lwm - 0.03f, curDir2, lwm + 0.07f, mockDiskChecker,
mockLedgerDirsListener,
+                true);
+
+        // bring one dir diskusage to much less than lwm,
+        // the other dir to be more than storage threahold, but the
+        // overall diskusage is less than lwm
+        // should goto readwrite
+        setUsageAndThenVerify(curDir1, lwm - 0.17f, curDir2, nospace + 0.03f, mockDiskChecker,
mockLedgerDirsListener,
+                false);
+        assertTrue("Only one LedgerDir should be writable", dirsManager.getWritableLedgerDirs().size()
== 1);
+
+        // bring both the dirs below lwm
+        // should still be readwrite
+        setUsageAndThenVerify(curDir1, lwm - 0.03f, curDir2, lwm - 0.02f, mockDiskChecker,
mockLedgerDirsListener,
+                false);
+        assertTrue("Both the LedgerDirs should be writable", dirsManager.getWritableLedgerDirs().size()
== 2);
+
+        // bring both the dirs above lwm but < threshold
+        // should still be readwrite
+        setUsageAndThenVerify(curDir1, lwm + 0.02f, curDir2, lwm + 0.08f, mockDiskChecker,
mockLedgerDirsListener,
+                false);
+    }
+
+    private void setUsageAndThenVerify(File dir1, float dir1Usage, File dir2, float dir2Usage,
+            MockDiskChecker mockDiskChecker, MockLedgerDirsListener mockLedgerDirsListener,
boolean verifyReadOnly)
+            throws InterruptedException {
+        HashMap<File, Float> usageMap = new HashMap<File, Float>();
+        usageMap.put(dir1, dir1Usage);
+        usageMap.put(dir2, dir2Usage);
+        mockDiskChecker.setUsageMap(usageMap);
+        Thread.sleep((diskCheckInterval * 2) + 100);
+        if (verifyReadOnly) {
+            assertTrue(mockLedgerDirsListener.readOnly);
+        } else {
+            assertFalse(mockLedgerDirsListener.readOnly);
+        }
+    }
+
     private class MockDiskChecker extends DiskChecker {
 
-        private float used;
+        private volatile float used;
+        private volatile Map<File, Float> usageMap = null;
 
         public MockDiskChecker(float threshold, float warnThreshold) {
             super(threshold, warnThreshold);
@@ -172,23 +335,48 @@ public class TestLedgerDirsManager {
 
         @Override
         public float checkDir(File dir) throws DiskErrorException, DiskOutOfSpaceException,
DiskWarnThresholdException {
-            if (used > getDiskUsageThreshold()) {
-                throw new DiskOutOfSpaceException("", used);
+            float dirUsage = getDirUsage(dir);
+
+            if (dirUsage > getDiskUsageThreshold()) {
+                throw new DiskOutOfSpaceException("", dirUsage);
+            }
+            if (dirUsage > getDiskUsageWarnThreshold()) {
+                throw new DiskWarnThresholdException("", dirUsage);
             }
-            if (used > getDiskUsageWarnThreshold()) {
-                throw new DiskWarnThresholdException("", used);
+            return dirUsage;
+        }
+
+        @Override
+        public float getTotalDiskUsage(List<File> dirs) {
+            float accumulatedDiskUsage = 0f;
+            for (File dir : dirs) {
+                accumulatedDiskUsage += getDirUsage(dir);
             }
-            return used;
+            return (accumulatedDiskUsage / dirs.size());
+        }
+
+        public float getDirUsage(File dir) {
+            float dirUsage;
+            if ((usageMap == null) || (!usageMap.containsKey(dir))) {
+                dirUsage = used;
+            } else {
+                dirUsage = usageMap.get(dir);
+            }
+            return dirUsage;
         }
 
         public void setUsage(float usage) {
             this.used = usage;
         }
+
+        public void setUsageMap(Map<File, Float> usageMap) {
+            this.usageMap = usageMap;
+        }
     }
 
     private class MockLedgerDirsListener implements LedgerDirsListener {
 
-        public boolean readOnly;
+        public volatile boolean readOnly;
 
         public MockLedgerDirsListener() {
             reset();


Mime
View raw message