From: asuresh@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 02 Jan 2017 00:45:00 -0000
Message-Id: <005a882ebd9b4e4b80f1b8562f91946f@git.apache.org>
In-Reply-To: <832006ff43594532991993d77d7c7ffc@git.apache.org>
References: <832006ff43594532991993d77d7c7ffc@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: [42/50] [abbrv] hadoop git commit: HDFS-11274. Datanode should only check the failed volume upon IO errors. Contributed by Xiaoyu Yao.

HDFS-11274. Datanode should only check the failed volume upon IO errors. Contributed by Xiaoyu Yao.
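Editor's note on the patch below: the recurring edit is that callers such as BlockReceiver, FsVolumeImpl and DataTransfer stop invoking DataNode#checkDiskErrorAsync(), which kicked off a check of every volume, and instead FileIoProvider reports each IO failure to FileIoEvents together with the volume it occurred on, so the DataNode can schedule an asynchronous check of only that volume. The following is a minimal, self-contained sketch of that dispatch pattern for illustration only; Volume, Op, IoEvents and VolumeChecker are simplified stand-ins, not the actual HDFS types.

// Minimal sketch of the dispatch pattern introduced by this patch
// (simplified stand-in types, not the real HDFS classes).
import java.io.IOException;

public class VolumeCheckDispatchSketch {

  enum Op { READ, WRITE, FLUSH, SYNC }

  static final class Volume {
    private final String path;
    Volume(String path) { this.path = path; }
    @Override public String toString() { return path; }
  }

  /** Plays the role of FileIoEvents: subclasses record metrics or counters,
   *  while the base class also routes the failure to a targeted check. */
  abstract static class IoEvents {
    /** Subclass hook, e.g. increment a failure counter for this op. */
    abstract void onFailure(Volume volume, Op op, Exception e, long begin);

    /** Called by the IO wrapper (FileIoProvider in the patch): run the
     *  subclass hook, then ask for a check of only the failed volume. */
    void onFailure(VolumeChecker checker, Volume volume, Op op,
                   Exception e, long begin) {
      onFailure(volume, op, e, begin);
      if (checker != null && volume != null) {
        checker.checkVolumeAsync(volume);  // targeted, not check-all-volumes
      }
    }
  }

  /** Plays the role of DataNode#checkDiskErrorAsync(FsVolumeSpi). In the real
   *  patch this hands the volume to DatasetVolumeChecker asynchronously; here
   *  it just logs so the sketch stays deterministic. */
  static final class VolumeChecker {
    void checkVolumeAsync(Volume volume) {
      System.out.println("scheduling disk check for failed volume " + volume);
    }
  }

  public static void main(String[] args) {
    IoEvents events = new IoEvents() {
      @Override void onFailure(Volume v, Op op, Exception e, long begin) {
        System.out.println(op + " failed on " + v + ": " + e.getMessage());
      }
    };
    // Roughly what FileIoProvider does in its catch blocks:
    events.onFailure(new VolumeChecker(), new Volume("/data/disk1"),
        Op.WRITE, new IOException("Input/output error"), 0L);
  }
}

The companion change, visible further down in ThrottledAsyncChecker and DatasetVolumeChecker, has schedule() return a Guava Optional and decline to schedule when a check of the same target is already in flight or ran within the minimum gap, which keeps these per-volume checks cheap even when a burst of IO errors hits the same disk.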
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/603f3ef1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/603f3ef1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/603f3ef1

Branch: refs/heads/YARN-5085
Commit: 603f3ef1386048111940b66f3a0750ab84d0588f
Parents: ce3613c
Author: Xiaoyu Yao
Authored: Wed Dec 28 22:08:13 2016 -0800
Committer: Arpit Agarwal
Committed: Wed Dec 28 22:08:13 2016 -0800

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockReceiver.java     |  12 +-
 .../server/datanode/CountingFileIoEvents.java   |   3 +-
 .../hadoop/hdfs/server/datanode/DataNode.java   |  91 ++++++++-----
 .../server/datanode/DefaultFileIoEvents.java    |   2 +-
 .../hdfs/server/datanode/FileIoEvents.java      |  36 ++++--
 .../hdfs/server/datanode/FileIoProvider.java    |  89 +++++++------
 .../server/datanode/ProfilingFileIoEvents.java  |   2 +-
 .../hdfs/server/datanode/ReplicaInfo.java       |   2 +-
 .../server/datanode/checker/AsyncChecker.java   |   5 +-
 .../datanode/checker/DatasetVolumeChecker.java  |  71 ++++++----
 .../checker/StorageLocationChecker.java         |   8 +-
 .../datanode/checker/ThrottledAsyncChecker.java |  19 ++-
 .../datanode/fsdataset/impl/BlockPoolSlice.java |   5 +-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |   2 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |   3 +-
 .../fsdataset/impl/FsVolumeImplBuilder.java     |   4 +-
 .../server/datanode/SimulatedFSDataset.java     |   2 +-
 .../datanode/TestDataNodeHotSwapVolumes.java    |   2 +-
 .../TestDataNodeVolumeFailureReporting.java     |  15 ++-
 .../checker/TestDatasetVolumeChecker.java       |  49 ++++---
 .../TestDatasetVolumeCheckerFailures.java       |  23 ----
 .../checker/TestThrottledAsyncChecker.java      | 128 +++++++++++--------
 22 files changed, 338 insertions(+), 235 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index b3aee11..567597d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -278,10 +278,9 @@ class BlockReceiver implements Closeable { IOException cause = DatanodeUtil.getCauseIfDiskError(ioe); DataNode.LOG.warn("IOException in BlockReceiver constructor" + (cause == null ? "" : ". 
Cause is "), cause); - - if (cause != null) { // possible disk error + if (cause != null) { ioe = cause; - datanode.checkDiskErrorAsync(); + // Volume error check moved to FileIoProvider } throw ioe; @@ -363,9 +362,8 @@ class BlockReceiver implements Closeable { if (measuredFlushTime) { datanode.metrics.addFlushNanos(flushTotalNanos); } - // disk check if(ioe != null) { - datanode.checkDiskErrorAsync(); + // Volume error check moved to FileIoProvider throw ioe; } } @@ -792,7 +790,7 @@ class BlockReceiver implements Closeable { manageWriterOsCache(offsetInBlock); } } catch (IOException iex) { - datanode.checkDiskErrorAsync(); + // Volume error check moved to FileIoProvider throw iex; } } @@ -1430,7 +1428,7 @@ class BlockReceiver implements Closeable { } catch (IOException e) { LOG.warn("IOException in BlockReceiver.run(): ", e); if (running) { - datanode.checkDiskErrorAsync(); + // Volume error check moved to FileIoProvider LOG.info(myString, e); running = false; if (!Thread.interrupted()) { // failure not caused by interruption http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java index a70c151..7c6bfd6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/CountingFileIoEvents.java @@ -37,7 +37,7 @@ import java.util.concurrent.atomic.AtomicLong; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class CountingFileIoEvents implements FileIoEvents { +public class CountingFileIoEvents extends FileIoEvents { private final Map counts; private static class Counts { @@ -90,7 +90,6 @@ public class CountingFileIoEvents implements FileIoEvents { public void onFailure( @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) { counts.get(op).failures.incrementAndGet(); - } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 4436e58..e893c5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -372,6 +372,7 @@ public class DataNode extends ReconfigurableBase SaslDataTransferClient saslClient; SaslDataTransferServer saslServer; private ObjectName dataNodeInfoBeanName; + // Test verification only private volatile long lastDiskErrorCheck; private String supergroup; private boolean isPermissionEnabled; @@ -412,7 +413,7 @@ public class DataNode extends ReconfigurableBase this.tracer = createTracer(conf); this.tracerConfigurationManager = new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); - this.fileIoProvider = new 
FileIoProvider(conf); + this.fileIoProvider = new FileIoProvider(conf, this); this.fileDescriptorPassingDisabledReason = null; this.maxNumberOfBlocksToLog = 0; this.confVersion = null; @@ -438,7 +439,7 @@ public class DataNode extends ReconfigurableBase this.tracer = createTracer(conf); this.tracerConfigurationManager = new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); - this.fileIoProvider = new FileIoProvider(conf); + this.fileIoProvider = new FileIoProvider(conf, this); this.blockScanner = new BlockScanner(this); this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, @@ -786,7 +787,7 @@ public class DataNode extends ReconfigurableBase /** * Remove volumes from DataNode. - * See {@link #removeVolumes(Set, boolean)} for details. + * See {@link #removeVolumes(Collection, boolean)} for details. * * @param locations the StorageLocations of the volumes to be removed. * @throws IOException @@ -809,7 +810,7 @@ public class DataNode extends ReconfigurableBase *
    Reset configuration DATA_DIR and {@link #dataDirs} to represent * active volumes.
* - * @param absoluteVolumePaths the absolute path of volumes. + * @param storageLocations the absolute path of volumes. * @param clearFailure if true, clears the failure information related to the * volumes. * @throws IOException @@ -1293,7 +1294,7 @@ public class DataNode extends ReconfigurableBase * If conf's CONFIG_PROPERTY_SIMULATED property is set * then a simulated storage based data node is created. * - * @param dataDirs - only for a non-simulated storage data node + * @param dataDirectories - only for a non-simulated storage data node * @throws IOException */ void startDataNode(List dataDirectories, @@ -2045,14 +2046,33 @@ public class DataNode extends ReconfigurableBase } tracer.close(); } - - + /** - * Check if there is a disk failure asynchronously and if so, handle the error + * Check if there is a disk failure asynchronously + * and if so, handle the error. */ + @VisibleForTesting public void checkDiskErrorAsync() { volumeChecker.checkAllVolumesAsync( data, (healthyVolumes, failedVolumes) -> { + if (failedVolumes.size() > 0) { + LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}", + failedVolumes.size(), failedVolumes); + } else { + LOG.debug("checkDiskErrorAsync: no volume failures detected"); + } + lastDiskErrorCheck = Time.monotonicNow(); + handleVolumeFailures(failedVolumes); + }); + } + + /** + * Check if there is a disk failure asynchronously + * and if so, handle the error. + */ + public void checkDiskErrorAsync(FsVolumeSpi volume) { + volumeChecker.checkVolume( + volume, (healthyVolumes, failedVolumes) -> { if (failedVolumes.size() > 0) { LOG.warn("checkDiskErrorAsync callback got {} failed volumes: {}", failedVolumes.size(), failedVolumes); @@ -2064,9 +2084,10 @@ public class DataNode extends ReconfigurableBase }); } - private void handleDiskError(String errMsgr) { + private void handleDiskError(String failedVolumes) { final boolean hasEnoughResources = data.hasEnoughResource(); - LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResources); + LOG.warn("DataNode.handleDiskError on : [" + failedVolumes + + "] Keep Running: " + hasEnoughResources); // If we have enough active valid volumes then we do not want to // shutdown the DN completely. @@ -2076,7 +2097,7 @@ public class DataNode extends ReconfigurableBase //inform NameNodes for(BPOfferService bpos: blockPoolManager.getAllNamenodeThreads()) { - bpos.trySendErrorReport(dpError, errMsgr); + bpos.trySendErrorReport(dpError, failedVolumes); } if(hasEnoughResources) { @@ -2084,7 +2105,8 @@ public class DataNode extends ReconfigurableBase return; // do not shutdown } - LOG.warn("DataNode is shutting down: " + errMsgr); + LOG.warn("DataNode is shutting down due to failed volumes: [" + + failedVolumes + "]"); shouldRun = false; } @@ -2447,8 +2469,11 @@ public class DataNode extends ReconfigurableBase } LOG.warn(bpReg + ":Failed to transfer " + b + " to " + targets[0] + " got ", ie); - // check if there are any disk problem - checkDiskErrorAsync(); + // disk check moved to FileIoProvider + IOException cause = DatanodeUtil.getCauseIfDiskError(ie); + if (cause != null) { // possible disk error + LOG.warn("IOException in DataTransfer#run(). 
Cause is ", cause); + } } finally { decrementXmitsInProgress(); IOUtils.closeStream(blockSender); @@ -3234,29 +3259,37 @@ public class DataNode extends ReconfigurableBase } private void handleVolumeFailures(Set unhealthyVolumes) { + if (unhealthyVolumes.isEmpty()) { + LOG.debug("handleVolumeFailures done with empty " + + "unhealthyVolumes"); + return; + } + data.handleVolumeFailures(unhealthyVolumes); Set unhealthyLocations = new HashSet<>( unhealthyVolumes.size()); - if (!unhealthyVolumes.isEmpty()) { - StringBuilder sb = new StringBuilder("DataNode failed volumes:"); - for (FsVolumeSpi vol : unhealthyVolumes) { - unhealthyLocations.add(vol.getStorageLocation()); - sb.append(vol.getStorageLocation()).append(";"); - } + StringBuilder sb = new StringBuilder("DataNode failed volumes:"); + for (FsVolumeSpi vol : unhealthyVolumes) { + unhealthyLocations.add(vol.getStorageLocation()); + sb.append(vol.getStorageLocation()).append(";"); + } - try { - // Remove all unhealthy volumes from DataNode. - removeVolumes(unhealthyLocations, false); - } catch (IOException e) { - LOG.warn("Error occurred when removing unhealthy storage dirs: " - + e.getMessage(), e); - } - LOG.info(sb.toString()); - handleDiskError(sb.toString()); + try { + // Remove all unhealthy volumes from DataNode. + removeVolumes(unhealthyLocations, false); + } catch (IOException e) { + LOG.warn("Error occurred when removing unhealthy storage dirs: " + + e.getMessage(), e); } + if (LOG.isDebugEnabled()) { + LOG.debug(sb.toString()); + } + // send blockreport regarding volume failure + handleDiskError(sb.toString()); } + @VisibleForTesting public long getLastDiskErrorCheck() { return lastDiskErrorCheck; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java index bd4932b..6a12aae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DefaultFileIoEvents.java @@ -31,7 +31,7 @@ import javax.annotation.Nullable; */ @InterfaceAudience.Private @InterfaceStability.Unstable -public final class DefaultFileIoEvents implements FileIoEvents { +public final class DefaultFileIoEvents extends FileIoEvents { @Override public long beforeMetadataOp( @Nullable FsVolumeSpi volume, OPERATION op) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java index 48e703f..10f2a0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoEvents.java @@ -32,7 +32,7 @@ import javax.annotation.Nullable; */ @InterfaceAudience.Private 
@InterfaceStability.Unstable -public interface FileIoEvents { +public abstract class FileIoEvents { /** * Invoked before a filesystem metadata operation. @@ -42,7 +42,7 @@ public interface FileIoEvents { * @return timestamp at which the operation was started. 0 if * unavailable. */ - long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op); + abstract long beforeMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op); /** * Invoked after a filesystem metadata operation has completed. @@ -52,7 +52,8 @@ public interface FileIoEvents { * @param begin timestamp at which the operation was started. 0 * if unavailable. */ - void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, long begin); + abstract void afterMetadataOp(@Nullable FsVolumeSpi volume, OPERATION op, + long begin); /** * Invoked before a read/write/flush/channel transfer operation. @@ -63,7 +64,8 @@ public interface FileIoEvents { * @return timestamp at which the operation was started. 0 if * unavailable. */ - long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, long len); + abstract long beforeFileIo(@Nullable FsVolumeSpi volume, OPERATION op, + long len); /** @@ -76,22 +78,38 @@ public interface FileIoEvents { * @return timestamp at which the operation was started. 0 if * unavailable. */ - void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op, - long begin, long len); + abstract void afterFileIo(@Nullable FsVolumeSpi volume, OPERATION op, + long begin, long len); /** * Invoked if an operation fails with an exception. - * @param volume target volume for the operation. Null if unavailable. + * @param volume target volume for the operation. Null if unavailable. * @param op type of operation. * @param e Exception encountered during the operation. * @param begin time at which the operation was started. */ - void onFailure( + abstract void onFailure( @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin); /** + * Invoked by FileIoProvider if an operation fails with an exception. + * @param datanode datanode that runs volume check upon volume io failure + * @param volume target volume for the operation. Null if unavailable. + * @param op type of operation. + * @param e Exception encountered during the operation. + * @param begin time at which the operation was started. + */ + void onFailure(DataNode datanode, + @Nullable FsVolumeSpi volume, OPERATION op, Exception e, long begin) { + onFailure(volume, op, e, begin); + if (datanode != null && volume != null) { + datanode.checkDiskErrorAsync(volume); + } + } + + /** * Return statistics as a JSON string. 
* @return */ - @Nullable String getStatistics(); + @Nullable abstract String getStatistics(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java index 2344114..f961049 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FileIoProvider.java @@ -79,12 +79,16 @@ public class FileIoProvider { FileIoProvider.class); private final FileIoEvents eventHooks; + private final DataNode datanode; /** * @param conf Configuration object. May be null. When null, * the event handlers are no-ops. + * @param datanode datanode that owns this FileIoProvider. Used for + * IO error based volume checker callback */ - public FileIoProvider(@Nullable Configuration conf) { + public FileIoProvider(@Nullable Configuration conf, + final DataNode datanode) { if (conf != null) { final Class clazz = conf.getClass( DFSConfigKeys.DFS_DATANODE_FILE_IO_EVENTS_CLASS_KEY, @@ -94,6 +98,7 @@ public class FileIoProvider { } else { eventHooks = new DefaultFileIoEvents(); } + this.datanode = datanode; } /** @@ -139,7 +144,7 @@ public class FileIoProvider { f.flush(); eventHooks.afterFileIo(volume, FLUSH, begin, 0); } catch (Exception e) { - eventHooks.onFailure(volume, FLUSH, e, begin); + eventHooks.onFailure(datanode, volume, FLUSH, e, begin); throw e; } } @@ -157,7 +162,7 @@ public class FileIoProvider { fos.getChannel().force(true); eventHooks.afterFileIo(volume, SYNC, begin, 0); } catch (Exception e) { - eventHooks.onFailure(volume, SYNC, e, begin); + eventHooks.onFailure(datanode, volume, SYNC, e, begin); throw e; } } @@ -176,7 +181,7 @@ public class FileIoProvider { NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, numBytes, flags); eventHooks.afterFileIo(volume, SYNC, begin, 0); } catch (Exception e) { - eventHooks.onFailure(volume, SYNC, e, begin); + eventHooks.onFailure(datanode, volume, SYNC, e, begin); throw e; } } @@ -196,7 +201,7 @@ public class FileIoProvider { identifier, outFd, offset, length, flags); eventHooks.afterMetadataOp(volume, FADVISE, begin); } catch (Exception e) { - eventHooks.onFailure(volume, FADVISE, e, begin); + eventHooks.onFailure(datanode, volume, FADVISE, e, begin); throw e; } } @@ -214,7 +219,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, DELETE, begin); return deleted; } catch (Exception e) { - eventHooks.onFailure(volume, DELETE, e, begin); + eventHooks.onFailure(datanode, volume, DELETE, e, begin); throw e; } } @@ -236,7 +241,7 @@ public class FileIoProvider { } return deleted; } catch (Exception e) { - eventHooks.onFailure(volume, DELETE, e, begin); + eventHooks.onFailure(datanode, volume, DELETE, e, begin); throw e; } } @@ -264,7 +269,7 @@ public class FileIoProvider { waitTime, transferTime); eventHooks.afterFileIo(volume, TRANSFER, begin, count); } catch (Exception e) { - eventHooks.onFailure(volume, TRANSFER, e, begin); + eventHooks.onFailure(datanode, volume, TRANSFER, e, begin); throw e; } } @@ -285,7 +290,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, OPEN, begin); return 
created; } catch (Exception e) { - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -312,7 +317,7 @@ public class FileIoProvider { return fis; } catch(Exception e) { org.apache.commons.io.IOUtils.closeQuietly(fis); - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -328,7 +333,7 @@ public class FileIoProvider { * @param f File object. * @param append if true, then bytes will be written to the end of the * file rather than the beginning. - * @param FileOutputStream to the given file object. + * @return FileOutputStream to the given file object. * @throws FileNotFoundException */ public FileOutputStream getFileOutputStream( @@ -342,7 +347,7 @@ public class FileIoProvider { return fos; } catch(Exception e) { org.apache.commons.io.IOUtils.closeQuietly(fos); - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -372,7 +377,7 @@ public class FileIoProvider { * before delegating to the wrapped stream. * * @param volume target volume. null if unavailable. - * @param f File object. + * @param fd File descriptor object. * @return FileOutputStream to the given file object. * @throws FileNotFoundException */ @@ -407,7 +412,7 @@ public class FileIoProvider { return fis; } catch(Exception e) { org.apache.commons.io.IOUtils.closeQuietly(fis); - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -438,7 +443,7 @@ public class FileIoProvider { return fis; } catch(Exception e) { org.apache.commons.io.IOUtils.closeQuietly(fis); - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -468,7 +473,7 @@ public class FileIoProvider { return raf; } catch(Exception e) { org.apache.commons.io.IOUtils.closeQuietly(raf); - eventHooks.onFailure(volume, OPEN, e, begin); + eventHooks.onFailure(datanode, volume, OPEN, e, begin); throw e; } } @@ -487,7 +492,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, DELETE, begin); return deleted; } catch(Exception e) { - eventHooks.onFailure(volume, DELETE, e, begin); + eventHooks.onFailure(datanode, volume, DELETE, e, begin); throw e; } } @@ -508,7 +513,7 @@ public class FileIoProvider { FileUtil.replaceFile(src, target); eventHooks.afterMetadataOp(volume, MOVE, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MOVE, e, begin); + eventHooks.onFailure(datanode, volume, MOVE, e, begin); throw e; } } @@ -530,7 +535,7 @@ public class FileIoProvider { Storage.rename(src, target); eventHooks.afterMetadataOp(volume, MOVE, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MOVE, e, begin); + eventHooks.onFailure(datanode, volume, MOVE, e, begin); throw e; } } @@ -552,7 +557,7 @@ public class FileIoProvider { FileUtils.moveFile(src, target); eventHooks.afterMetadataOp(volume, MOVE, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MOVE, e, begin); + eventHooks.onFailure(datanode, volume, MOVE, e, begin); throw e; } } @@ -576,7 +581,7 @@ public class FileIoProvider { Files.move(src, target, options); eventHooks.afterMetadataOp(volume, MOVE, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MOVE, e, begin); + eventHooks.onFailure(datanode, volume, MOVE, e, begin); throw e; } } @@ -600,7 +605,7 @@ public class FileIoProvider { Storage.nativeCopyFileUnbuffered(src, target, 
preserveFileDate); eventHooks.afterFileIo(volume, NATIVE_COPY, begin, length); } catch(Exception e) { - eventHooks.onFailure(volume, NATIVE_COPY, e, begin); + eventHooks.onFailure(datanode, volume, NATIVE_COPY, e, begin); throw e; } } @@ -625,7 +630,7 @@ public class FileIoProvider { isDirectory = !created && dir.isDirectory(); eventHooks.afterMetadataOp(volume, MKDIRS, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MKDIRS, e, begin); + eventHooks.onFailure(datanode, volume, MKDIRS, e, begin); throw e; } @@ -651,7 +656,7 @@ public class FileIoProvider { succeeded = dir.isDirectory() || dir.mkdirs(); eventHooks.afterMetadataOp(volume, MKDIRS, begin); } catch(Exception e) { - eventHooks.onFailure(volume, MKDIRS, e, begin); + eventHooks.onFailure(datanode, volume, MKDIRS, e, begin); throw e; } @@ -677,7 +682,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, LIST, begin); return children; } catch(Exception e) { - eventHooks.onFailure(volume, LIST, e, begin); + eventHooks.onFailure(datanode, volume, LIST, e, begin); throw e; } } @@ -687,7 +692,7 @@ public class FileIoProvider { * {@link FileUtil#listFiles(File)}. * * @param volume target volume. null if unavailable. - * @param Driectory to be listed. + * @param dir directory to be listed. * @return array of strings representing the directory entries. * @throws IOException */ @@ -699,7 +704,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, LIST, begin); return children; } catch(Exception e) { - eventHooks.onFailure(volume, LIST, e, begin); + eventHooks.onFailure(datanode, volume, LIST, e, begin); throw e; } } @@ -722,7 +727,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, LIST, begin); return children; } catch(Exception e) { - eventHooks.onFailure(volume, LIST, e, begin); + eventHooks.onFailure(datanode, volume, LIST, e, begin); throw e; } } @@ -744,7 +749,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, LIST, begin); return count; } catch(Exception e) { - eventHooks.onFailure(volume, LIST, e, begin); + eventHooks.onFailure(datanode, volume, LIST, e, begin); throw e; } } @@ -763,7 +768,7 @@ public class FileIoProvider { eventHooks.afterMetadataOp(volume, EXISTS, begin); return exists; } catch(Exception e) { - eventHooks.onFailure(volume, EXISTS, e, begin); + eventHooks.onFailure(datanode, volume, EXISTS, e, begin); throw e; } } @@ -804,7 +809,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, 1); return b; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -820,7 +825,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -836,7 +841,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -878,7 +883,7 @@ public class FileIoProvider { super.write(b); eventHooks.afterFileIo(volume, WRITE, begin, 1); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } @@ -893,7 +898,7 @@ public class FileIoProvider { super.write(b); 
eventHooks.afterFileIo(volume, WRITE, begin, b.length); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } @@ -908,7 +913,7 @@ public class FileIoProvider { super.write(b, off, len); eventHooks.afterFileIo(volume, WRITE, begin, len); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } @@ -936,7 +941,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, 1); return b; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -949,7 +954,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -962,7 +967,7 @@ public class FileIoProvider { eventHooks.afterFileIo(volume, READ, begin, numBytesRead); return numBytesRead; } catch(Exception e) { - eventHooks.onFailure(volume, READ, e, begin); + eventHooks.onFailure(datanode, volume, READ, e, begin); throw e; } } @@ -974,7 +979,7 @@ public class FileIoProvider { super.write(b); eventHooks.afterFileIo(volume, WRITE, begin, 1); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } @@ -986,7 +991,7 @@ public class FileIoProvider { super.write(b); eventHooks.afterFileIo(volume, WRITE, begin, b.length); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } @@ -998,7 +1003,7 @@ public class FileIoProvider { super.write(b, off, len); eventHooks.afterFileIo(volume, WRITE, begin, len); } catch(Exception e) { - eventHooks.onFailure(volume, WRITE, e, begin); + eventHooks.onFailure(datanode, volume, WRITE, e, begin); throw e; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java index 5835fe8..affd093 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProfilingFileIoEvents.java @@ -30,7 +30,7 @@ import javax.annotation.Nullable; * related operations on datanode volumes. 
*/ @InterfaceAudience.Private -class ProfilingFileIoEvents implements FileIoEvents { +class ProfilingFileIoEvents extends FileIoEvents { @Override public long beforeMetadataOp(@Nullable FsVolumeSpi volume, http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index d3006c8..65e9ba7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -47,7 +47,7 @@ abstract public class ReplicaInfo extends Block /** This is used by some tests and FsDatasetUtil#computeChecksum. */ private static final FileIoProvider DEFAULT_FILE_IO_PROVIDER = - new FileIoProvider(null); + new FileIoProvider(null, null); /** * Constructor http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java index 1d534a3..997c0cb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/AsyncChecker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker; +import com.google.common.base.Optional; import com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -43,10 +44,10 @@ public interface AsyncChecker { * @param context the interpretation of the context depends on the * target. * - * @return returns a {@link ListenableFuture} that can be used to + * @return returns a {@link Optional of ListenableFuture} that can be used to * retrieve the result of the asynchronous check. */ - ListenableFuture schedule(Checkable target, K context); + Optional> schedule(Checkable target, K context); /** * Cancel all executing checks and wait for them to complete. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java index ba09d23..cab6122 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/DatasetVolumeChecker.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; @@ -191,18 +192,26 @@ public class DatasetVolumeChecker { for (int i = 0; i < references.size(); ++i) { final FsVolumeReference reference = references.getReference(i); - allVolumes.add(reference.getVolume()); - ListenableFuture future = + Optional> olf = delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); LOG.info("Scheduled health check for volume {}", reference.getVolume()); - Futures.addCallback(future, new ResultHandler( - reference, healthyVolumes, failedVolumes, numVolumes, new Callback() { - @Override - public void call(Set ignored1, - Set ignored2) { + if (olf.isPresent()) { + allVolumes.add(reference.getVolume()); + Futures.addCallback(olf.get(), + new ResultHandler(reference, healthyVolumes, failedVolumes, + numVolumes, new Callback() { + @Override + public void call(Set ignored1, + Set ignored2) { + latch.countDown(); + } + })); + } else { + IOUtils.cleanup(null, reference); + if (numVolumes.decrementAndGet() == 0) { latch.countDown(); } - })); + } } // Wait until our timeout elapses, after which we give up on @@ -263,18 +272,26 @@ public class DatasetVolumeChecker { final Set healthyVolumes = new HashSet<>(); final Set failedVolumes = new HashSet<>(); final AtomicLong numVolumes = new AtomicLong(references.size()); + boolean added = false; LOG.info("Checking {} volumes", references.size()); for (int i = 0; i < references.size(); ++i) { final FsVolumeReference reference = references.getReference(i); // The context parameter is currently ignored. - ListenableFuture future = + Optional> olf = delegateChecker.schedule(reference.getVolume(), IGNORED_CONTEXT); - Futures.addCallback(future, new ResultHandler( - reference, healthyVolumes, failedVolumes, numVolumes, callback)); + if (olf.isPresent()) { + added = true; + Futures.addCallback(olf.get(), + new ResultHandler(reference, healthyVolumes, failedVolumes, + numVolumes, callback)); + } else { + IOUtils.cleanup(null, reference); + numVolumes.decrementAndGet(); + } } numAsyncDatasetChecks.incrementAndGet(); - return true; + return added; } /** @@ -291,7 +308,7 @@ public class DatasetVolumeChecker { } /** - * Check a single volume, returning a {@link ListenableFuture} + * Check a single volume asynchronously, returning a {@link ListenableFuture} * that can be used to retrieve the final result. 
* * If the volume cannot be referenced then it is already closed and @@ -305,21 +322,31 @@ public class DatasetVolumeChecker { public boolean checkVolume( final FsVolumeSpi volume, Callback callback) { + if (volume == null) { + LOG.debug("Cannot schedule check on null volume"); + return false; + } + FsVolumeReference volumeReference; try { volumeReference = volume.obtainReference(); } catch (ClosedChannelException e) { // The volume has already been closed. - callback.call(new HashSet<>(), new HashSet<>()); return false; } - ListenableFuture future = + + Optional> olf = delegateChecker.schedule(volume, IGNORED_CONTEXT); - numVolumeChecks.incrementAndGet(); - Futures.addCallback(future, new ResultHandler( - volumeReference, new HashSet<>(), new HashSet<>(), - new AtomicLong(1), callback)); - return true; + if (olf.isPresent()) { + numVolumeChecks.incrementAndGet(); + Futures.addCallback(olf.get(), + new ResultHandler(volumeReference, new HashSet<>(), new HashSet<>(), + new AtomicLong(1), callback)); + return true; + } else { + IOUtils.cleanup(null, volumeReference); + } + return false; } /** @@ -343,8 +370,8 @@ public class DatasetVolumeChecker { * successful, add the volume here. * @param failedVolumes set of failed volumes. If the disk check fails, * add the volume here. - * @param semaphore semaphore used to trigger callback invocation. - * @param callback invoked when the semaphore can be successfully acquired. + * @param volumeCounter volumeCounter used to trigger callback invocation. + * @param callback invoked when the volumeCounter reaches 0. */ ResultHandler(FsVolumeReference reference, Set healthyVolumes, http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java index 7337ad0..6e323e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/StorageLocationChecker.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker; import static org.apache.hadoop.hdfs.DFSConfigKeys.*; +import com.google.common.base.Optional; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -158,8 +159,11 @@ public class StorageLocationChecker { // Start parallel disk check operations on all StorageLocations. 
for (StorageLocation location : dataDirs) { goodLocations.put(location, true); - futures.put(location, - delegateChecker.schedule(location, context)); + Optional> olf = + delegateChecker.schedule(location, context); + if (olf.isPresent()) { + futures.put(location, olf.get()); + } } if (maxVolumeFailuresTolerated >= dataDirs.size()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java index d0ee3d2..83c554d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker; +import com.google.common.base.Optional; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -101,13 +102,11 @@ public class ThrottledAsyncChecker implements AsyncChecker { * will receive the same Future. */ @Override - public synchronized ListenableFuture schedule( - final Checkable target, - final K context) { - LOG.debug("Scheduling a check of {}", target); - + public Optional> schedule(Checkable target, + K context) { + LOG.info("Scheduling a check for {}", target); if (checksInProgress.containsKey(target)) { - return checksInProgress.get(target); + return Optional.absent(); } if (completedChecks.containsKey(target)) { @@ -115,11 +114,9 @@ public class ThrottledAsyncChecker implements AsyncChecker { final long msSinceLastCheck = timer.monotonicNow() - result.completedAt; if (msSinceLastCheck < minMsBetweenChecks) { LOG.debug("Skipped checking {}. Time since last check {}ms " + - "is less than the min gap {}ms.", + "is less than the min gap {}ms.", target, msSinceLastCheck, minMsBetweenChecks); - return result.result != null ? 
- Futures.immediateFuture(result.result) : - Futures.immediateFailedFuture(result.exception); + return Optional.absent(); } } @@ -132,7 +129,7 @@ public class ThrottledAsyncChecker implements AsyncChecker { }); checksInProgress.put(target, lf); addResultCachingCallback(target, lf); - return lf; + return Optional.of(lf); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 8273ebb..c8df300 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -271,7 +271,10 @@ class BlockPoolSlice { new FileOutputStream(outFile), "UTF-8")) { // mtime is written last, so that truncated writes won't be valid. out.write(Long.toString(used) + " " + Long.toString(timer.now())); - fileIoProvider.flush(volume, out); + // This is only called as part of the volume shutdown. + // We explicitly avoid calling flush with fileIoProvider which triggers + // volume check upon io exception to avoid cyclic volume checks. + out.flush(); } } catch (IOException ioe) { // If write failed, the volume might be bad. Since the cache file is http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 0d5a12c..d1f8f05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -1828,7 +1828,7 @@ class FsDatasetImpl implements FsDatasetSpi { return r; } // if file is not null, but doesn't exist - possibly disk failed - datanode.checkDiskErrorAsync(); + datanode.checkDiskErrorAsync(r.getVolume()); } if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 753c083..042ef6e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -1321,8 +1321,7 @@ public class FsVolumeImpl 
implements FsVolumeSpi { this, dir, BlockDirFilter.INSTANCE); } catch (IOException ioe) { LOG.warn("Exception occured while compiling report: ", ioe); - // Initiate a check on disk failure. - dataset.datanode.checkDiskErrorAsync(); + // Volume error check moved to FileIoProvider. // Ignore this directory and proceed. return report; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java index 5371eda..427f81b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImplBuilder.java @@ -69,7 +69,7 @@ public class FsVolumeImplBuilder { FsVolumeImpl build() throws IOException { return new FsVolumeImpl( dataset, storageID, sd, - fileIoProvider != null ? fileIoProvider : new FileIoProvider(null), - conf); + fileIoProvider != null ? fileIoProvider : + new FileIoProvider(null, null), conf); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 8472eca..cd3befd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -624,7 +624,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { } registerMBean(datanodeUuid); - this.fileIoProvider = new FileIoProvider(conf); + this.fileIoProvider = new FileIoProvider(conf, datanode); this.storage = new SimulatedStorage( conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java index e31e783..96d1a28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java @@ -807,7 +807,7 @@ public class TestDataNodeHotSwapVolumes { DataNodeTestUtils.injectDataDirFailure(dirToFail); // Call and wait DataNode to detect disk failure. 
long lastDiskErrorCheck = dn.getLastDiskErrorCheck(); - dn.checkDiskErrorAsync(); + dn.checkDiskErrorAsync(failedVolume); while (dn.getLastDiskErrorCheck() == lastDiskErrorCheck) { Thread.sleep(100); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java index 3d37b10..3015e61 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureReporting.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows; import static org.junit.Assert.assertArrayEquals; @@ -319,6 +319,12 @@ public class TestDataNodeVolumeFailureReporting { DFSTestUtil.createFile(fs, file1, 1024, (short)3, 1L); DFSTestUtil.waitReplication(fs, file1, (short)3); + // Create additional file to trigger failure based volume check on dn1Vol2 + // and dn2Vol2. + Path file2 = new Path("/test2"); + DFSTestUtil.createFile(fs, file2, 1024, (short)3, 1L); + DFSTestUtil.waitReplication(fs, file2, (short)3); + ArrayList dns = cluster.getDataNodes(); assertTrue("DN1 should be up", dns.get(0).isDatanodeUp()); assertTrue("DN2 should be up", dns.get(1).isDatanodeUp()); @@ -538,8 +544,6 @@ public class TestDataNodeVolumeFailureReporting { private void checkFailuresAtDataNode(DataNode dn, long expectedVolumeFailuresCounter, boolean expectCapacityKnown, String... 
expectedFailedVolumes) throws Exception { - assertCounter("VolumeFailures", expectedVolumeFailuresCounter, - getMetrics(dn.getMetrics().name())); FsDatasetSpi fsd = dn.getFSDataset(); StringBuilder strBuilder = new StringBuilder(); strBuilder.append("expectedFailedVolumes is "); @@ -551,6 +555,11 @@ public class TestDataNodeVolumeFailureReporting { strBuilder.append(expected + ","); } LOG.info(strBuilder.toString()); + final long actualVolumeFailures = + getLongCounter("VolumeFailures", getMetrics(dn.getMetrics().name())); + assertTrue("Actual async detected volume failures should be greater or " + + "equal than " + expectedFailedVolumes, + actualVolumeFailures >= expectedVolumeFailuresCounter); assertEquals(expectedFailedVolumes.length, fsd.getNumFailedVolumes()); assertArrayEquals(expectedFailedVolumes, convertToAbsolutePaths(fsd.getFailedStorageLocations())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java index 50096ba..f5bb807 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeChecker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode.checker; +import com.google.common.base.Optional; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -104,24 +105,28 @@ public class TestDatasetVolumeChecker { /** * Request a check and ensure it triggered {@link FsVolumeSpi#check}. */ - checker.checkVolume(volume, new DatasetVolumeChecker.Callback() { - @Override - public void call(Set healthyVolumes, - Set failedVolumes) { - numCallbackInvocations.incrementAndGet(); - if (expectedVolumeHealth != null && expectedVolumeHealth != FAILED) { - assertThat(healthyVolumes.size(), is(1)); - assertThat(failedVolumes.size(), is(0)); - } else { - assertThat(healthyVolumes.size(), is(0)); - assertThat(failedVolumes.size(), is(1)); - } - } - }); + boolean result = + checker.checkVolume(volume, new DatasetVolumeChecker.Callback() { + @Override + public void call(Set healthyVolumes, + Set failedVolumes) { + numCallbackInvocations.incrementAndGet(); + if (expectedVolumeHealth != null && + expectedVolumeHealth != FAILED) { + assertThat(healthyVolumes.size(), is(1)); + assertThat(failedVolumes.size(), is(0)); + } else { + assertThat(healthyVolumes.size(), is(0)); + assertThat(failedVolumes.size(), is(1)); + } + } + }); // Ensure that the check was invoked at least once. 
verify(volume, times(1)).check(anyObject()); - assertThat(numCallbackInvocations.get(), is(1L)); + if (result) { + assertThat(numCallbackInvocations.get(), is(1L)); + } } /** @@ -173,7 +178,7 @@ public class TestDatasetVolumeChecker { checker.setDelegateChecker(new DummyChecker()); final AtomicLong numCallbackInvocations = new AtomicLong(0); - checker.checkAllVolumesAsync( + boolean result = checker.checkAllVolumesAsync( dataset, new DatasetVolumeChecker.Callback() { @Override public void call( @@ -193,7 +198,9 @@ public class TestDatasetVolumeChecker { }); // The callback should be invoked exactly once. - assertThat(numCallbackInvocations.get(), is(1L)); + if (result) { + assertThat(numCallbackInvocations.get(), is(1L)); + } // Ensure each volume's check() method was called exactly once. for (FsVolumeSpi volume : volumes) { @@ -207,15 +214,17 @@ public class TestDatasetVolumeChecker { */ static class DummyChecker implements AsyncChecker { + @Override - public ListenableFuture schedule( + public Optional> schedule( Checkable target, VolumeCheckContext context) { try { - return Futures.immediateFuture(target.check(context)); + return Optional.of( + Futures.immediateFuture(target.check(context))); } catch (Exception e) { LOG.info("check routine threw exception " + e); - return Futures.immediateFailedFuture(e); + return Optional.of(Futures.immediateFailedFuture(e)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java index 16c333b..0fe892d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestDatasetVolumeCheckerFailures.java @@ -132,29 +132,6 @@ public class TestDatasetVolumeCheckerFailures { assertThat(checker.getNumSkippedChecks(), is(1L)); } - @Test(timeout=60000) - public void testMinGapIsEnforcedForASyncChecks() throws Exception { - final List volumes = - TestDatasetVolumeChecker.makeVolumes(1, VolumeCheckResult.HEALTHY); - final FsDatasetSpi dataset = - TestDatasetVolumeChecker.makeDataset(volumes); - final DatasetVolumeChecker checker = new DatasetVolumeChecker(conf, timer); - - checker.checkAllVolumesAsync(dataset, null); - assertThat(checker.getNumAsyncDatasetChecks(), is(1L)); - - // Re-check without advancing the timer. Ensure the check is skipped. - checker.checkAllVolumesAsync(dataset, null); - assertThat(checker.getNumAsyncDatasetChecks(), is(1L)); - assertThat(checker.getNumSkippedChecks(), is(1L)); - - // Re-check after advancing the timer. Ensure the check is performed. - timer.advance(MIN_DISK_CHECK_GAP_MS); - checker.checkAllVolumesAsync(dataset, null); - assertThat(checker.getNumAsyncDatasetChecks(), is(2L)); - assertThat(checker.getNumSkippedChecks(), is(1L)); - } - /** * Create a mock FsVolumeSpi whose {@link FsVolumeSpi#check} routine * hangs forever. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/603f3ef1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
index 70795ca..c171c0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java
@@ -18,15 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode.checker;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.FakeTimer;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,10 +37,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.core.Is.isA;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -53,9 +49,6 @@ public class TestThrottledAsyncChecker {
       LoggerFactory.getLogger(TestThrottledAsyncChecker.class);
   private static final long MIN_ERROR_CHECK_GAP = 1000;
 
-  @Rule
-  public ExpectedException thrown = ExpectedException.none();
-
   /**
    * Test various scheduling combinations to ensure scheduling and
    * throttling behave as expected.
@@ -70,34 +63,34 @@ public class TestThrottledAsyncChecker {
             getExecutorService());
 
     // check target1 and ensure we get back the expected result.
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
     // Check target1 again without advancing the timer. target1 should not
-    // be checked again and the cached result should be returned.
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    // be checked again.
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
     // Schedule target2 scheduled without advancing the timer.
     // target2 should be checked as it has never been checked before.
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target2.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 1L);
 
     // Advance the timer but just short of the min gap.
     // Neither target1 nor target2 should be checked again.
     timer.advance(MIN_ERROR_CHECK_GAP - 1);
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target2.numChecks.get(), is(1L));
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
+    assertFalse(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 1L);
 
     // Advance the timer again.
     // Both targets should be checked now.
     timer.advance(MIN_ERROR_CHECK_GAP);
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(2L));
-    assertTrue(checker.schedule(target2, true).get());
-    assertThat(target1.numChecks.get(), is(2L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 2L);
+    assertTrue(checker.schedule(target2, true).isPresent());
+    waitTestCheckableCheckCount(target2, 2L);
   }
 
   @Test (timeout=60000)
@@ -109,13 +102,16 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
 
-    ListenableFuture lf = checker.schedule(target, true);
-    Futures.addCallback(lf, callback);
+    Optional> olf =
+        checker.schedule(target, true);
+    if (olf.isPresent()) {
+      Futures.addCallback(olf.get(), callback);
+    }
 
     // Request immediate cancellation.
     checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
     try {
-      assertFalse(lf.get());
+      assertFalse(olf.get().get());
       fail("Failed to get expected InterruptedException");
     } catch (ExecutionException ee) {
       assertTrue(ee.getCause() instanceof InterruptedException);
@@ -130,27 +126,33 @@ public class TestThrottledAsyncChecker {
     ThrottledAsyncChecker checker =
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
-    final ListenableFuture lf1 = checker.schedule(target, true);
-    final ListenableFuture lf2 = checker.schedule(target, true);
+    final Optional> olf1 =
+        checker.schedule(target, true);
 
-    // Ensure that concurrent requests return the same future object.
-    assertTrue(lf1 == lf2);
+    final Optional> olf2 =
+        checker.schedule(target, true);
+
+    // Ensure that concurrent requests return the future object
+    // for the first caller.
+    assertTrue(olf1.isPresent());
+    assertFalse(olf2.isPresent());
 
     // Unblock the latch and wait for it to finish execution.
     target.latch.countDown();
-    lf1.get();
+    olf1.get().get();
 
     GenericTestUtils.waitFor(new Supplier() {
       @Override
       public Boolean get() {
-        // We should not get back the same future as before.
+        // We should get an absent Optional.
         // This can take a short while until the internal callback in
         // ThrottledAsyncChecker is scheduled for execution.
         // Also this should not trigger a new check operation as the timer
        // was not advanced. If it does trigger a new check then the test
        // will fail with a timeout.
-        final ListenableFuture lf3 = checker.schedule(target, true);
-        return lf3 != lf2;
+        final Optional> olf3 =
+            checker.schedule(target, true);
+        return !olf3.isPresent();
       }
     }, 100, 10000);
   }
 
@@ -168,15 +170,29 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
 
-    assertTrue(checker.schedule(target1, true).get());
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
     timer.advance(MIN_ERROR_CHECK_GAP + 1);
-    assertFalse(checker.schedule(target1, false).get());
-    assertThat(target1.numChecks.get(), is(2L));
+    assertTrue(checker.schedule(target1, false).isPresent());
+    waitTestCheckableCheckCount(target1, 2L);
+  }
+
+  private void waitTestCheckableCheckCount(TestCheckableBase target,
+      long expectedChecks) throws Exception {
+    GenericTestUtils.waitFor(new Supplier() {
+      @Override
+      public Boolean get() {
+        // This can take a short while until the internal callback in
+        // ThrottledAsyncChecker is scheduled for execution.
+        // If it does trigger a new check then the test
+        // will fail with a timeout.
+        return target.getTotalChecks() == expectedChecks;
+      }
+    }, 100, 10000);
   }
 
   /**
-   * Ensure that the exeption from a failed check is cached
+   * Ensure that the exception from a failed check is cached
    * and returned without re-running the check when the minimum
    * gap has not elapsed.
    *
@@ -190,13 +206,11 @@ public class TestThrottledAsyncChecker {
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
             getExecutorService());
 
-    thrown.expectCause(isA(DummyException.class));
-    checker.schedule(target1, true).get();
-    assertThat(target1.numChecks.get(), is(1L));
+    assertTrue(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
 
-    thrown.expectCause(isA(DummyException.class));
-    checker.schedule(target1, true).get();
-    assertThat(target1.numChecks.get(), is(2L));
+    assertFalse(checker.schedule(target1, true).isPresent());
+    waitTestCheckableCheckCount(target1, 1L);
   }
 
   /**
@@ -206,28 +220,38 @@
     return new ScheduledThreadPoolExecutor(1);
   }
 
+  private abstract static class TestCheckableBase
+      implements Checkable {
+    protected final AtomicLong numChecks = new AtomicLong(0);
+
+    public long getTotalChecks() {
+      return numChecks.get();
+    }
+
+    public void incrTotalChecks() {
+      numChecks.incrementAndGet();
+    }
+  }
+
   /**
    * A Checkable that just returns its input.
    */
   private static class NoOpCheckable
-      implements Checkable {
-    private final AtomicLong numChecks = new AtomicLong(0);
+      extends TestCheckableBase {
     @Override
     public Boolean check(Boolean context) {
-      numChecks.incrementAndGet();
+      incrTotalChecks();
      return context;
    }
  }

   private static class ThrowingCheckable
-      implements Checkable {
-    private final AtomicLong numChecks = new AtomicLong(0);
+      extends TestCheckableBase {
     @Override
     public Boolean check(Boolean context) throws DummyException {
-      numChecks.incrementAndGet();
+      incrTotalChecks();
       throw new DummyException();
     }
-
   }

   private static class DummyException extends Exception {

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
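
A note on the new waitTestCheckableCheckCount helper above: because a throttled schedule() call no longer returns a future to block on, the tests poll for the expected check count instead of asserting it immediately. Below is a minimal standalone sketch of that polling idea using the GenericTestUtils.waitFor utility the test already imports; the class and method names are illustrative and assumed, not part of the patch.

    import com.google.common.base.Supplier;
    import org.apache.hadoop.test.GenericTestUtils;

    import java.util.concurrent.atomic.AtomicLong;

    public class AsyncCheckCountWaiter {
      private final AtomicLong numChecks = new AtomicLong(0);

      /** Invoked from the checker's executor thread when a check completes. */
      public void onCheckCompleted() {
        numChecks.incrementAndGet();
      }

      /** Poll until the counter reaches expectedChecks or the wait times out. */
      public void waitForChecks(final long expectedChecks) throws Exception {
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            // The callback that bumps the counter runs asynchronously, so the
            // count may lag slightly behind the schedule() call.
            return numChecks.get() == expectedChecks;
          }
        }, 100, 10000);  // check every 100 ms, time out after 10 s
      }
    }
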