hadoop-common-commits mailing list archives

From: weic...@apache.org
Subject: hadoop git commit: HDFS-11056. Concurrent append and read operations lead to checksum error. Contributed by Wei-Chiu Chuang.
Date: Wed, 09 Nov 2016 17:16:59 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 367c3d412 -> c619e9b43


HDFS-11056. Concurrent append and read operations lead to checksum error. Contributed by Wei-Chiu Chuang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c619e9b4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c619e9b4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c619e9b4

Branch: refs/heads/trunk
Commit: c619e9b43fd00ba0e59a98ae09685ff719bb722b
Parents: 367c3d4
Author: Wei-Chiu Chuang <weichiu@apache.org>
Authored: Wed Nov 9 09:15:51 2016 -0800
Committer: Wei-Chiu Chuang <weichiu@apache.org>
Committed: Wed Nov 9 09:16:50 2016 -0800

----------------------------------------------------------------------
 .../datanode/fsdataset/impl/FsVolumeImpl.java   | 41 +++++++++++
 .../org/apache/hadoop/hdfs/TestFileAppend.java  | 71 ++++++++++++++++++++
 .../fsdataset/impl/FsDatasetImplTestUtils.java  | 14 ++++
 3 files changed, 126 insertions(+)
----------------------------------------------------------------------
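
Background on the fix: appending to a finalized block converts the replica
to rbw (replica being written). A concurrent reader that computes the last
partial chunk's checksum from the on-disk meta file can race with the
appender rewriting that same chunk, and the mismatch surfaces as a checksum
error on the client. The patch therefore preserves the last partial chunk's
checksum in memory when the replica is converted, so the DataNode's
BlockSender can serve it without consulting the file being modified.

The new loadLastPartialChunkChecksum() helper in the diff below locates
that checksum inside the meta file. As a minimal standalone sketch of the
same offset arithmetic, assuming the HDFS defaults (CRC32C with 4-byte
checksums, 512 bytes per checksum, a 7-byte meta-file header); none of
these literals are taken from the patch itself:

    // Illustrative only; the values mirror BlockMetadataHeader.getHeaderSize()
    // and the CRC32C defaults used elsewhere in this patch.
    public class LastChunkChecksumOffset {
      public static void main(String[] args) {
        final int headerSize = 7;          // 2-byte version + 5-byte checksum header
        final int checksumSize = 4;        // CRC32C
        final int bytesPerChecksum = 512;
        final long onDiskLen = 1025;       // two full chunks plus one byte

        if (onDiskLen % bytesPerChecksum == 0) {
          System.out.println("last chunk complete; nothing to preserve");
          return;
        }
        // Skip the header, then one checksum per complete chunk.
        long offset = headerSize
            + (onDiskLen / bytesPerChecksum) * checksumSize;
        System.out.println("read " + checksumSize + " bytes at offset " + offset);
        // prints: read 4 bytes at offset 15   (7 + 2 * 4)
      }
    }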


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c619e9b4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 1627865..5880b3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
+import java.io.RandomAccessFile;
 import java.net.URI;
 import java.nio.channels.ClosedChannelException;
 import java.nio.file.Files;
@@ -47,6 +48,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
@@ -59,6 +61,7 @@ import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
 import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
@@ -1102,6 +1105,28 @@ public class FsVolumeImpl implements FsVolumeSpi {
   }
 
 
+  private byte[] loadLastPartialChunkChecksum(
+      File blockFile, File metaFile) throws IOException {
+    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
+    final int checksumSize = dcs.getChecksumSize();
+    final long onDiskLen = blockFile.length();
+    final int bytesPerChecksum = dcs.getBytesPerChecksum();
+
+    if (onDiskLen % bytesPerChecksum == 0) {
+      // the last chunk is a complete one. No need to preserve its checksum
+      // because it will not be modified.
+      return null;
+    }
+
+    int offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
+        (int)(onDiskLen / bytesPerChecksum * checksumSize);
+    byte[] lastChecksum = new byte[checksumSize];
+    RandomAccessFile raf = new RandomAccessFile(metaFile, "r");
+    raf.seek(offsetInChecksum);
+    raf.read(lastChecksum, 0, checksumSize);
+    return lastChecksum;
+  }
+
   public ReplicaInPipeline append(String bpid, ReplicaInfo replicaInfo,
       long newGS, long estimateBlockLen) throws IOException {
 
@@ -1126,6 +1151,13 @@ public class FsVolumeImpl implements FsVolumeSpi {
         .setBytesToReserve(bytesReserved)
         .buildLocalReplicaInPipeline();
 
+    // load last checksum and datalen
+    LocalReplica localReplica = (LocalReplica)replicaInfo;
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(
+        localReplica.getBlockFile(), localReplica.getMetaFile());
+    newReplicaInfo.setLastChecksumAndDataLen(
+        replicaInfo.getNumBytes(), lastChunkChecksum);
+
     // rename meta file to rbw directory
     // rename block file to rbw directory
     newReplicaInfo.moveReplicaFrom(replicaInfo, newBlkFile);
@@ -1170,6 +1202,12 @@ public class FsVolumeImpl implements FsVolumeSpi {
         .setBytesToReserve(0)
         .buildLocalReplicaInPipeline();
     rbw.setBytesAcked(visible);
+
+    // load last checksum and datalen
+    final File destMeta = FsDatasetUtil.getMetaFile(dest,
+        b.getGenerationStamp());
+    byte[] lastChunkChecksum = loadLastPartialChunkChecksum(dest, destMeta);
+    rbw.setLastChecksumAndDataLen(numBytes, lastChunkChecksum);
     return rbw;
   }
 
@@ -1206,6 +1244,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
         .setDirectoryToUse(blockFile.getParentFile())
         .setBytesToReserve(newlength)
         .buildLocalReplicaInPipeline();
+    // In theory, this rbw replica needs to reload last chunk checksum,
+    // but it is immediately converted to finalized state within the same lock,
+    // so no need to update it.
     return newReplicaInfo;
   }
 

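One detail of loadLastPartialChunkChecksum() above worth flagging: the
RandomAccessFile is never closed, and the return value of read() is
ignored, so a short read would go unnoticed. A hedged variant of that read
(the method name readLastChecksum is made up for illustration and is not
part of the commit):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    final class MetaFileReads {
      static byte[] readLastChecksum(File metaFile, long offsetInChecksum,
          int checksumSize) throws IOException {
        byte[] lastChecksum = new byte[checksumSize];
        // try-with-resources releases the handle on every exit path;
        // readFully() throws EOFException instead of short-reading silently.
        try (RandomAccessFile raf = new RandomAccessFile(metaFile, "r")) {
          raf.seek(offsetInChecksum);
          raf.readFully(lastChecksum, 0, checksumSize);
        }
        return lastChecksum;
      }
    }
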
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c619e9b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index 3e8561f..20cec6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -24,10 +24,12 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -44,10 +46,16 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.junit.Assert;
 import org.junit.Test;
@@ -61,6 +69,8 @@ public class TestFileAppend{
 
   private static byte[] fileContents = null;
 
+  static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
   //
   // writes to file but does not close it
   //
@@ -655,4 +665,65 @@ public class TestFileAppend{
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 10000)
+  public void testConcurrentAppendRead()
+      throws IOException, TimeoutException, InterruptedException {
+    // Create a finalized replica and append to it
+    // Read block data and checksum. Verify checksum.
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    conf.setInt("dfs.min.replication", 1);
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      FsDatasetSpi<?> dataSet = DataNodeTestUtils.getFSDataset(dn);
+
+      // create a file with 1 byte of data.
+      long initialFileLength = 1;
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/appendCorruptBlock");
+      DFSTestUtil.createFile(fs, fileName, initialFileLength, (short) 1, 0);
+      DFSTestUtil.waitReplication(fs, fileName, (short) 1);
+      Assert.assertTrue("File not created", fs.exists(fileName));
+
+      // Call FsDatasetImpl#append to append the block file,
+      // which converts it to a rbw replica.
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
+      long newGS = block.getGenerationStamp()+1;
+      ReplicaHandler
+          replicaHandler = dataSet.append(block, newGS, initialFileLength);
+
+      // write data to block file
+      ReplicaBeingWritten rbw =
+          (ReplicaBeingWritten)replicaHandler.getReplica();
+      ReplicaOutputStreams
+          outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM);
+      OutputStream dataOutput = outputStreams.getDataOut();
+
+      byte[] appendBytes = new byte[1];
+      dataOutput.write(appendBytes, 0, 1);
+      dataOutput.flush();
+      dataOutput.close();
+
+      // update checksum file
+      final int smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
+      FsDatasetUtil.computeChecksum(
+          rbw.getMetaFile(), rbw.getMetaFile(), rbw.getBlockFile(),
+          smallBufferSize, conf);
+
+      // read the block
+      // the DataNode BlockSender should read from the rbw replica's in-memory
+      // checksum, rather than on-disk checksum. Otherwise it will see a
+      // checksum mismatch error.
+      final byte[] readBlock = DFSTestUtil.readFileBuffer(fs, fileName);
+      assertEquals("should have read only one byte!", 1, readBlock.length);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
 }
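
The test drives FsDatasetImpl#append directly, writes one more byte to the
block file, regenerates the on-disk meta file with
FsDatasetUtil.computeChecksum(), and then reads the file back through the
client; the read only passes if BlockSender serves the in-memory checksum
preserved for the rbw replica. For reference, a minimal standalone sketch
(assumed, not taken from the patch) of how the same DataChecksum API the
test configures computes a CRC32C over a partial chunk; the two-byte
payload mirrors the test's one byte created plus one byte appended:

    import org.apache.hadoop.util.DataChecksum;

    public class PartialChunkCrc {
      public static void main(String[] args) throws Exception {
        DataChecksum dcs =
            DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
        byte[] partialChunk = new byte[] { 0x00, 0x01 };  // hypothetical data
        dcs.update(partialChunk, 0, partialChunk.length);
        byte[] crc = new byte[dcs.getChecksumSize()];     // 4 bytes for CRC32C
        dcs.writeValue(crc, 0, true);  // copy out the value and reset
        System.out.printf("partial-chunk CRC32C: %02x%02x%02x%02x%n",
            crc[0], crc[1], crc[2], crc[3]);
      }
    }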

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c619e9b4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
index 07ddb59..05b41fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImplTestUtils.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -43,10 +44,13 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.log4j.Level;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileChannel;
@@ -67,6 +71,8 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
       LogFactory.getLog(FsDatasetImplTestUtils.class);
   private final FsDatasetImpl dataset;
 
+  private static final DataChecksum DEFAULT_CHECKSUM =
+      DataChecksum.newDataChecksum(DataChecksum.Type.CRC32C, 512);
   /**
    * By default we assume 2 data directories (volumes) per DataNode.
    */
@@ -245,9 +251,17 @@ public class FsDatasetImplTestUtils implements FsDatasetTestUtils {
     dataset.volumeMap.add(block.getBlockPoolId(), info);
     info.getBlockFile().createNewFile();
     info.getMetaFile().createNewFile();
+    saveMetaFileHeader(info.getMetaFile());
     return info;
   }
 
+  private void saveMetaFileHeader(File metaFile) throws IOException {
+    DataOutputStream metaOut = new DataOutputStream(
+        new FileOutputStream(metaFile));
+    BlockMetadataHeader.writeHeader(metaOut, DEFAULT_CHECKSUM);
+    metaOut.close();
+  }
+
   @Override
   public Replica createReplicaInPipeline(ExtendedBlock block)
       throws IOException {
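
The saveMetaFileHeader() addition matters because createFinalizedReplica()
previously created an empty meta file, which BlockMetadataHeader.readHeader()
(now called on the append path) cannot parse. As a sketch of the header
layout writeHeader() produces; the version value 1 and the CRC32C type id 2
are assumptions based on the shipped constants, not stated in the patch:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;

    public class MetaHeaderLayout {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeShort(1);   // assumed BlockMetadataHeader version
        out.writeByte(2);    // assumed DataChecksum CRC32C type id
        out.writeInt(512);   // bytesPerChecksum
        out.close();
        System.out.println("header length: " + bos.size() + " bytes");  // 7
      }
    }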


