Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A39D9200C7C for ; Mon, 5 Jun 2017 20:31:05 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A25DF160BD4; Mon, 5 Jun 2017 18:31:05 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CA03B160BBB for ; Mon, 5 Jun 2017 20:31:03 +0200 (CEST) Received: (qmail 97777 invoked by uid 500); 5 Jun 2017 18:30:58 -0000 Mailing-List: contact commits-help@bookkeeper.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: bookkeeper-dev@bookkeeper.apache.org Delivered-To: mailing list commits@bookkeeper.apache.org Received: (qmail 97767 invoked by uid 99); 5 Jun 2017 18:30:56 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 05 Jun 2017 18:30:56 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 86115DFB94; Mon, 5 Jun 2017 18:30:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sijie@apache.org To: commits@bookkeeper.apache.org Message-Id: <045225b6ed974e6da6a1612b720e3904@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: bookkeeper git commit: BOOKKEEPER-944: LowWaterMark Storage Threshold Date: Mon, 5 Jun 2017 18:30:56 +0000 (UTC) archived-at: Mon, 05 Jun 2017 18:31:05 -0000 Repository: bookkeeper Updated Branches: refs/heads/master 5f945f8a0 -> efd8ec269 BOOKKEEPER-944: LowWaterMark Storage Threshold BOOKKEEPER-944: LowWaterMark Storage Threshold LowWaterMark Storage Threshold and code refactoring - Current implementation toggles READONLY status of 
the bookie as soon as a directory usage falls below the disk storage threshold. Added LowWaterMark parameter that limits such switches. 1. Bookie transition from RW to RONLY only when all the ledger dirs usage > HWM (storage threshold) 2. Bookie transition from RONLY to RW only when total system disk usage (ledger/index disks) capacity is < LWM 3. When bookie is in RW mode all disks which are < HWM (storage threshold) are RW - refactored code and separated LedgerDirsMonitor from LedgerDirsManager, to remove circular dependency between LedgerDirsManager and LedgerDirsMonitor and also it improves testability by making them separate classes. It becomes easier to do functional/unit level testing and fault-injection testing at LedgerDirsMonitor class level. - relevant testcases Author: Andrey Yegorov Co-Author: Charan Reddy Guttapalem Author: Charan Reddy Guttapalem Reviewers: Enrico Olivelli , Sijie Guo Closes #108 from reddycharan/lwmhwm Project: http://git-wip-us.apache.org/repos/asf/bookkeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/bookkeeper/commit/efd8ec26 Tree: http://git-wip-us.apache.org/repos/asf/bookkeeper/tree/efd8ec26 Diff: http://git-wip-us.apache.org/repos/asf/bookkeeper/diff/efd8ec26 Branch: refs/heads/master Commit: efd8ec26926d2e75f51d6d576543a0dcc116b83f Parents: 5f945f8 Author: Charan Reddy Guttapalem Authored: Mon Jun 5 11:30:52 2017 -0700 Committer: Sijie Guo Committed: Mon Jun 5 11:30:52 2017 -0700 ---------------------------------------------------------------------- bookkeeper-server/conf/bk_server.conf | 8 + .../org/apache/bookkeeper/bookie/Bookie.java | 41 ++-- .../bookkeeper/bookie/LedgerDirsManager.java | 161 +++----------- .../bookkeeper/bookie/LedgerDirsMonitor.java | 191 +++++++++++++++++ .../bookkeeper/conf/ServerConfiguration.java | 30 +++ .../org/apache/bookkeeper/util/DiskChecker.java | 27 ++- .../bookie/BookieInitializationTest.java | 14 +- .../bookie/IndexPersistenceMgrTest.java | 8 +- 
.../bookie/TestLedgerDirsManager.java | 214 +++++++++++++++++-- 9 files changed, 526 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/conf/bk_server.conf ---------------------------------------------------------------------- diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf index c7fd2ca..7f52902 100644 --- a/bookkeeper-server/conf/bk_server.conf +++ b/bookkeeper-server/conf/bk_server.conf @@ -257,6 +257,14 @@ zkTimeout=10000 #Valid values should be in between 0 and 1 (exclusive). #diskUsageThreshold=0.95 +#Set the disk free space low water mark threshold. Disk is considered full when +#usage threshold is exceeded. Disk returns to the non-full state when usage is +#below low water mark threshold. This prevents it from going back and forth +#between these states frequently when concurrent writes and compaction are +#happening. This also prevents the bookie from switching frequently between +#read-only and read-write states in the same cases. +#diskUsageLwmThreshold=0.90 + #Disk check interval in milli seconds, interval to check the ledger dirs usage.
#Default is 10000 #diskCheckInterval=10000 http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java index 5a3856f..f32626a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java @@ -62,6 +62,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.bookkeeper.util.BookKeeperConstants; +import org.apache.bookkeeper.util.DiskChecker; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.MathUtils; import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap; @@ -124,6 +125,9 @@ public class Bookie extends BookieCriticalThread { private final LedgerDirsManager ledgerDirsManager; private LedgerDirsManager indexDirsManager; + + private final LedgerDirsMonitor ledgerMonitor; + private final LedgerDirsMonitor idxMonitor; // ZooKeeper client instance for the Bookie ZooKeeper zk; @@ -657,6 +661,7 @@ public class Bookie extends BookieCriticalThread { this.ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), statsLogger.scope(LD_LEDGER_SCOPE)); + File[] idxDirs = conf.getIndexDirs(); if (null == idxDirs) { this.indexDirsManager = this.ledgerDirsManager; @@ -672,10 +677,28 @@ public class Bookie extends BookieCriticalThread { LOG.info("instantiate ledger manager {}", ledgerManagerFactory.getClass().getName()); ledgerManager = ledgerManagerFactory.newLedgerManager(); - // Initialise ledgerDirManager. This would look through all the + // Initialise ledgerDirMonitor. 
This would look through all the // configured directories. When disk errors or all the ledger // directories are full, would throws exception and fail bookie startup. - this.ledgerDirsManager.init(); + this.ledgerMonitor = new LedgerDirsMonitor(conf, + new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), + ledgerDirsManager); + this.ledgerMonitor.init(); + + if (null == idxDirs) { + this.idxMonitor = this.ledgerMonitor; + } else { + this.idxMonitor = new LedgerDirsMonitor(conf, + new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), + indexDirsManager); + this.idxMonitor.init(); + } + + // ZK ephemeral node for this Bookie. + String myID = getMyId(); + zkBookieRegPath = this.bookieRegistrationPath + myID; + zkBookieReadOnlyPath = this.bookieReadonlyRegistrationPath + "/" + myID; + // instantiate the journals journals = Lists.newArrayList(); for(int i=0 ;i filledDirs; private final List ledgerDirectories; private volatile List writableLedgerDirectories; - private final DiskChecker diskChecker; private final List listeners; - private final LedgerDirsMonitor monitor; private final Random rand = new Random(); private final ConcurrentMap diskUsages = new ConcurrentHashMap(); @@ -67,19 +61,13 @@ public class LedgerDirsManager { this(conf, dirs, NullStatsLogger.INSTANCE); } - LedgerDirsManager(ServerConfiguration conf, File[] dirs, StatsLogger statsLogger) { - this(conf, dirs, statsLogger, new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())); - } - @VisibleForTesting - LedgerDirsManager(ServerConfiguration conf, File[] dirs, StatsLogger statsLogger, DiskChecker diskChecker) { + LedgerDirsManager(ServerConfiguration conf, File[] dirs, StatsLogger statsLogger) { this.ledgerDirectories = Arrays.asList(Bookie .getCurrentDirectories(dirs)); this.writableLedgerDirectories = new ArrayList(ledgerDirectories); this.filledDirs = new ArrayList(); this.listeners = new ArrayList(); - this.diskChecker = 
diskChecker; - this.monitor = new LedgerDirsMonitor(conf.getDiskCheckInterval()); this.forceGCAllowWhenNoSpace = conf.getIsForceGCAllowWhenNoSpace(); this.entryLogSize = conf.getEntryLogSizeLimit(); for (File dir : dirs) { @@ -117,6 +105,14 @@ public class LedgerDirsManager { public List getAllLedgerDirs() { return ledgerDirectories; } + + /** + * Get all dir listeners + * @return List listeners + */ + public List getListeners() { + return listeners; + } /** * Calculate the total amount of free space available @@ -147,6 +143,14 @@ public class LedgerDirsManager { } /** + * Get disk usages map + * @return ConcurrentMap diskUsages + */ + public ConcurrentMap getDiskUsages() { + return diskUsages; + } + + /** * Get only writable ledger dirs. */ public List getWritableLedgerDirs() @@ -161,6 +165,13 @@ public class LedgerDirsManager { return writableLedgerDirectories; } + /** + * returns true if the writableLedgerDirs list has entries + */ + public boolean hasWritableLedgerDirs() { + return !writableLedgerDirectories.isEmpty(); + } + public List getWritableLedgerDirsForNewLog() throws NoWritableLedgerDirException { @@ -308,126 +319,6 @@ public class LedgerDirsManager { } /** - * Sweep through all the directories to check disk errors or disk full. - * - * @throws DiskErrorException - * If disk having errors - * @throws NoWritableLedgerDirException - * If all the configured ledger directories are full or having - * less space than threshold - */ - public void init() throws DiskErrorException, NoWritableLedgerDirException { - monitor.checkDirs(writableLedgerDirectories); - } - - // start the daemon for disk monitoring - public void start() { - monitor.setDaemon(true); - monitor.start(); - } - - // shutdown disk monitoring daemon - public void shutdown() { - LOG.info("Shutting down LedgerDirsMonitor"); - monitor.interrupt(); - try { - monitor.join(); - } catch (InterruptedException e) { - // Ignore - } - } - - /** - * Thread to monitor the disk space periodically. 
- */ - private class LedgerDirsMonitor extends BookieThread { - private final int interval; - - public LedgerDirsMonitor(int interval) { - super("LedgerDirsMonitorThread"); - this.interval = interval; - } - - @Override - public void run() { - while (true) { - try { - List writableDirs = getWritableLedgerDirs(); - // Check all writable dirs disk space usage. - for (File dir : writableDirs) { - try { - diskUsages.put(dir, diskChecker.checkDir(dir)); - } catch (DiskErrorException e) { - LOG.error("Ledger directory {} failed on disk checking : ", dir, e); - // Notify disk failure to all listeners - for (LedgerDirsListener listener : listeners) { - listener.diskFailed(dir); - } - } catch (DiskWarnThresholdException e) { - LOG.warn("Ledger directory {} is almost full.", dir); - diskUsages.put(dir, e.getUsage()); - for (LedgerDirsListener listener : listeners) { - listener.diskAlmostFull(dir); - } - } catch (DiskOutOfSpaceException e) { - LOG.error("Ledger directory {} is out-of-space.", dir); - diskUsages.put(dir, e.getUsage()); - // Notify disk full to all listeners - addToFilledDirs(dir); - } - } - } catch (NoWritableLedgerDirException e) { - for (LedgerDirsListener listener : listeners) { - listener.allDisksFull(); - } - } - - List fullfilledDirs = new ArrayList(getFullFilledLedgerDirs()); - // Check all full-filled disk space usage - for (File dir : fullfilledDirs) { - try { - diskUsages.put(dir, diskChecker.checkDir(dir)); - addToWritableDirs(dir, true); - } catch (DiskErrorException e) { - // Notify disk failure to all the listeners - for (LedgerDirsListener listener : listeners) { - listener.diskFailed(dir); - } - } catch (DiskWarnThresholdException e) { - diskUsages.put(dir, e.getUsage()); - // the full-filled dir become writable but still above warn threshold - addToWritableDirs(dir, false); - } catch (DiskOutOfSpaceException e) { - // the full-filled dir is still full-filled - diskUsages.put(dir, e.getUsage()); - } - } - try { - Thread.sleep(interval); - } 
catch (InterruptedException e) { - LOG.info("LedgerDirsMonitor thread is interrupted"); - break; - } - } - LOG.info("LedgerDirsMonitorThread exited!"); - } - - private void checkDirs(List writableDirs) - throws DiskErrorException, NoWritableLedgerDirException { - for (File dir : writableDirs) { - try { - diskChecker.checkDir(dir); - } catch (DiskWarnThresholdException e) { - // nop - } catch (DiskOutOfSpaceException e) { - addToFilledDirs(dir); - } - } - getWritableLedgerDirs(); - } - } - - /** * Indicates All configured ledger directories are full. */ public static class NoWritableLedgerDirException extends IOException { http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java new file mode 100644 index 0000000..0220adf --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDirsMonitor.java @@ -0,0 +1,191 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.bookkeeper.bookie; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentMap; + +import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; +import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; +import org.apache.bookkeeper.conf.ServerConfiguration; +import org.apache.bookkeeper.util.DiskChecker; +import org.apache.bookkeeper.util.DiskChecker.DiskErrorException; +import org.apache.bookkeeper.util.DiskChecker.DiskOutOfSpaceException; +import org.apache.bookkeeper.util.DiskChecker.DiskWarnThresholdException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Thread to monitor the disk space periodically. + */ +class LedgerDirsMonitor extends BookieThread { + private final static Logger LOG = LoggerFactory.getLogger(LedgerDirsMonitor.class); + + private final int interval; + private final ServerConfiguration conf; + private final ConcurrentMap diskUsages; + private final DiskChecker diskChecker; + private final LedgerDirsManager ldm; + + public LedgerDirsMonitor(final ServerConfiguration conf, + final DiskChecker diskChecker, + final LedgerDirsManager ldm) { + super("LedgerDirsMonitorThread"); + this.interval = conf.getDiskCheckInterval(); + this.conf = conf; + this.diskChecker = diskChecker; + this.diskUsages = ldm.getDiskUsages(); + this.ldm = ldm; + } + + @Override + public void run() { + while (true) { + try { + List writableDirs = ldm.getWritableLedgerDirs(); + // Check all writable dirs disk space usage. 
+ for (File dir : writableDirs) { + try { + diskUsages.put(dir, diskChecker.checkDir(dir)); + } catch (DiskErrorException e) { + LOG.error("Ledger directory {} failed on disk checking : ", dir, e); + // Notify disk failure to all listeners + for (LedgerDirsListener listener : ldm.getListeners()) { + listener.diskFailed(dir); + } + } catch (DiskWarnThresholdException e) { + LOG.warn("Ledger directory {} is almost full.", dir); + diskUsages.put(dir, e.getUsage()); + for (LedgerDirsListener listener : ldm.getListeners()) { + listener.diskAlmostFull(dir); + } + } catch (DiskOutOfSpaceException e) { + LOG.error("Ledger directory {} is out-of-space.", dir); + diskUsages.put(dir, e.getUsage()); + // Notify disk full to all listeners + ldm.addToFilledDirs(dir); + } + } + // Let's get NoWritableLedgerDirException without waiting for the next iteration + // in case we are out of writable dirs + // otherwise for the duration of {interval} we end up in the state where + // bookie cannot get writable dir but considered to be writable + ldm.getWritableLedgerDirs(); + } catch (NoWritableLedgerDirException e) { + for (LedgerDirsListener listener : ldm.getListeners()) { + listener.allDisksFull(); + } + } + + List fullfilledDirs = new ArrayList(ldm.getFullFilledLedgerDirs()); + boolean hasWritableLedgerDirs = ldm.hasWritableLedgerDirs(); + float totalDiskUsage = 0; + + // When bookie is in READONLY mode .i.e there are no writableLedgerDirs: + // - Check if the total disk usage is below DiskLowWaterMarkUsageThreshold. + // - If So, walk through the entire list of fullfilledDirs and add them back to writableLedgerDirs list if + // their usage is < conf.getDiskUsageThreshold. 
+ if (hasWritableLedgerDirs || (totalDiskUsage = diskChecker.getTotalDiskUsage(ldm.getAllLedgerDirs())) < conf + .getDiskLowWaterMarkUsageThreshold()) { + // Check all full-filled disk space usage + for (File dir : fullfilledDirs) { + try { + diskUsages.put(dir, diskChecker.checkDir(dir)); + ldm.addToWritableDirs(dir, true); + } catch (DiskErrorException e) { + // Notify disk failure to all the listeners + for (LedgerDirsListener listener : ldm.getListeners()) { + listener.diskFailed(dir); + } + } catch (DiskWarnThresholdException e) { + diskUsages.put(dir, e.getUsage()); + // the full-filled dir become writable but still above + // warn threshold + ldm.addToWritableDirs(dir, false); + } catch (DiskOutOfSpaceException e) { + // the full-filled dir is still full-filled + diskUsages.put(dir, e.getUsage()); + } + } + } else { + LOG.debug( + "Current TotalDiskUsage: {} is greater than LWMThreshold: {}. So not adding any filledDir to WritableDirsList", + totalDiskUsage, conf.getDiskLowWaterMarkUsageThreshold()); + } + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + LOG.info("LedgerDirsMonitor thread is interrupted"); + break; + } + } + LOG.info("LedgerDirsMonitorThread exited!"); + } + + /** + * Sweep through all the directories to check disk errors or disk full. 
+ * + * @throws DiskErrorException + * If disk having errors + * @throws NoWritableLedgerDirException + * If all the configured ledger directories are full or having + * less space than threshold + */ + public void init() throws DiskErrorException, NoWritableLedgerDirException { + checkDirs(ldm.getWritableLedgerDirs()); + } + + // start the daemon for disk monitoring + @Override + public void start() { + this.setDaemon(true); + super.start(); + } + + // shutdown disk monitoring daemon + public void shutdown() { + LOG.info("Shutting down LedgerDirsMonitor"); + this.interrupt(); + try { + this.join(); + } catch (InterruptedException e) { + // Ignore + } + } + + public void checkDirs(List writableDirs) + throws DiskErrorException, NoWritableLedgerDirException { + for (File dir : writableDirs) { + try { + diskChecker.checkDir(dir); + } catch (DiskWarnThresholdException e) { + // noop + } catch (DiskOutOfSpaceException e) { + ldm.addToFilledDirs(dir); + } + } + ldm.getWritableLedgerDirs(); + } +} + http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java index d018409..a3844d1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java @@ -105,6 +105,7 @@ public class ServerConfiguration extends AbstractConfiguration { //Disk utilization protected final static String DISK_USAGE_THRESHOLD = "diskUsageThreshold"; protected final static String DISK_USAGE_WARN_THRESHOLD = "diskUsageWarnThreshold"; + protected final static String DISK_USAGE_LWM_THRESHOLD = "diskUsageLwmThreshold"; protected final static String 
DISK_CHECK_INTERVAL = "diskCheckInterval"; protected final static String AUDITOR_PERIODIC_CHECK_INTERVAL = "auditorPeriodicCheckInterval"; protected final static String AUDITOR_PERIODIC_BOOKIE_CHECK_INTERVAL = "auditorPeriodicBookieCheckInterval"; @@ -1406,6 +1407,35 @@ public class ServerConfiguration extends AbstractConfiguration { return getFloat(DISK_USAGE_THRESHOLD, 0.95f); } + + /** + * Set the disk free space low water mark threshold. + * Disk is considered full when usage threshold is exceeded. + * Disk returns to the non-full state when usage is below low water mark threshold. + * This prevents it from going back and forth between these states frequently + * when concurrent writes and compaction are happening. This also prevents the bookie from + * switching frequently between read-only and read-write states in the same cases. + * + * @param threshold usage below which a full disk is no longer considered full + * + * @return ServerConfiguration + */ + public ServerConfiguration setDiskLowWaterMarkUsageThreshold(float threshold) { + setProperty(DISK_USAGE_LWM_THRESHOLD, threshold); + return this; + } + + /** + * Returns disk free space low water mark threshold. By default it is the + * same as usage threshold (for backwards-compatibility).
+ * + * @return the percentage below which a disk will NOT be considered full + */ + public float getDiskLowWaterMarkUsageThreshold() { + return getFloat(DISK_USAGE_LWM_THRESHOLD, getDiskUsageThreshold()); + } + + /** * Set the disk checker interval to monitor ledger disk space * http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java index b1251a6..0ed949e 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/DiskChecker.java @@ -20,6 +20,7 @@ package org.apache.bookkeeper.util; import java.io.File; import java.io.IOException; +import java.util.List; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -155,6 +156,29 @@ public class DiskChecker { } /** + * calculates and returns the disk usage factor in the provided list of dirs + * + * @param dirs + * list of directories + * @return disk usage factor in the provided list of dirs + */ + public float getTotalDiskUsage(List dirs) { + if (dirs == null || dirs.isEmpty()) { + throw new IllegalArgumentException( + "list argument of getTotalDiskUsage is not supposed to be null or empty"); + } + long totalUsableSpace = 0; + long totalSpace = 0; + for (File dir : dirs) { + totalUsableSpace += dir.getUsableSpace(); + totalSpace += dir.getTotalSpace(); + } + float free = (float) totalUsableSpace / (float) totalSpace; + float used = 1f - free; + return used; + } + + /** * Create the directory if it doesn't exist and * * @param dir @@ -191,9 +215,8 @@ public class DiskChecker { * * @param diskSpaceThreshold */ - @VisibleForTesting void setDiskSpaceThreshold(float diskSpaceThreshold, 
float diskUsageWarnThreshold) { - validateThreshold(diskSpaceThreshold, diskSpaceThreshold); + validateThreshold(diskSpaceThreshold, diskUsageWarnThreshold); this.diskUsageThreshold = diskSpaceThreshold; this.diskUsageWarnThreshold = diskUsageWarnThreshold; } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java index e2b3c47..35a9c02 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieInitializationTest.java @@ -26,18 +26,19 @@ import java.io.File; import java.io.IOException; import java.net.BindException; import java.net.InetAddress; -import org.junit.Assert; -import org.apache.bookkeeper.conf.TestBKConfiguration; +import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.client.BookKeeperAdmin; +import org.apache.bookkeeper.conf.TestBKConfiguration; import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.util.DiskChecker; import org.apache.bookkeeper.zookeeper.ZooKeeperClient; +import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.KeeperException; +import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -362,7 +363,10 @@ public class BookieInitializationTest extends BookKeeperClusterTestCase { // 
LedgerDirsManager#init() is used in Bookie instantiation. // Simulating disk errors by directly calling #init LedgerDirsManager ldm = new LedgerDirsManager(conf, conf.getLedgerDirs()); - ldm.init(); + LedgerDirsMonitor ledgerMonitor = new LedgerDirsMonitor(conf, + new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), ldm); + ledgerMonitor.init(); + fail("should throw exception"); } catch (Exception e) { // expected } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java index d97343c..4e36ba1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java @@ -22,6 +22,7 @@ package org.apache.bookkeeper.bookie; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.util.DiskChecker; import org.apache.bookkeeper.util.SnapshotMap; import org.apache.commons.io.FileUtils; import org.junit.After; @@ -45,6 +46,7 @@ public class IndexPersistenceMgrTest { ServerConfiguration conf; File journalDir, ledgerDir; LedgerDirsManager ledgerDirsManager; + LedgerDirsMonitor ledgerMonitor; @Before public void setUp() throws Exception { @@ -64,11 +66,15 @@ public class IndexPersistenceMgrTest { conf.setLedgerDirNames(new String[] { ledgerDir.getPath() }); ledgerDirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs()); + ledgerMonitor = new LedgerDirsMonitor(conf, + new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()), ledgerDirsManager); + 
ledgerMonitor.init(); } @After public void tearDown() throws Exception { - ledgerDirsManager.shutdown(); + //TODO: it is being shut down but never started. why? + ledgerMonitor.shutdown(); FileUtils.deleteDirectory(journalDir); FileUtils.deleteDirectory(ledgerDir); } http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/efd8ec26/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java ---------------------------------------------------------------------- diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java index b72cfc7..6b96ca1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestLedgerDirsManager.java @@ -28,7 +28,9 @@ import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener; import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException; @@ -47,9 +49,10 @@ import org.slf4j.LoggerFactory; public class TestLedgerDirsManager { private final static Logger LOG = LoggerFactory.getLogger(TestLedgerDirsManager.class); - ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + ServerConfiguration conf; File curDir; LedgerDirsManager dirsManager; + LedgerDirsMonitor ledgerMonitor; MockDiskChecker mockDiskChecker; int diskCheckInterval = 1000; float threshold = 0.5f; @@ -69,19 +72,22 @@ public class TestLedgerDirsManager { curDir = Bookie.getCurrentDirectory(tmpDir); Bookie.checkDirectoryStructure(curDir); - ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf = TestBKConfiguration.newServerConfiguration(); 
conf.setLedgerDirNames(new String[] { tmpDir.toString() }); + conf.setDiskLowWaterMarkUsageThreshold(conf.getDiskUsageThreshold()); conf.setDiskCheckInterval(diskCheckInterval); conf.setIsForceGCAllowWhenNoSpace(true); mockDiskChecker = new MockDiskChecker(threshold, warnThreshold); - dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE, mockDiskChecker); - dirsManager.init(); + dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE); + ledgerMonitor = new LedgerDirsMonitor(conf, + mockDiskChecker, dirsManager); + ledgerMonitor.init(); } @After public void tearDown() throws Exception { - dirsManager.shutdown(); + ledgerMonitor.shutdown(); for (File dir : tempDirs) { FileUtils.deleteDirectory(dir); } @@ -146,7 +152,7 @@ public class TestLedgerDirsManager { MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener(); dirsManager.addLedgerDirsListener(mockLedgerDirsListener); - dirsManager.start(); + ledgerMonitor.start(); assertFalse(mockLedgerDirsListener.readOnly); mockDiskChecker.setUsage(threshold + 0.05f); @@ -161,9 +167,166 @@ public class TestLedgerDirsManager { assertFalse(mockLedgerDirsListener.readOnly); } + @Test(timeout = 60000) + public void testLedgerDirsMonitorHandlingLowWaterMark() throws Exception { + + ledgerMonitor.shutdown(); + + final float warn = 0.90f; + final float nospace = 0.98f; + final float lwm = (warn + nospace) / 2; + final float lwm2warn = (warn + lwm) / 2; + final float lwm2nospace = (lwm + nospace) / 2; + final float nospaceExceeded = nospace + 0.005f; + + conf.setDiskUsageThreshold(nospace); + conf.setDiskLowWaterMarkUsageThreshold(lwm); + conf.setDiskUsageWarnThreshold(warn); + + mockDiskChecker = new MockDiskChecker(nospace, warnThreshold); + dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE); + ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, dirsManager); + ledgerMonitor.init(); + final 
MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener(); + dirsManager.addLedgerDirsListener(mockLedgerDirsListener); + ledgerMonitor.start(); + + Thread.sleep((diskCheckInterval * 2) + 100); + assertFalse(mockLedgerDirsListener.readOnly); + + // go above LWM but below threshold + // should still be writable + mockDiskChecker.setUsage(lwm2nospace); + Thread.sleep((diskCheckInterval * 2) + 100); + assertFalse(mockLedgerDirsListener.readOnly); + + // exceed the threshold, should go to readonly + mockDiskChecker.setUsage(nospaceExceeded); + Thread.sleep(diskCheckInterval + 100); + assertTrue(mockLedgerDirsListener.readOnly); + + // drop below threshold but above LWM + // should stay read-only + mockDiskChecker.setUsage(lwm2nospace); + Thread.sleep((diskCheckInterval * 2) + 100); + assertTrue(mockLedgerDirsListener.readOnly); + + // drop below LWM + // should become writable + mockDiskChecker.setUsage(lwm2warn); + Thread.sleep((diskCheckInterval * 2) + 100); + assertFalse(mockLedgerDirsListener.readOnly); + + // go above LWM but below threshold + // should still be writable + mockDiskChecker.setUsage(lwm2nospace); + Thread.sleep((diskCheckInterval * 2) + 100); + assertFalse(mockLedgerDirsListener.readOnly); + } + + @Test(timeout = 60000) + public void testLedgerDirsMonitorHandlingWithMultipleLedgerDirectories() throws Exception { + ledgerMonitor.shutdown(); + + final float nospace = 0.90f; + final float lwm = 0.80f; + final float warn = 0.99f; + HashMap usageMap; + + File tmpDir1 = createTempDir("bkTest", ".dir"); + File curDir1 = Bookie.getCurrentDirectory(tmpDir1); + Bookie.checkDirectoryStructure(curDir1); + + File tmpDir2 = createTempDir("bkTest", ".dir"); + File curDir2 = Bookie.getCurrentDirectory(tmpDir2); + Bookie.checkDirectoryStructure(curDir2); + + conf.setDiskUsageThreshold(nospace); + conf.setDiskLowWaterMarkUsageThreshold(lwm); + conf.setDiskUsageWarnThreshold(warn); + conf.setLedgerDirNames(new String[] { tmpDir1.toString(), 
tmpDir2.toString() }); + + mockDiskChecker = new MockDiskChecker(nospace, warnThreshold); + dirsManager = new LedgerDirsManager(conf, conf.getLedgerDirs(), NullStatsLogger.INSTANCE); + ledgerMonitor = new LedgerDirsMonitor(conf, mockDiskChecker, dirsManager); + usageMap = new HashMap(); + usageMap.put(curDir1, 0.1f); + usageMap.put(curDir2, 0.1f); + mockDiskChecker.setUsageMap(usageMap); + ledgerMonitor.init(); + final MockLedgerDirsListener mockLedgerDirsListener = new MockLedgerDirsListener(); + dirsManager.addLedgerDirsListener(mockLedgerDirsListener); + ledgerMonitor.start(); + + Thread.sleep((diskCheckInterval * 2) + 100); + assertFalse(mockLedgerDirsListener.readOnly); + + // go above LWM but below threshold + // should still be writable + setUsageAndThenVerify(curDir1, lwm + 0.05f, curDir2, lwm + 0.05f, mockDiskChecker, mockLedgerDirsListener, + false); + + // one dir usagespace above storagethreshold, another dir below storagethreshold + // should still be writable + setUsageAndThenVerify(curDir1, nospace + 0.02f, curDir2, nospace - 0.05f, mockDiskChecker, + mockLedgerDirsListener, false); + + // should remain readonly + setUsageAndThenVerify(curDir1, nospace + 0.05f, curDir2, nospace + 0.02f, mockDiskChecker, + mockLedgerDirsListener, true); + + // bring the disk usages to less than the threshold, + // but more than the LWM. 
+ // should still be readonly + setUsageAndThenVerify(curDir1, nospace - 0.05f, curDir2, nospace - 0.05f, mockDiskChecker, + mockLedgerDirsListener, true); + + // bring one dir diskusage to less than lwm, + // the other dir to be more than lwm, but the + // overall diskusage to be more than lwm + // should still be readonly + setUsageAndThenVerify(curDir1, lwm - 0.03f, curDir2, lwm + 0.07f, mockDiskChecker, mockLedgerDirsListener, + true); + + // bring one dir diskusage to much less than lwm, + // the other dir to be more than storage threahold, but the + // overall diskusage is less than lwm + // should goto readwrite + setUsageAndThenVerify(curDir1, lwm - 0.17f, curDir2, nospace + 0.03f, mockDiskChecker, mockLedgerDirsListener, + false); + assertTrue("Only one LedgerDir should be writable", dirsManager.getWritableLedgerDirs().size() == 1); + + // bring both the dirs below lwm + // should still be readwrite + setUsageAndThenVerify(curDir1, lwm - 0.03f, curDir2, lwm - 0.02f, mockDiskChecker, mockLedgerDirsListener, + false); + assertTrue("Both the LedgerDirs should be writable", dirsManager.getWritableLedgerDirs().size() == 2); + + // bring both the dirs above lwm but < threshold + // should still be readwrite + setUsageAndThenVerify(curDir1, lwm + 0.02f, curDir2, lwm + 0.08f, mockDiskChecker, mockLedgerDirsListener, + false); + } + + private void setUsageAndThenVerify(File dir1, float dir1Usage, File dir2, float dir2Usage, + MockDiskChecker mockDiskChecker, MockLedgerDirsListener mockLedgerDirsListener, boolean verifyReadOnly) + throws InterruptedException { + HashMap usageMap = new HashMap(); + usageMap.put(dir1, dir1Usage); + usageMap.put(dir2, dir2Usage); + mockDiskChecker.setUsageMap(usageMap); + Thread.sleep((diskCheckInterval * 2) + 100); + if (verifyReadOnly) { + assertTrue(mockLedgerDirsListener.readOnly); + } else { + assertFalse(mockLedgerDirsListener.readOnly); + } + } + private class MockDiskChecker extends DiskChecker { - private float used; + private 
volatile float used; + private volatile Map usageMap = null; public MockDiskChecker(float threshold, float warnThreshold) { super(threshold, warnThreshold); @@ -172,23 +335,48 @@ public class TestLedgerDirsManager { @Override public float checkDir(File dir) throws DiskErrorException, DiskOutOfSpaceException, DiskWarnThresholdException { - if (used > getDiskUsageThreshold()) { - throw new DiskOutOfSpaceException("", used); + float dirUsage = getDirUsage(dir); + + if (dirUsage > getDiskUsageThreshold()) { + throw new DiskOutOfSpaceException("", dirUsage); + } + if (dirUsage > getDiskUsageWarnThreshold()) { + throw new DiskWarnThresholdException("", dirUsage); } - if (used > getDiskUsageWarnThreshold()) { - throw new DiskWarnThresholdException("", used); + return dirUsage; + } + + @Override + public float getTotalDiskUsage(List dirs) { + float accumulatedDiskUsage = 0f; + for (File dir : dirs) { + accumulatedDiskUsage += getDirUsage(dir); } - return used; + return (accumulatedDiskUsage / dirs.size()); + } + + public float getDirUsage(File dir) { + float dirUsage; + if ((usageMap == null) || (!usageMap.containsKey(dir))) { + dirUsage = used; + } else { + dirUsage = usageMap.get(dir); + } + return dirUsage; } public void setUsage(float usage) { this.used = usage; } + + public void setUsageMap(Map usageMap) { + this.usageMap = usageMap; + } } private class MockLedgerDirsListener implements LedgerDirsListener { - public boolean readOnly; + public volatile boolean readOnly; public MockLedgerDirsListener() { reset();