hadoop-common-commits mailing list archives

From: kih...@apache.org
Subject: git commit: HDFS-7203. Concurrent appending to the same file can cause data corruption. Contributed by Kihwal Lee.
Date: Wed, 08 Oct 2014 20:07:33 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 d218ab58f -> b39c8c312


HDFS-7203. Concurrent appending to the same file can cause data
corruption. Contributed by Kihwal Lee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b39c8c31
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b39c8c31
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b39c8c31

Branch: refs/heads/branch-2
Commit: b39c8c31282d11cd646341e3ca9aab85394e8d4d
Parents: d218ab5
Author: Kihwal Lee <kihwal@apache.org>
Authored: Wed Oct 8 15:05:13 2014 -0500
Committer: Kihwal Lee <kihwal@apache.org>
Committed: Wed Oct 8 15:07:12 2014 -0500

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 14 ++--
 .../org/apache/hadoop/hdfs/TestFileAppend3.java | 83 ++++++++++++++++++++
 3 files changed, 91 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
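
The hunks below show the root cause as an ordering problem: DFSClient#append fetched the file's HdfsFileStatus first and only afterwards issued the append RPC to the NameNode. In that window a concurrent client could complete its own append, so the new output stream was initialized from a stale stat; as the comment in the new regression test notes, the damage shows up once the file size crosses a checksum chunk boundary, because the stream's partial-chunk state is then computed from the wrong file length. The fix reverses the order: the stat is fetched only after this client's append call has succeeded at the NameNode.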


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39c8c31/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 4c502fa..c254992 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -564,6 +564,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-7181. Remove incorrect precondition check on key length in
     FileEncryptionInfo. (wang)
 
+    HDFS-7203. Concurrent appending to the same file can cause data corruption
+    (kihwal)
+
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
   
       HDFS-6387. HDFS CLI admin tool for creating & deleting an

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39c8c31/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index f54d325..362c62d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -1666,7 +1666,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
         }
         return null;
       }
-      return callAppend(stat, src, buffersize, progress);
+      return callAppend(src, buffersize, progress);
     }
     return null;
   }
@@ -1738,7 +1738,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   }
 
   /** Method to get stream returned by append call */
-  private DFSOutputStream callAppend(HdfsFileStatus stat, String src,
+  private DFSOutputStream callAppend(String src,
       int buffersize, Progressable progress) throws IOException {
     LocatedBlock lastBlock = null;
     try {
@@ -1752,8 +1752,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
                                      UnresolvedPathException.class,
                                      SnapshotAccessControlException.class);
     }
+    HdfsFileStatus newStat = getFileInfo(src);
     return DFSOutputStream.newStreamForAppend(this, src, buffersize, progress,
-        lastBlock, stat, dfsClientConf.createChecksum());
+        lastBlock, newStat, dfsClientConf.createChecksum());
   }
   
   /**
@@ -1777,12 +1778,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   private DFSOutputStream append(String src, int buffersize, Progressable progress) 
       throws IOException {
     checkOpen();
-    HdfsFileStatus stat = getFileInfo(src);
-    if (stat == null) { // No file found
-      throw new FileNotFoundException("failed to append to non-existent file "
-          + src + " on client " + clientName);
-    }
-    final DFSOutputStream result = callAppend(stat, src, buffersize, progress);
+    final DFSOutputStream result = callAppend(src, buffersize, progress);
     beginFileLease(result.getFileId(), result);
     return result;
   }
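
The essential change: callAppend() no longer trusts a stat captured by its caller before the append RPC; it re-reads getFileInfo(src) after the NameNode has granted the append (and with it the single-writer lease), so the stat reflects every append that completed earlier. Below is a minimal stand-alone sketch of the same race, with the NameNode's file length reduced to a counter and the lease to a lock; all names are illustrative, none of this is HDFS code.

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

public class StaleStatSketch {
  private final AtomicLong fileLength = new AtomicLong();   // the NameNode's view of the file
  private final ReentrantLock lease = new ReentrantLock();  // HDFS allows one writer at a time

  // Old ordering: the "stat" is read before the lease is acquired, so a
  // concurrent appender can move the length underneath us.
  long racyAppend(int bytes) throws InterruptedException {
    long stat = fileLength.get();   // getFileInfo() before the append RPC
    Thread.sleep(1);                // widen the window, as the test's spy does with 100 ms
    lease.lock();
    try {
      fileLength.addAndGet(bytes);  // the append itself
      return stat;                  // the offset this writer would resume from
    } finally {
      lease.unlock();
    }
  }

  // New ordering: the stat is read only once the lease is held, so it
  // reflects every append that completed before ours.
  long fixedAppend(int bytes) {
    lease.lock();
    try {
      return fileLength.getAndAdd(bytes);
    } finally {
      lease.unlock();
    }
  }

  public static void main(String[] args) throws Exception {
    StaleStatSketch nn = new StaleStatSketch();
    Set<Long> offsets = ConcurrentHashMap.newKeySet();
    Runnable r = () -> {
      try {
        for (int i = 0; i < 50; i++) {
          // Two writers resuming from the same offset is the corruption.
          if (!offsets.add(nn.racyAppend(123))) {
            System.out.println("stale stat: offset reused by two appenders");
          }
        }
      } catch (InterruptedException ignored) {
      }
    };
    Thread a = new Thread(r), b = new Thread(r);
    a.start(); b.start();
    a.join(); b.join();
  }
}

The racy path typically reports a reused offset within a run or two; the fixed path never can, because the length is read while the lease is held. The same reordering also makes the old client-side stat-null check redundant: appending to a file that does not exist already fails on the NameNode side.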

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b39c8c31/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
index 66a04e7..d5de0ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
@@ -25,15 +25,23 @@ import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
+import org.mockito.invocation.InvocationOnMock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import org.mockito.stubbing.Answer;
+
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSClientAdapter;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -361,4 +369,79 @@ public class TestFileAppend3  {
     AppendTestUtil.checkFullFile(fs, p, fileLen,
         fileContents, "Failed to append to a partial chunk");
   }
+
+  // Do small appends.
+  void doSmallAppends(Path file, DistributedFileSystem fs, int iterations)
+    throws IOException {
+    for (int i = 0; i < iterations; i++) {
+      FSDataOutputStream stm;
+      try {
+        stm = fs.append(file);
+      } catch (IOException e) {
+        // If another thread is already appending, skip this time.
+        continue;
+      }
+      // Failure in write or close will be terminal.
+      AppendTestUtil.write(stm, 0, 123);
+      stm.close();
+    }
+  }
+
+
+  @Test
+  public void testSmallAppendRace()  throws Exception {
+    final Path file = new Path("/testSmallAppendRace");
+    final String fName = file.toUri().getPath();
+
+    // Create the file and write a small amount of data.
+    FSDataOutputStream stm = fs.create(file);
+    AppendTestUtil.write(stm, 0, 123);
+    stm.close();
+
+    // Introduce a delay between getFileInfo and calling append() against NN.
+    final DFSClient client = DFSClientAdapter.getDFSClient(fs);
+    DFSClient spyClient = spy(client);
+    when(spyClient.getFileInfo(fName)).thenAnswer(new Answer<HdfsFileStatus>() {
+      @Override
+      public HdfsFileStatus answer(InvocationOnMock invocation){
+        try {
+          HdfsFileStatus stat = client.getFileInfo(fName);
+          Thread.sleep(100);
+          return stat;
+        } catch (Exception e) {
+          return null;
+        }
+      }
+    });
+
+    DFSClientAdapter.setDFSClient(fs, spyClient);
+
+    // Create two threads for doing appends to the same file.
+    Thread worker1 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          doSmallAppends(file, fs, 20);
+        } catch (IOException e) {
+        }
+      }
+    };
+
+    Thread worker2 = new Thread() {
+      @Override
+      public void run() {
+        try {
+          doSmallAppends(file, fs, 20);
+        } catch (IOException e) {
+        }
+      }
+    };
+
+    worker1.start();
+    worker2.start();
+
+    // append will fail when the file size crosses the checksum chunk boundary,
+    // if append was called with a stale file stat.
+    doSmallAppends(file, fs, 20);
+  }
 }
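
A note on how the regression test works: the Mockito spy wraps the real getFileInfo() and sleeps 100 ms before returning, deliberately stretching the window between the stat lookup and the append RPC. doSmallAppends() swallows the IOException from fs.append() because HDFS permits only one writer per file, so losing the lease race to another thread is expected and simply skipped, while a failure in write or close is treated as terminal. With three concurrent appenders (the two workers plus the main thread) each writing 123 bytes at a time, the file keeps crossing checksum chunk boundaries (512 bytes by default), which is exactly where a stale stat used to corrupt the stream; before this patch the main thread's appends would fail there. To run just this case, something like mvn test -Dtest=TestFileAppend3#testSmallAppendRace from hadoop-hdfs-project/hadoop-hdfs should do, assuming a Surefire version that supports the Class#method syntax.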

