hadoop-common-commits mailing list archives

From weic...@apache.org
Subject hadoop git commit: HDFS-11160. VolumeScanner reports write-in-progress replicas as corrupt incorrectly. Contributed by Wei-Chiu Chuang and Yongjun Zhang.
Date Fri, 16 Dec 2016 21:41:16 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 2123b664c -> bbf380a49


HDFS-11160. VolumeScanner reports write-in-progress replicas as corrupt incorrectly. Contributed
by Wei-Chiu Chuang and Yongjun Zhang.

(cherry picked from commit 0cb99db9d91d113e7fbe229f90a61a33433cecb9)

Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java

(cherry picked from commit 18be0447cde622dfaaad27f7c2b9cccb30469fef)

Conflicts:
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bbf380a4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bbf380a4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bbf380a4

Branch: refs/heads/branch-2.7
Commit: bbf380a493bb3e7a61f5095d0f631eb5e07b446b
Parents: 2123b66
Author: Wei-Chiu Chuang <weichiu@apache.org>
Authored: Fri Dec 16 13:40:20 2016 -0800
Committer: Wei-Chiu Chuang <weichiu@apache.org>
Committed: Fri Dec 16 13:40:20 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hdfs/server/datanode/BlockScanner.java      |  10 +-
 .../hdfs/server/datanode/BlockSender.java       |  17 +++-
 .../hdfs/server/datanode/FinalizedReplica.java  |  29 ++++++
 .../hdfs/server/datanode/VolumeScanner.java     |   9 +-
 .../server/datanode/fsdataset/FsVolumeSpi.java  |  11 ++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  28 +-----
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  39 ++++++++
 .../server/datanode/SimulatedFSDataset.java     |   6 ++
 .../hdfs/server/datanode/TestBlockScanner.java  | 100 +++++++++++++++++++
 .../server/datanode/TestDirectoryScanner.java   |   6 ++
 .../datanode/extdataset/ExternalVolumeImpl.java |   6 ++
 12 files changed, 232 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
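
For orientation before the patch: the bug is a read/append race on the last partial chunk checksum. When a VolumeScanner scans a finalized replica through BlockSender while an append converts that replica to RBW, the append rewrites the last partial chunk checksum in the meta file; the scanner then compares data it already read against the rewritten checksum and incorrectly reports the replica as corrupt. The patch has BlockSender capture the partial checksum up front, under the FsDataset lock, via the new FinalizedReplica#getLastChecksumAndDataLen() backed by FsVolumeSpi#loadLastPartialChunkChecksum(). A minimal, self-contained toy model of that ordering (illustrative names only, not HDFS APIs):

// Toy model of the snapshot-under-lock ordering the patch enforces.
// The reader snapshots the last partial-chunk checksum under the same lock an
// appender must take before rewriting it, so the snapshot stays consistent
// with the finalized data the reader is about to verify.
public class PartialChecksumSnapshot {
  private static final Object datasetLock = new Object();
  private static byte[] lastPartialChunkChecksum = {1, 2, 3, 4};

  // Reader side (BlockSender analogue): capture the checksum up front.
  static byte[] snapshotChecksum() {
    synchronized (datasetLock) {
      return lastPartialChunkChecksum.clone();
    }
  }

  // Writer side (append analogue): overwrites the on-disk checksum.
  static void appendOverwrites(byte[] newChecksum) {
    synchronized (datasetLock) {
      lastPartialChunkChecksum = newChecksum;
    }
  }

  public static void main(String[] args) {
    byte[] snapshot = snapshotChecksum();        // taken before any append
    appendOverwrites(new byte[]{9, 9, 9, 9});    // a concurrent append lands later
    // Verification of the already-finalized bytes must use the snapshot;
    // re-reading the meta file at this point would spuriously flag corruption.
    System.out.println("verify against snapshot of " + snapshot.length + " bytes");
  }
}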


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index f8fe1e5..342bc67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -207,6 +207,9 @@ Release 2.7.4 - UNRELEASED
 
     HDFS-11229. HDFS-11056 failed to close meta file. (Wei-Chiu Chuang)
 
+    HDFS-11160. VolumeScanner reports write-in-progress replicas as corrupt
+    incorrectly. Contributed by Wei-Chiu Chuang and Yongjun Zhang.
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
index be6aa83..11732ee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -64,7 +64,15 @@ public class BlockScanner {
   /**
    * The scanner configuration.
    */
-  private final Conf conf;
+  private Conf conf;
+
+  @VisibleForTesting
+  void setConf(Conf conf) {
+    this.conf = conf;
+    for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+      entry.getValue().setConf(conf);
+    }
+  }
 
   /**
    * The cached scanner configuration.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 82fea00..c042190 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -231,14 +231,24 @@ class BlockSender implements java.io.Closeable {
         Preconditions.checkArgument(sendChecksum,
             "If verifying checksum, currently must also send it.");
       }
-      
+
+      // If an append write starts right after this BlockSender is
+      // constructed, the last partial chunk checksum may be overwritten by
+      // the append, so the BlockSender needs to use the partial checksum
+      // captured before the append write.
+      ChunkChecksum chunkChecksum = null;
       final long replicaVisibleLength;
       synchronized(datanode.data) { 
         replica = getReplica(block, datanode);
         replicaVisibleLength = replica.getVisibleLength();
+        if (replica instanceof FinalizedReplica) {
+          // Load last checksum in case the replica is being written
+          // concurrently
+          final FinalizedReplica frep = (FinalizedReplica) replica;
+          chunkChecksum = frep.getLastChecksumAndDataLen();
+        }
       }
       // if there is a write in progress
-      ChunkChecksum chunkChecksum = null;
       if (replica instanceof ReplicaBeingWritten) {
         final ReplicaBeingWritten rbw = (ReplicaBeingWritten)replica;
         waitForMinLength(rbw, startOffset + length);
@@ -482,7 +492,7 @@ class BlockSender implements java.io.Closeable {
               bytesOnDisk));
     }
   }
-  
+
   /**
   * Converts an IOException (not subclasses) to SocketException.
    * This is typically done to indicate to upper layers that the error 
@@ -556,7 +566,6 @@ class BlockSender implements java.io.Closeable {
       if (lastDataPacket && lastChunkChecksum != null) {
         int start = checksumOff + checksumDataLen - checksumSize;
         byte[] updatedChecksum = lastChunkChecksum.getChecksum();
-        
         if (updatedChecksum != null) {
           System.arraycopy(updatedChecksum, 0, buf, start, checksumSize);
         }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
index cc32874..d8fa60a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
 
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -101,4 +103,31 @@ public class FinalizedReplica extends ReplicaInfo {
     return super.toString()
         + "\n  unlinked          =" + unlinked;
   }
+
+  /**
+   * Gets the last chunk checksum and the length of the block data that
+   * checksum covers.
+   * Note: must be called with the FsDataset lock held. This may be relaxed
+   * in the future to lock only the FsVolume.
+   * @throws IOException
+   */
+  public ChunkChecksum getLastChecksumAndDataLen() throws IOException {
+    ChunkChecksum chunkChecksum = null;
+    try {
+      byte[] lastChecksum = getVolume().loadLastPartialChunkChecksum(
+          getBlockFile(), getMetaFile());
+      if (lastChecksum != null) {
+        chunkChecksum =
+            new ChunkChecksum(getVisibleLength(), lastChecksum);
+      }
+    } catch (FileNotFoundException e) {
+      // meta file is lost. Try to continue anyway.
+      DataNode.LOG.warn("meta file " + getMetaFile() +
+          " is missing!");
+    } catch (IOException ioe) {
+      DataNode.LOG.warn("Unable to read checksum from meta file " +
+          getMetaFile(), ioe);
+    }
+    return chunkChecksum;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
index d0300f6..a40cbc8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -69,7 +69,12 @@ public class VolumeScanner extends Thread {
   /**
    * The configuration.
    */
-  private final Conf conf;
+  private Conf conf;
+
+  @VisibleForTesting
+  void setConf(Conf conf) {
+    this.conf = conf;
+  }
 
   /**
   * The DataNode this VolumeScanner is associated with.
@@ -431,6 +436,7 @@ public class VolumeScanner extends Thread {
     if (block == null) {
       return -1; // block not found.
     }
+    LOG.debug("start scanning block {}", block);
     BlockSender blockSender = null;
     try {
       blockSender = new BlockSender(block, 0, -1,
@@ -612,6 +618,7 @@ public class VolumeScanner extends Thread {
               break;
             }
             if (timeout > 0) {
+              LOG.debug("{}: wait for {} milliseconds", this, timeout);
               wait(timeout);
               if (stopping) {
                 break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index e6ace44..aee5967 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -179,4 +179,15 @@ public interface FsVolumeSpi {
    * Get the FSDatasetSpi which this volume is a part of.
    */
   public FsDatasetSpi getDataset();
+
+  /**
+   * Loads the last partial chunk checksum from the checksum (meta) file.
+   * Must be called with the FsDataset lock held.
+   * @param blockFile the block file of the replica
+   * @param metaFile the checksum (meta) file of the replica
+   * @return the last partial chunk checksum, or null if the last chunk is complete
+   * @throws IOException
+   */
+  byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
+      throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 2b121ac..ec86337 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1080,30 +1080,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return new ReplicaHandler(replica, ref);
   }
 
-
-  private byte[] loadLastPartialChunkChecksum(
-      File blockFile, File metaFile) throws IOException {
-    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
-    final int checksumSize = dcs.getChecksumSize();
-    final long onDiskLen = blockFile.length();
-    final int bytesPerChecksum = dcs.getBytesPerChecksum();
-
-    if (onDiskLen % bytesPerChecksum == 0) {
-      // the last chunk is a complete one. No need to preserve its checksum
-      // because it will not be modified.
-      return null;
-    }
-
-    int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
-        (int)(onDiskLen / bytesPerChecksum * checksumSize);
-    byte[] lastChecksum = new byte[checksumSize];
-    try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
-      raf.seek(offsetInChecksum);
-      raf.read(lastChecksum, 0, checksumSize);
-    }
-    return lastChecksum;
-  }
-
   /** Append to a finalized replica
    * Change a finalized replica to be a RBW replica and 
    * bump its generation stamp to be the newGS
@@ -1139,7 +1115,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         v, newBlkFile.getParentFile(), Thread.currentThread(), bytesReserved);
 
     // load last checksum and datalen
-    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
+    byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum(
         replicaInfo.getBlockFile(), replicaInfo.getMetaFile());
     newReplicaInfo.setLastChecksumAndDataLen(
         replicaInfo.getNumBytes(), lastChunkChecksum);
@@ -1470,7 +1446,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     // load last checksum and datalen
     final File destMeta = FsDatasetUtil.getMetaFile(dest,
         b.getGenerationStamp());
-    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
+    byte[] lastChunkChecksum = v.loadLastPartialChunkChecksum(dest, destMeta);
     rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index b10e2c0..92db57a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.nio.channels.ClosedChannelException;
 import java.io.OutputStreamWriter;
 import java.nio.file.Files;
@@ -51,6 +52,7 @@ import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.CloseableReferenceCount;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -953,5 +956,41 @@ public class FsVolumeImpl implements FsVolumeSpi {
   DatanodeStorage toDatanodeStorage() {
     return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
   }
+
+
+  @Override
+  public byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    // readHeader closes the temporary FileInputStream.
+    DataChecksum dcs = BlockMetadataHeader
+        .readHeader(metaFile).getChecksum();
+    final int checksumSize = dcs.getChecksumSize();
+    final long onDiskLen = blockFile.length();
+    final int bytesPerChecksum = dcs.getBytesPerChecksum();
+
+    if (onDiskLen % bytesPerChecksum == 0) {
+      // the last chunk is a complete one. No need to preserve its checksum
+      // because it will not be modified.
+      return null;
+    }
+
+    long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+        (onDiskLen / bytesPerChecksum) * checksumSize;
+    byte[] lastChecksum = new byte[checksumSize];
+    try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
+      raf.seek(offsetInChecksum);
+      int readBytes = raf.read(lastChecksum, 0, checksumSize);
+      if (readBytes == -1) {
+        throw new IOException("Expected to read " + checksumSize +
+            " bytes from offset " + offsetInChecksum +
+            " but reached end of file.");
+      } else if (readBytes != checksumSize) {
+        throw new IOException("Expected to read " + checksumSize +
+            " bytes from offset " + offsetInChecksum + " but read " +
+            readBytes + " bytes.");
+      }
+    }
+    return lastChecksum;
+  }
 }
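
A worked instance of the offset computation in loadLastPartialChunkChecksum above, under assumed defaults (4-byte CRC checksums, 512 bytes per checksum, and a 7-byte block metadata header; the real method reads all three from the meta file header rather than assuming them):

// Standalone illustration of the offset arithmetic. The constants are
// assumptions for the example only.
public class LastChunkOffsetExample {
  public static void main(String[] args) {
    final int headerSize = 7;          // assumed BlockMetadataHeader size
    final int bytesPerChecksum = 512;  // assumed chunk size
    final int checksumSize = 4;        // assumed CRC checksum width
    final long onDiskLen = 2L * 1024 * 1024 + 100;  // block file length used in the test below

    if (onDiskLen % bytesPerChecksum == 0) {
      System.out.println("last chunk is complete; nothing to preserve");
      return;
    }
    // 4096 complete chunks precede the partial last chunk, so its checksum
    // starts at 7 + 4096 * 4 = 16391 bytes into the meta file.
    long offsetInChecksum = headerSize
        + (onDiskLen / bytesPerChecksum) * checksumSize;
    System.out.println("read " + checksumSize + " bytes at offset " + offsetInChecksum);
  }
}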
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 951aa44..63b6bb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -506,6 +506,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
     public FsDatasetSpi getDataset() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public byte[] loadLastPartialChunkChecksum(
+        File blockFile, File metaFile) throws IOException {
+      return null;
+    }
   }
 
   private final Map<String, Map<Block, BInfo>> blockMap

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
index e20fa80..900cf93 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -34,8 +34,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Supplier;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -812,4 +816,100 @@ public class TestBlockScanner {
       info.blocksScanned = 0;
     }
   }
+
+  /**
+   * Test concurrent append and scan.
+   * @throws Exception
+   */
+  @Test(timeout=120000)
+  public void testAppendWhileScanning() throws Exception {
+    GenericTestUtils.setLogLevel(DataNode.LOG, Level.ALL);
+    Configuration conf = new Configuration();
+    // throttle the block scanner: 1MB per second
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576);
+    // Set a really long scan period.
+    conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+    conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+        TestScanResultHandler.class.getName());
+    conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+    final int numExpectedFiles = 1;
+    final int numExpectedBlocks = 1;
+    final int numNameServices = 1;
+    // The initial file length cannot be too small. Otherwise the
+    // checksum file stream buffer will be pre-filled and BlockSender
+    // will not see the updated checksum.
+    final int initialFileLength = 2*1024*1024+100;
+    final TestContext ctx = new TestContext(conf, numNameServices);
+    // create one file, with one block.
+    ctx.createFiles(0, numExpectedFiles, initialFileLength);
+    final TestScanResultHandler.Info info =
+        TestScanResultHandler.getInfo(ctx.volumes.get(0));
+    String storageID = ctx.volumes.get(0).getStorageID();
+    synchronized (info) {
+      info.sem = new Semaphore(numExpectedBlocks*2);
+      info.shouldRun = true;
+      info.notify();
+    }
+    // VolumeScanner scans the first block when DN starts.
+    // Due to throttler, this should take approximately 2 seconds.
+    waitForRescan(info, numExpectedBlocks);
+
+    // update throttler to schedule rescan immediately.
+    // this number must be larger than the initial file length, otherwise
+    // the throttler prevents an immediate rescan.
+    conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
+        initialFileLength+32*1024);
+    BlockScanner.Conf newConf = new BlockScanner.Conf(conf);
+    ctx.datanode.getBlockScanner().setConf(newConf);
+    // schedule the first block for scanning
+    ExtendedBlock first = ctx.getFileBlock(0, 0);
+    ctx.datanode.getBlockScanner().markSuspectBlock(storageID, first);
+
+    // append the file before VolumeScanner completes scanning the block,
+    // which takes approximately 2 seconds to complete.
+    FileSystem fs = ctx.cluster.getFileSystem();
+    FSDataOutputStream os = fs.append(ctx.getPath(0));
+    long seed = -1;
+    int size = 200;
+    final byte[] bytes = AppendTestUtil.randomBytes(seed, size);
+    os.write(bytes);
+    os.hflush();
+    os.close();
+    fs.close();
+
+    // verify that volume scanner does not find bad blocks after append.
+    waitForRescan(info, numExpectedBlocks);
+
+    GenericTestUtils.setLogLevel(DataNode.LOG, Level.INFO);
+  }
+
+  private void waitForRescan(final TestScanResultHandler.Info info,
+      final int numExpectedBlocks)
+      throws TimeoutException, InterruptedException {
+    LOG.info("Waiting for the first 1 blocks to be scanned.");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        synchronized (info) {
+          if (info.blocksScanned >= numExpectedBlocks) {
+            LOG.info("info = {}.  blockScanned has now reached 1.", info);
+            return true;
+          } else {
+            LOG.info("info = {}.  Waiting for blockScanned to reach 1.", info);
+            return false;
+          }
+        }
+      }
+    }, 1000, 30000);
+
+    synchronized (info) {
+      assertEquals("Expected 1 good block.",
+          numExpectedBlocks, info.goodBlocks.size());
+      info.goodBlocks.clear();
+      assertEquals("Expected 1 blocksScanned",
+          numExpectedBlocks, info.blocksScanned);
+      assertEquals("Did not expect bad blocks.", 0, info.badBlocks.size());
+      info.blocksScanned = 0;
+    }
+  }
 }
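
The test's timing comments lean on simple throttle arithmetic: at the initial 1 MiB/s limit, scanning the roughly 2 MiB block takes about two seconds, which is the window the append has to land mid-scan. A small sketch of that estimate (constants mirror the test above; this is an illustration, not test code):

// Rough timing model behind the "approximately 2 seconds" comments above.
public class ScanWindowEstimate {
  public static void main(String[] args) {
    final long bytesPerSecond = 1048576L;            // initial throttle: 1 MiB/s
    final long blockLength = 2L * 1024 * 1024 + 100; // initialFileLength in the test
    final double scanSeconds = (double) blockLength / bytesPerSecond;
    // ~2.0 seconds: the append issued after markSuspectBlock() is expected to
    // land while the VolumeScanner is still reading the block.
    System.out.printf("expected scan time at initial throttle: %.2f s%n", scanSeconds);
  }
}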

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 62b2ca9..f060f3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -613,6 +613,12 @@ public class TestDirectoryScanner {
     public FsDatasetSpi getDataset() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public byte[] loadLastPartialChunkChecksum(
+        File blockFile, File metaFile) throws IOException {
+      return null;
+    }
   }
 
   private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bbf380a4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index b00d78f..00648e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -88,6 +88,12 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
   }
 
   @Override
+  public byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    return null;
+  }
+
+  @Override
   public BlockIterator loadBlockIterator(String bpid, String name)
       throws IOException {
     return null;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

