hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ji...@apache.org
Subject [2/3] hadoop git commit: HDFS-3689. Add support for variable length block. Contributed by Jing Zhao.
Date Thu, 12 Feb 2015 18:26:26 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
index cbcaa79..bd6f76c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
@@ -66,6 +66,7 @@ enum CreateFlagProto {
   OVERWRITE = 0x02; // Truncate/overwrite a file. Same as POSIX O_TRUNC
   APPEND = 0x04;    // Append to a file
   LAZY_PERSIST = 0x10; // File with reduced durability guarantees.
+  NEW_BLOCK = 0x20; // Write data to a new block when appending
 }
 
 message CreateRequestProto {
@@ -86,6 +87,7 @@ message CreateResponseProto {
 message AppendRequestProto {
   required string src = 1;
   required string clientName = 2;
+  optional uint32 flag = 3; // bits set using CreateFlag
 }
 
 message AppendResponseProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
index e50f14b..5b78fe6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto
@@ -89,6 +89,7 @@ message CloseEventProto {
 
 message AppendEventProto {
   required string path = 1;
+  optional bool newBlock = 2 [default = false];
 }
 
 message RenameEventProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
index d3e9d23..de4da5f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java
@@ -159,6 +159,22 @@ public class AppendTestUtil {
     }
   }
 
+  public static void check(DistributedFileSystem fs, Path p, int position,
+      int length) throws IOException {
+    byte[] buf = new byte[length];
+    int i = 0;
+    try {
+      FSDataInputStream in = fs.open(p);
+      in.read(position, buf, 0, buf.length);
+      for(i = position; i < length + position; i++) {
+        assertEquals((byte) i, buf[i - position]);
+      }
+      in.close();
+    } catch(IOException ioe) {
+      throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
+    }
+  }
+
   /**
    *  create a buffer that contains the entire test file data.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 8e659c3..b917b86 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1178,6 +1178,9 @@ public class DFSTestUtil {
     FSDataOutputStream s = filesystem.create(pathFileCreate);
     // OP_CLOSE 9
     s.close();
+    // OP_APPEND 47
+    FSDataOutputStream s2 = filesystem.append(pathFileCreate, 4096, null);
+    s2.close();
     // OP_SET_STORAGE_POLICY 45
     filesystem.setStoragePolicy(pathFileCreate,
         HdfsConstants.HOT_STORAGE_POLICY_NAME);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
index 75a4ad4..4f449d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.XAttrSetFlag;
@@ -71,7 +72,7 @@ public class TestDFSInotifyEventInputStream {
    */
   @Test
   public void testOpcodeCount() {
-    Assert.assertEquals(48, FSEditLogOpCodes.values().length);
+    Assert.assertEquals(49, FSEditLogOpCodes.values().length);
   }
 
 
@@ -109,7 +110,8 @@ public class TestDFSInotifyEventInputStream {
       os.write(new byte[BLOCK_SIZE]);
       os.close(); // CloseOp -> CloseEvent
       // AddOp -> AppendEvent
-      os = client.append("/file2", BLOCK_SIZE, null, null);
+      os = client.append("/file2", BLOCK_SIZE, EnumSet.of(CreateFlag.APPEND),
+          null, null);
       os.write(new byte[BLOCK_SIZE]);
       os.close(); // CloseOp -> CloseEvent
       Thread.sleep(10); // so that the atime will get updated on the next line
@@ -182,13 +184,14 @@ public class TestDFSInotifyEventInputStream {
       Assert.assertTrue(ce2.getFileSize() > 0);
       Assert.assertTrue(ce2.getTimestamp() > 0);
 
-      // AddOp
+      // AppendOp
       batch = waitForNextEvents(eis);
       Assert.assertEquals(1, batch.getEvents().length);
       txid = checkTxid(batch, txid);
       Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND);
       Event.AppendEvent append2 = (Event.AppendEvent)batch.getEvents()[0];
       Assert.assertEquals("/file2", append2.getPath());
+      Assert.assertFalse(append2.toNewBlock());
 
       // CloseOp
       batch = waitForNextEvents(eis);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
index a733ad0..ff0b9d7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java
@@ -25,10 +25,12 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HardLink;
@@ -345,7 +347,46 @@ public class TestFileAppend{
       cluster.shutdown();
     }
   }
+
+  /** Test two consecutive appends on a file with a full block. */
+  @Test
+  public void testAppend2Twice() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    final DistributedFileSystem fs1 = cluster.getFileSystem();
+    final FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(conf);
+    try {
+      final Path p = new Path("/testAppendTwice/foo");
+      final int len = 1 << 16;
+      final byte[] fileContents = AppendTestUtil.initBuffer(len);
+
+      {
+        // create a new file with a full block.
+        FSDataOutputStream out = fs2.create(p, true, 4096, (short)1, len);
+        out.write(fileContents, 0, len);
+        out.close();
+      }
   
+      //1st append does not add any data so that the last block remains full
+      //and the last block in INodeFileUnderConstruction is a BlockInfo
+      //but not BlockInfoUnderConstruction.
+      ((DistributedFileSystem) fs2).append(p,
+          EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+
+      // 2nd append should get AlreadyBeingCreatedException
+      fs1.append(p);
+      Assert.fail();
+    } catch(RemoteException re) {
+      AppendTestUtil.LOG.info("Got an exception:", re);
+      Assert.assertEquals(AlreadyBeingCreatedException.class.getName(),
+          re.getClassName());
+    } finally {
+      fs2.close();
+      fs1.close();
+      cluster.shutdown();
+    }
+  }
+
   /** Tests appending after soft-limit expires. */
   @Test
   public void testAppendAfterSoftLimit() 
@@ -388,6 +429,54 @@ public class TestFileAppend{
     }
   }
 
+  /** Tests appending after soft-limit expires. */
+  @Test
+  public void testAppend2AfterSoftLimit() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+    //Set small soft-limit for lease
+    final long softLimit = 1L;
+    final long hardLimit = 9999999L;
+
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .build();
+    cluster.setLeasePeriod(softLimit, hardLimit);
+    cluster.waitActive();
+
+    DistributedFileSystem fs = cluster.getFileSystem();
+    DistributedFileSystem fs2 = new DistributedFileSystem();
+    fs2.initialize(fs.getUri(), conf);
+
+    final Path testPath = new Path("/testAppendAfterSoftLimit");
+    final byte[] fileContents = AppendTestUtil.initBuffer(32);
+
+    // create a new file without closing
+    FSDataOutputStream out = fs.create(testPath);
+    out.write(fileContents);
+
+    //Wait for > soft-limit
+    Thread.sleep(250);
+
+    try {
+      FSDataOutputStream appendStream2 = fs2.append(testPath,
+          EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+      appendStream2.write(fileContents);
+      appendStream2.close();
+      assertEquals(fileContents.length, fs.getFileStatus(testPath).getLen());
+      // make sure we now have 1 block since the first writer was revoked
+      LocatedBlocks blks = fs.getClient().getLocatedBlocks(testPath.toString(),
+          0L);
+      assertEquals(1, blks.getLocatedBlocks().size());
+      for (LocatedBlock blk : blks.getLocatedBlocks()) {
+        assertEquals(fileContents.length, blk.getBlockSize());
+      }
+    } finally {
+      fs.close();
+      fs2.close();
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Old replica of the block should not be accepted as valid for append/read
    */
@@ -441,4 +530,77 @@ public class TestFileAppend{
     }
   }
 
+  /**
+   * Old replica of the block should not be accepted as valid for append/read
+   */
+  @Test
+  public void testMultiAppend2() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.set("dfs.client.block.write.replace-datanode-on-failure.enable",
+        "false");
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
+        .build();
+    DistributedFileSystem fs = null;
+    final String hello = "hello\n";
+    try {
+      fs = cluster.getFileSystem();
+      Path path = new Path("/test");
+      FSDataOutputStream out = fs.create(path);
+      out.writeBytes(hello);
+      out.close();
+
+      // stop one datanode
+      DataNodeProperties dnProp = cluster.stopDataNode(0);
+      String dnAddress = dnProp.datanode.getXferAddress().toString();
+      if (dnAddress.startsWith("/")) {
+        dnAddress = dnAddress.substring(1);
+      }
+
+      // append again to bump genstamps
+      for (int i = 0; i < 2; i++) {
+        out = fs.append(path,
+            EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+        out.writeBytes(hello);
+        out.close();
+      }
+
+      // re-open and make the block state as underconstruction
+      out = fs.append(path, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
+          4096, null);
+      cluster.restartDataNode(dnProp, true);
+      // wait till the block report comes
+      Thread.sleep(2000);
+      out.writeBytes(hello);
+      out.close();
+      // check the block locations
+      LocatedBlocks blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L);
+      // since we append the file 3 time, we should be 4 blocks
+      assertEquals(4, blocks.getLocatedBlocks().size());
+      for (LocatedBlock block : blocks.getLocatedBlocks()) {
+        assertEquals(hello.length(), block.getBlockSize());
+      }
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < 4; i++) {
+        sb.append(hello);
+      }
+      final byte[] content = sb.toString().getBytes();
+      AppendTestUtil.checkFullFile(fs, path, content.length, content,
+          "Read /test");
+
+      // restart namenode to make sure the editlog can be properly applied
+      cluster.restartNameNode(true);
+      cluster.waitActive();
+      AppendTestUtil.checkFullFile(fs, path, content.length, content,
+          "Read /test");
+      blocks = fs.getClient().getLocatedBlocks(path.toString(), 0L);
+      // since we append the file 3 time, we should be 4 blocks
+      assertEquals(4, blocks.getLocatedBlocks().size());
+      for (LocatedBlock block : blocks.getLocatedBlocks()) {
+        assertEquals(hello.length(), block.getBlockSize());
+      }
+    } finally {
+      IOUtils.closeStream(fs);
+      cluster.shutdown();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
index 6c942b5..dd4fe14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend2.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -24,13 +25,17 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.io.IOUtils;
@@ -61,11 +66,7 @@ public class TestFileAppend2 {
   final int numberOfFiles = 50;
   final int numThreads = 10;
   final int numAppendsPerThread = 20;
-/***
-  int numberOfFiles = 1;
-  int numThreads = 1;
-  int numAppendsPerThread = 2000;
-****/
+
   Workload[] workload = null;
   final ArrayList<Path> testFiles = new ArrayList<Path>();
   volatile static boolean globalStatus = true;
@@ -223,16 +224,170 @@ public class TestFileAppend2 {
     }
   }
 
+  /**
+   * Creates one file, writes a few bytes to it and then closed it.
+   * Reopens the same file for appending using append2 API, write all blocks and
+   * then close. Verify that all data exists in file.
+   */
+  @Test
+  public void testSimpleAppend2() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    if (simulatedStorage) {
+      SimulatedFSDataset.setFactory(conf);
+    }
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY, 50);
+    fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    try {
+      { // test appending to a file.
+        // create a new file.
+        Path file1 = new Path("/simpleAppend.dat");
+        FSDataOutputStream stm = AppendTestUtil.createFile(fs, file1, 1);
+        System.out.println("Created file simpleAppend.dat");
+
+        // write to file
+        int mid = 186;   // io.bytes.per.checksum bytes
+        System.out.println("Writing " + mid + " bytes to file " + file1);
+        stm.write(fileContents, 0, mid);
+        stm.close();
+        System.out.println("Wrote and Closed first part of file.");
+
+        // write to file
+        int mid2 = 607;   // io.bytes.per.checksum bytes
+        System.out.println("Writing " + mid + " bytes to file " + file1);
+        stm = fs.append(file1,
+            EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+        stm.write(fileContents, mid, mid2-mid);
+        stm.close();
+        System.out.println("Wrote and Closed second part of file.");
+
+        // write the remainder of the file
+        stm = fs.append(file1,
+            EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+        // ensure getPos is set to reflect existing size of the file
+        assertTrue(stm.getPos() > 0);
+        System.out.println("Writing " + (AppendTestUtil.FILE_SIZE - mid2) +
+            " bytes to file " + file1);
+        stm.write(fileContents, mid2, AppendTestUtil.FILE_SIZE - mid2);
+        System.out.println("Written second part of file");
+        stm.close();
+        System.out.println("Wrote and Closed second part of file.");
+
+        // verify that entire file is good
+        AppendTestUtil.checkFullFile(fs, file1, AppendTestUtil.FILE_SIZE,
+            fileContents, "Read 2");
+        // also make sure there three different blocks for the file
+        List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks(
+            file1.toString(), 0L).getLocatedBlocks();
+        assertEquals(12, blocks.size()); // the block size is 1024
+        assertEquals(mid, blocks.get(0).getBlockSize());
+        assertEquals(mid2 - mid, blocks.get(1).getBlockSize());
+        for (int i = 2; i < 11; i++) {
+          assertEquals(AppendTestUtil.BLOCK_SIZE, blocks.get(i).getBlockSize());
+        }
+        assertEquals((AppendTestUtil.FILE_SIZE - mid2)
+            % AppendTestUtil.BLOCK_SIZE, blocks.get(11).getBlockSize());
+      }
+
+      { // test appending to an non-existing file.
+        FSDataOutputStream out = null;
+        try {
+          out = fs.append(new Path("/non-existing.dat"),
+              EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+          fail("Expected to have FileNotFoundException");
+        } catch(java.io.FileNotFoundException fnfe) {
+          System.out.println("Good: got " + fnfe);
+          fnfe.printStackTrace(System.out);
+        } finally {
+          IOUtils.closeStream(out);
+        }
+      }
+
+      { // test append permission.
+        // set root to all writable
+        Path root = new Path("/");
+        fs.setPermission(root, new FsPermission((short)0777));
+        fs.close();
+
+        // login as a different user
+        final UserGroupInformation superuser =
+          UserGroupInformation.getCurrentUser();
+        String username = "testappenduser";
+        String group = "testappendgroup";
+        assertFalse(superuser.getShortUserName().equals(username));
+        assertFalse(Arrays.asList(superuser.getGroupNames()).contains(group));
+        UserGroupInformation appenduser = UserGroupInformation
+            .createUserForTesting(username, new String[] { group });
+
+        fs = (DistributedFileSystem) DFSTestUtil.getFileSystemAs(appenduser,
+            conf);
+
+        // create a file
+        Path dir = new Path(root, getClass().getSimpleName());
+        Path foo = new Path(dir, "foo.dat");
+        FSDataOutputStream out = null;
+        int offset = 0;
+        try {
+          out = fs.create(foo);
+          int len = 10 + AppendTestUtil.nextInt(100);
+          out.write(fileContents, offset, len);
+          offset += len;
+        } finally {
+          IOUtils.closeStream(out);
+        }
+
+        // change dir and foo to minimal permissions.
+        fs.setPermission(dir, new FsPermission((short)0100));
+        fs.setPermission(foo, new FsPermission((short)0200));
+
+        // try append, should success
+        out = null;
+        try {
+          out = fs.append(foo,
+              EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+          int len = 10 + AppendTestUtil.nextInt(100);
+          out.write(fileContents, offset, len);
+          offset += len;
+        } finally {
+          IOUtils.closeStream(out);
+        }
+
+        // change dir and foo to all but no write on foo.
+        fs.setPermission(foo, new FsPermission((short)0577));
+        fs.setPermission(dir, new FsPermission((short)0777));
+
+        // try append, should fail
+        out = null;
+        try {
+          out = fs.append(foo,
+              EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+          fail("Expected to have AccessControlException");
+        } catch(AccessControlException ace) {
+          System.out.println("Good: got " + ace);
+          ace.printStackTrace(System.out);
+        } finally {
+          IOUtils.closeStream(out);
+        }
+      }
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
   //
   // an object that does a bunch of appends to files
   //
   class Workload extends Thread {
     private final int id;
     private final MiniDFSCluster cluster;
+    private final boolean appendToNewBlock;
 
-    Workload(MiniDFSCluster cluster, int threadIndex) {
+    Workload(MiniDFSCluster cluster, int threadIndex, boolean append2) {
       id = threadIndex;
       this.cluster = cluster;
+      this.appendToNewBlock = append2;
     }
 
     // create a bunch of files. Write to them and then verify.
@@ -255,7 +410,7 @@ public class TestFileAppend2 {
         long len = 0;
         int sizeToAppend = 0;
         try {
-          FileSystem fs = cluster.getFileSystem();
+          DistributedFileSystem fs = cluster.getFileSystem();
 
           // add a random number of bytes to file
           len = fs.getFileStatus(testfile).getLen();
@@ -279,7 +434,9 @@ public class TestFileAppend2 {
                              " appending " + sizeToAppend + " bytes " +
                              " to file " + testfile +
                              " of size " + len);
-          FSDataOutputStream stm = fs.append(testfile);
+          FSDataOutputStream stm = appendToNewBlock ? fs.append(testfile,
+                  EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null)
+              : fs.append(testfile);
           stm.write(fileContents, (int)len, sizeToAppend);
           stm.close();
 
@@ -292,7 +449,7 @@ public class TestFileAppend2 {
                                  " expected size " + (len + sizeToAppend) +
                                  " waiting for namenode metadata update.");
               Thread.sleep(5000);
-            } catch (InterruptedException e) {;}
+            } catch (InterruptedException e) {}
           }
 
           assertTrue("File " + testfile + " size is " + 
@@ -300,7 +457,7 @@ public class TestFileAppend2 {
                      " but expected " + (len + sizeToAppend),
                     fs.getFileStatus(testfile).getLen() == (len + sizeToAppend));
 
-          AppendTestUtil.checkFullFile(fs, testfile, (int)(len + sizeToAppend),
+          AppendTestUtil.checkFullFile(fs, testfile, (int) (len + sizeToAppend),
               fileContents, "Read 2");
         } catch (Throwable e) {
           globalStatus = false;
@@ -325,10 +482,8 @@ public class TestFileAppend2 {
 
   /**
    * Test that appends to files at random offsets.
-   * @throws IOException an exception might be thrown
    */
-  @Test
-  public void testComplexAppend() throws IOException {
+  private void testComplexAppend(boolean appendToNewBlock) throws IOException {
     fileContents = AppendTestUtil.initBuffer(AppendTestUtil.FILE_SIZE);
     Configuration conf = new HdfsConfiguration();
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
@@ -360,7 +515,7 @@ public class TestFileAppend2 {
       // Create threads and make them run workload concurrently.
       workload = new Workload[numThreads];
       for (int i = 0; i < numThreads; i++) {
-        workload[i] = new Workload(cluster, i);
+        workload[i] = new Workload(cluster, i, appendToNewBlock);
         workload[i].start();
       }
 
@@ -384,4 +539,14 @@ public class TestFileAppend2 {
     //
     assertTrue("testComplexAppend Worker encountered exceptions.", globalStatus);
   }
+
+  @Test
+  public void testComplexAppend() throws IOException {
+    testComplexAppend(false);
+  }
+
+  @Test
+  public void testComplexAppend2() throws IOException {
+    testComplexAppend(true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
index 84912f1..6fbe37f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend3.java
@@ -24,20 +24,20 @@ import static org.junit.Assert.fail;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.EnumSet;
+import java.util.List;
 
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.fs.CreateFlag;
 import org.mockito.invocation.InvocationOnMock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import org.mockito.stubbing.Answer;
 
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSClientAdapter;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -116,6 +117,32 @@ public class TestFileAppend3  {
     AppendTestUtil.check(fs, p, len1 + len2);
   }
 
+  @Test
+  public void testTC1ForAppend2() throws Exception {
+    final Path p = new Path("/TC1/foo2");
+
+    //a. Create file and write one block of data. Close file.
+    final int len1 = (int) BLOCK_SIZE;
+    {
+      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
+          BLOCK_SIZE);
+      AppendTestUtil.write(out, 0, len1);
+      out.close();
+    }
+
+    // Reopen file to append. Append half block of data. Close file.
+    final int len2 = (int) BLOCK_SIZE / 2;
+    {
+      FSDataOutputStream out = fs.append(p,
+          EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+      AppendTestUtil.write(out, len1, len2);
+      out.close();
+    }
+
+    // b. Reopen file and read 1.5 blocks worth of data. Close file.
+    AppendTestUtil.check(fs, p, len1 + len2);
+  }
+
   /**
    * TC2: Append on non-block boundary.
    * @throws IOException an exception might be thrown
@@ -147,6 +174,40 @@ public class TestFileAppend3  {
     AppendTestUtil.check(fs, p, len1 + len2);
   }
 
+  @Test
+  public void testTC2ForAppend2() throws Exception {
+    final Path p = new Path("/TC2/foo2");
+
+    //a. Create file with one and a half block of data. Close file.
+    final int len1 = (int) (BLOCK_SIZE + BLOCK_SIZE / 2);
+    {
+      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
+          BLOCK_SIZE);
+      AppendTestUtil.write(out, 0, len1);
+      out.close();
+    }
+
+    AppendTestUtil.check(fs, p, len1);
+
+    //   Reopen file to append quarter block of data. Close file.
+    final int len2 = (int) BLOCK_SIZE / 4;
+    {
+      FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
+          4096, null);
+      AppendTestUtil.write(out, len1, len2);
+      out.close();
+    }
+
+    // b. Reopen file and read 1.75 blocks of data. Close file.
+    AppendTestUtil.check(fs, p, len1 + len2);
+    List<LocatedBlock> blocks = fs.getClient().getLocatedBlocks(
+        p.toString(), 0L).getLocatedBlocks();
+    Assert.assertEquals(3, blocks.size());
+    Assert.assertEquals(BLOCK_SIZE, blocks.get(0).getBlockSize());
+    Assert.assertEquals(BLOCK_SIZE / 2, blocks.get(1).getBlockSize());
+    Assert.assertEquals(BLOCK_SIZE / 4, blocks.get(2).getBlockSize());
+  }
+
   /**
    * TC5: Only one simultaneous append.
    * @throws IOException an exception might be thrown
@@ -174,18 +235,63 @@ public class TestFileAppend3  {
       AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
     }
 
+    try {
+      ((DistributedFileSystem) AppendTestUtil
+          .createHdfsWithDifferentUsername(conf)).append(p,
+          EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+      fail("This should fail.");
+    } catch(IOException ioe) {
+      AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
+    }
+
     //d. On Machine M1, close file.
     out.close();        
   }
 
+  @Test
+  public void testTC5ForAppend2() throws Exception {
+    final Path p = new Path("/TC5/foo2");
+
+    // a. Create file on Machine M1. Write half block to it. Close file.
+    {
+      FSDataOutputStream out = fs.create(p, false, buffersize, REPLICATION,
+          BLOCK_SIZE);
+      AppendTestUtil.write(out, 0, (int)(BLOCK_SIZE/2));
+      out.close();
+    }
+
+    // b. Reopen file in "append" mode on Machine M1.
+    FSDataOutputStream out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK),
+        4096, null);
+
+    // c. On Machine M2, reopen file in "append" mode. This should fail.
+    try {
+      ((DistributedFileSystem) AppendTestUtil
+          .createHdfsWithDifferentUsername(conf)).append(p,
+          EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null);
+      fail("This should fail.");
+    } catch(IOException ioe) {
+      AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
+    }
+
+    try {
+      AppendTestUtil.createHdfsWithDifferentUsername(conf).append(p);
+      fail("This should fail.");
+    } catch(IOException ioe) {
+      AppendTestUtil.LOG.info("GOOD: got an exception", ioe);
+    }
+
+    // d. On Machine M1, close file.
+    out.close();
+  }
+
   /**
    * TC7: Corrupted replicas are present.
    * @throws IOException an exception might be thrown
    */
-  @Test
-  public void testTC7() throws Exception {
+  private void testTC7(boolean appendToNewBlock) throws Exception {
     final short repl = 2;
-    final Path p = new Path("/TC7/foo");
+    final Path p = new Path("/TC7/foo" + (appendToNewBlock ? "0" : "1"));
     System.out.println("p=" + p);
     
     //a. Create file with replication factor of 2. Write half block of data. Close file.
@@ -219,7 +325,8 @@ public class TestFileAppend3  {
     //c. Open file in "append mode".  Append a new block worth of data. Close file.
     final int len2 = (int)BLOCK_SIZE; 
     {
-      FSDataOutputStream out = fs.append(p);
+      FSDataOutputStream out = appendToNewBlock ?
+          fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) : fs.append(p);
       AppendTestUtil.write(out, len1, len2);
       out.close();
     }
@@ -228,13 +335,21 @@ public class TestFileAppend3  {
     AppendTestUtil.check(fs, p, len1 + len2);
   }
 
+  @Test
+  public void testTC7() throws Exception {
+    testTC7(false);
+  }
+
+  @Test
+  public void testTC7ForAppend2() throws Exception {
+    testTC7(true);
+  }
+
   /**
    * TC11: Racing rename
-   * @throws IOException an exception might be thrown
    */
-  @Test
-  public void testTC11() throws Exception {
-    final Path p = new Path("/TC11/foo");
+  private void testTC11(boolean appendToNewBlock) throws Exception {
+    final Path p = new Path("/TC11/foo" + (appendToNewBlock ? "0" : "1"));
     System.out.println("p=" + p);
 
     //a. Create file and write one block of data. Close file.
@@ -246,7 +361,9 @@ public class TestFileAppend3  {
     }
 
     //b. Reopen file in "append" mode. Append half block of data.
-    FSDataOutputStream out = fs.append(p);
+    FSDataOutputStream out = appendToNewBlock ?
+        fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+        fs.append(p);
     final int len2 = (int)BLOCK_SIZE/2; 
     AppendTestUtil.write(out, len1, len2);
     out.hflush();
@@ -278,13 +395,21 @@ public class TestFileAppend3  {
     }
   }
 
+  @Test
+  public void testTC11() throws Exception {
+    testTC11(false);
+  }
+
+  @Test
+  public void testTC11ForAppend2() throws Exception {
+    testTC11(true);
+  }
+
   /** 
    * TC12: Append to partial CRC chunk
-   * @throws IOException an exception might be thrown
    */
-  @Test
-  public void testTC12() throws Exception {
-    final Path p = new Path("/TC12/foo");
+  private void testTC12(boolean appendToNewBlock) throws Exception {
+    final Path p = new Path("/TC12/foo" + (appendToNewBlock ? "0" : "1"));
     System.out.println("p=" + p);
     
     //a. Create file with a block size of 64KB
@@ -300,23 +425,43 @@ public class TestFileAppend3  {
     //b. Reopen file in "append" mode. Append another 5877 bytes of data. Close file.
     final int len2 = 5877; 
     {
-      FSDataOutputStream out = fs.append(p);
+      FSDataOutputStream out = appendToNewBlock ?
+          fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+          fs.append(p);
       AppendTestUtil.write(out, len1, len2);
       out.close();
     }
 
     //c. Reopen file and read 25687+5877 bytes of data from file. Close file.
     AppendTestUtil.check(fs, p, len1 + len2);
+    if (appendToNewBlock) {
+      LocatedBlocks blks = fs.dfs.getLocatedBlocks(p.toString(), 0);
+      Assert.assertEquals(2, blks.getLocatedBlocks().size());
+      Assert.assertEquals(len1, blks.getLocatedBlocks().get(0).getBlockSize());
+      Assert.assertEquals(len2, blks.getLocatedBlocks().get(1).getBlockSize());
+      AppendTestUtil.check(fs, p, 0, len1);
+      AppendTestUtil.check(fs, p, len1, len2);
+    }
   }
-  
-  /** Append to a partial CRC chunk and 
-   * the first write does not fill up the partial CRC trunk
-   * *
-   * @throws IOException
-   */
+
   @Test
-  public void testAppendToPartialChunk() throws IOException {
-    final Path p = new Path("/partialChunk/foo");
+  public void testTC12() throws Exception {
+    testTC12(false);
+  }
+
+  @Test
+  public void testTC12ForAppend2() throws Exception {
+    testTC12(true);
+  }
+
+  /**
+   * Append to a partial CRC chunk and the first write does not fill up the
+   * partial CRC trunk
+   */
+  private void testAppendToPartialChunk(boolean appendToNewBlock)
+      throws IOException {
+    final Path p = new Path("/partialChunk/foo"
+        + (appendToNewBlock ? "0" : "1"));
     final int fileLen = 513;
     System.out.println("p=" + p);
     
@@ -331,7 +476,9 @@ public class TestFileAppend3  {
     System.out.println("Wrote 1 byte and closed the file " + p);
 
     // append to file
-    stm = fs.append(p);
+    stm = appendToNewBlock ?
+        fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+        fs.append(p);
     // Append to a partial CRC trunk
     stm.write(fileContents, 1, 1);
     stm.hflush();
@@ -340,7 +487,9 @@ public class TestFileAppend3  {
     System.out.println("Append 1 byte and closed the file " + p);
 
     // write the remainder of the file
-    stm = fs.append(p);
+    stm = appendToNewBlock ?
+        fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.NEW_BLOCK), 4096, null) :
+        fs.append(p);
 
     // ensure getPos is set to reflect existing size of the file
     assertEquals(2, stm.getPos());
@@ -439,4 +588,14 @@ public class TestFileAppend3  {
     // if append was called with a stale file stat.
     doSmallAppends(file, fs, 20);
   }
+
+  @Test
+  public void testAppendToPartialChunk() throws IOException {
+    testAppendToPartialChunk(false);
+  }
+
+  @Test
+  public void testAppendToPartialChunkforAppend2() throws IOException {
+    testAppendToPartialChunk(true);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
index 0bca23d..a2b344c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppendRestart.java
@@ -99,10 +99,11 @@ public class TestFileAppendRestart {
       // OP_ADD to create file
       // OP_ADD_BLOCK for first block
       // OP_CLOSE to close file
-      // OP_ADD to reopen file
+      // OP_APPEND to reopen file
       // OP_ADD_BLOCK for second block
       // OP_CLOSE to close file
-      assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+      assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+      assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_APPEND).held);
       assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD_BLOCK).held);
       assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_CLOSE).held);
 
@@ -112,13 +113,14 @@ public class TestFileAppendRestart {
       // OP_ADD to create file
       // OP_ADD_BLOCK for first block
       // OP_CLOSE to close file
-      // OP_ADD to re-establish the lease
+      // OP_APPEND to re-establish the lease
       // OP_UPDATE_BLOCKS from the updatePipeline call (increments genstamp of last block)
       // OP_ADD_BLOCK at the start of the second block
       // OP_CLOSE to close file
       // Total: 2 OP_ADDs, 1 OP_UPDATE_BLOCKS, 2 OP_ADD_BLOCKs, and 2 OP_CLOSEs
        //       in addition to the ones above
-      assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+      assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_ADD).held);
+      assertEquals(2, (int)counts.get(FSEditLogOpCodes.OP_APPEND).held);
       assertEquals(1, (int)counts.get(FSEditLogOpCodes.OP_UPDATE_BLOCKS).held);
       assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_ADD_BLOCK).held);
       assertEquals(2+2, (int)counts.get(FSEditLogOpCodes.OP_CLOSE).held);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
index 7c3df7d..a33ad18 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHFlush.java
@@ -31,7 +31,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.log4j.Level;
 import org.junit.Test;
 
@@ -121,7 +123,66 @@ public class TestHFlush {
       cluster.shutdown();
     }
   }
-  
+
+  /**
+   * Test hsync with END_BLOCK flag.
+   */
+  @Test
+  public void hSyncEndBlock_00() throws IOException {
+    final int preferredBlockSize = 1024;
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, preferredBlockSize);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .build();
+    DistributedFileSystem fileSystem = cluster.getFileSystem();
+    FSDataOutputStream stm = null;
+    try {
+      Path path = new Path("/" + fName);
+      stm = fileSystem.create(path, true, 4096, (short) 2,
+          AppendTestUtil.BLOCK_SIZE);
+      System.out.println("Created file " + path.toString());
+      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+          .of(SyncFlag.END_BLOCK));
+      long currentFileLength = fileSystem.getFileStatus(path).getLen();
+      assertEquals(0L, currentFileLength);
+      LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+      assertEquals(0, blocks.getLocatedBlocks().size());
+
+      // write a block and call hsync(end_block) at the block boundary
+      stm.write(new byte[preferredBlockSize]);
+      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+          .of(SyncFlag.END_BLOCK));
+      currentFileLength = fileSystem.getFileStatus(path).getLen();
+      assertEquals(preferredBlockSize, currentFileLength);
+      blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+      assertEquals(1, blocks.getLocatedBlocks().size());
+
+      // call hsync then call hsync(end_block) immediately
+      stm.write(new byte[preferredBlockSize / 2]);
+      stm.hsync();
+      ((DFSOutputStream) stm.getWrappedStream()).hsync(EnumSet
+          .of(SyncFlag.END_BLOCK));
+      currentFileLength = fileSystem.getFileStatus(path).getLen();
+      assertEquals(preferredBlockSize + preferredBlockSize / 2,
+          currentFileLength);
+      blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+      assertEquals(2, blocks.getLocatedBlocks().size());
+
+      stm.write(new byte[preferredBlockSize / 4]);
+      stm.hsync();
+      currentFileLength = fileSystem.getFileStatus(path).getLen();
+      assertEquals(preferredBlockSize + preferredBlockSize / 2
+          + preferredBlockSize / 4, currentFileLength);
+      blocks = fileSystem.dfs.getLocatedBlocks(path.toString(), 0);
+      assertEquals(3, blocks.getLocatedBlocks().size());
+    } finally {
+      IOUtils.cleanup(null, stm, fileSystem);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /**
    * The test calls
    * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
@@ -136,6 +197,29 @@ public class TestHFlush {
   /**
    * The test calls
    * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+   * while requiring the semantic of {@link SyncFlag#END_BLOCK}.
+   */
+  @Test
+  public void hSyncEndBlock_01() throws IOException {
+    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
+        (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK));
+  }
+
+  /**
+   * The test calls
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
+   * while requiring the semantic of {@link SyncFlag#END_BLOCK} and
+   * {@link SyncFlag#UPDATE_LENGTH}.
+   */
+  @Test
+  public void hSyncEndBlockAndUpdateLength() throws IOException {
+    doTheJob(new HdfsConfiguration(), fName, AppendTestUtil.BLOCK_SIZE,
+        (short) 2, true, EnumSet.of(SyncFlag.END_BLOCK, SyncFlag.UPDATE_LENGTH));
+  }
+
+  /**
+   * The test calls
+   * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
    * while requiring the semantic of {@link SyncFlag#UPDATE_LENGTH}.
    * Similar with {@link #hFlush_02()} , it writes a file with a custom block
    * size so the writes will be happening across block' boundaries
@@ -152,7 +236,20 @@ public class TestHFlush {
     doTheJob(conf, fName, customBlockSize, (short) 2, true,
         EnumSet.of(SyncFlag.UPDATE_LENGTH));
   }
-  
+
+  @Test
+  public void hSyncEndBlock_02() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    int customPerChecksumSize = 512;
+    int customBlockSize = customPerChecksumSize * 3;
+    // Modify defaul filesystem settings
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+
+    doTheJob(conf, fName, customBlockSize, (short) 2, true,
+        EnumSet.of(SyncFlag.END_BLOCK));
+  }
+
   /**
    * The test calls
    * {@link #doTheJob(Configuration, String, long, short, boolean, EnumSet)}
@@ -173,7 +270,20 @@ public class TestHFlush {
     doTheJob(conf, fName, customBlockSize, (short) 2, true,
         EnumSet.of(SyncFlag.UPDATE_LENGTH));
   }
-  
+
+  @Test
+  public void hSyncEndBlock_03() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    int customPerChecksumSize = 400;
+    int customBlockSize = customPerChecksumSize * 3;
+    // Modify defaul filesystem settings
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, customPerChecksumSize);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, customBlockSize);
+
+    doTheJob(conf, fName, customBlockSize, (short) 2, true,
+        EnumSet.of(SyncFlag.END_BLOCK));
+  }
+
   /**
    * The method starts new cluster with defined Configuration; creates a file
    * with specified block_size and writes 10 equal sections in it; it also calls
@@ -197,12 +307,13 @@ public class TestHFlush {
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
                                                .numDataNodes(replicas).build();
     // Make sure we work with DFS in order to utilize all its functionality
-    DistributedFileSystem fileSystem =
-        cluster.getFileSystem();
+    DistributedFileSystem fileSystem = cluster.getFileSystem();
 
     FSDataInputStream is;
     try {
       Path path = new Path(fileName);
+      final String pathName = new Path(fileSystem.getWorkingDirectory(), path)
+          .toUri().getPath();
       FSDataOutputStream stm = fileSystem.create(path, false, 4096, replicas,
           block_size);
       System.out.println("Created file " + fileName);
@@ -210,7 +321,8 @@ public class TestHFlush {
       int tenth = AppendTestUtil.FILE_SIZE/SECTIONS;
       int rounding = AppendTestUtil.FILE_SIZE - tenth * SECTIONS;
       for (int i=0; i<SECTIONS; i++) {
-        System.out.println("Writing " + (tenth * i) + " to " + (tenth * (i+1)) + " section to file " + fileName);
+        System.out.println("Writing " + (tenth * i) + " to "
+            + (tenth * (i + 1)) + " section to file " + fileName);
         // write to the file
         stm.write(fileContent, tenth * i, tenth);
         
@@ -227,7 +339,11 @@ public class TestHFlush {
           assertEquals(
             "File size doesn't match for hsync/hflush with updating the length",
             tenth * (i + 1), currentFileLength);
+        } else if (isSync && syncFlags.contains(SyncFlag.END_BLOCK)) {
+          LocatedBlocks blocks = fileSystem.dfs.getLocatedBlocks(pathName, 0);
+          assertEquals(i + 1, blocks.getLocatedBlocks().size());
         }
+
         byte [] toRead = new byte[tenth];
         byte [] expected = new byte[tenth];
         System.arraycopy(fileContent, tenth * i, expected, 0, tenth);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
index b84989f..15580a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLeaseRecovery.java
@@ -22,8 +22,10 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Test;
@@ -124,7 +127,8 @@ public class TestLeaseRecovery {
     }
 
     DataNode.LOG.info("dfs.dfs.clientName=" + dfs.dfs.clientName);
-    cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName);
+    cluster.getNameNodeRpc().append(filestr, dfs.dfs.clientName,
+        new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
 
     // expire lease to trigger block recovery.
     waitLeaseRecovery(cluster);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
index 84ac2a5..a4df4ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestLazyPersistFiles.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
@@ -28,6 +29,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -234,7 +236,8 @@ public class TestLazyPersistFiles extends LazyPersistTestCase {
     makeTestFile(path, BLOCK_SIZE, true);
 
     try {
-      client.append(path.toString(), BUFFER_LENGTH, null, null).close();
+      client.append(path.toString(), BUFFER_LENGTH,
+          EnumSet.of(CreateFlag.APPEND), null, null).close();
       fail("Append to LazyPersist file did not fail as expected");
     } catch (Throwable t) {
       LOG.info("Got expected exception ", t);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
index 6d1f452..ddf5a3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestHDFSConcat.java
@@ -40,9 +40,12 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -99,7 +102,7 @@ public class TestHDFSConcat {
     HdfsFileStatus fStatus;
     FSDataInputStream stm;
     
-    String trg = new String("/trg");
+    String trg = "/trg";
     Path trgPath = new Path(trg);
     DFSTestUtil.createFile(dfs, trgPath, fileLen, REPL_FACTOR, 1);
     fStatus  = nn.getFileInfo(trg);
@@ -112,7 +115,7 @@ public class TestHDFSConcat {
     long [] lens = new long [numFiles];
     
     
-    int i = 0;
+    int i;
     for(i=0; i<files.length; i++) {
       files[i] = new Path("/file"+i);
       Path path = files[i];
@@ -385,6 +388,75 @@ public class TestHDFSConcat {
     } catch (Exception e) {
       // exspected
     }
- 
+  }
+
+  /**
+   * make sure we update the quota correctly after concat
+   */
+  @Test
+  public void testConcatWithQuotaDecrease() throws IOException {
+    final short srcRepl = 3; // note this is different with REPL_FACTOR
+    final int srcNum = 10;
+    final Path foo = new Path("/foo");
+    final Path[] srcs = new Path[srcNum];
+    final Path target = new Path(foo, "target");
+    DFSTestUtil.createFile(dfs, target, blockSize, REPL_FACTOR, 0L);
+
+    dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
+
+    for (int i = 0; i < srcNum; i++) {
+      srcs[i] = new Path(foo, "src" + i);
+      DFSTestUtil.createFile(dfs, srcs[i], blockSize * 2, srcRepl, 0L);
+    }
+
+    ContentSummary summary = dfs.getContentSummary(foo);
+    Assert.assertEquals(11, summary.getFileCount());
+    Assert.assertEquals(blockSize * REPL_FACTOR +
+            blockSize * 2 * srcRepl * srcNum, summary.getSpaceConsumed());
+
+    dfs.concat(target, srcs);
+    summary = dfs.getContentSummary(foo);
+    Assert.assertEquals(1, summary.getFileCount());
+    Assert.assertEquals(
+        blockSize * REPL_FACTOR + blockSize * 2 * REPL_FACTOR * srcNum,
+        summary.getSpaceConsumed());
+  }
+
+  @Test
+  public void testConcatWithQuotaIncrease() throws IOException {
+    final short repl = 3;
+    final int srcNum = 10;
+    final Path foo = new Path("/foo");
+    final Path bar = new Path(foo, "bar");
+    final Path[] srcs = new Path[srcNum];
+    final Path target = new Path(bar, "target");
+    DFSTestUtil.createFile(dfs, target, blockSize, repl, 0L);
+
+    final long dsQuota = blockSize * repl + blockSize * srcNum * REPL_FACTOR;
+    dfs.setQuota(foo, Long.MAX_VALUE - 1, dsQuota);
+
+    for (int i = 0; i < srcNum; i++) {
+      srcs[i] = new Path(bar, "src" + i);
+      DFSTestUtil.createFile(dfs, srcs[i], blockSize, REPL_FACTOR, 0L);
+    }
+
+    ContentSummary summary = dfs.getContentSummary(bar);
+    Assert.assertEquals(11, summary.getFileCount());
+    Assert.assertEquals(dsQuota, summary.getSpaceConsumed());
+
+    try {
+      dfs.concat(target, srcs);
+      fail("QuotaExceededException expected");
+    } catch (RemoteException e) {
+      Assert.assertTrue(
+          e.unwrapRemoteException() instanceof QuotaExceededException);
+    }
+
+    dfs.setQuota(foo, Long.MAX_VALUE - 1, Long.MAX_VALUE - 1);
+    dfs.concat(target, srcs);
+    summary = dfs.getContentSummary(bar);
+    Assert.assertEquals(1, summary.getFileCount());
+    Assert.assertEquals(blockSize * repl * (srcNum + 1),
+        summary.getSpaceConsumed());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
index 3084f26..2e6b4a3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeRetryCache.java
@@ -232,14 +232,18 @@ public class TestNamenodeRetryCache {
     
     // Retried append requests succeed
     newCall();
-    LastBlockWithStatus b = nnRpc.append(src, "holder");
-    Assert.assertEquals(b, nnRpc.append(src, "holder"));
-    Assert.assertEquals(b, nnRpc.append(src, "holder"));
+    LastBlockWithStatus b = nnRpc.append(src, "holder",
+        new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
+    Assert.assertEquals(b, nnRpc.append(src, "holder",
+        new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))));
+    Assert.assertEquals(b, nnRpc.append(src, "holder",
+        new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND))));
     
     // non-retried call fails
     newCall();
     try {
-      nnRpc.append(src, "holder");
+      nnRpc.append(src, "holder",
+          new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
       Assert.fail("testAppend - expected exception is not thrown");
     } catch (Exception e) {
       // Expected
@@ -409,7 +413,7 @@ public class TestNamenodeRetryCache {
 
     LightWeightCache<CacheEntry, CacheEntry> cacheSet = 
         (LightWeightCache<CacheEntry, CacheEntry>) namesystem.getRetryCache().getCacheSet();
-    assertEquals(24, cacheSet.size());
+    assertEquals(25, cacheSet.size());
     
     Map<CacheEntry, CacheEntry> oldEntries = 
         new HashMap<CacheEntry, CacheEntry>();
@@ -428,7 +432,7 @@ public class TestNamenodeRetryCache {
     assertTrue(namesystem.hasRetryCache());
     cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) namesystem
         .getRetryCache().getCacheSet();
-    assertEquals(24, cacheSet.size());
+    assertEquals(25, cacheSet.size());
     iter = cacheSet.iterator();
     while (iter.hasNext()) {
       CacheEntry entry = iter.next();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
index 7579f6e..c0d320c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRetryCacheWithHA.java
@@ -163,7 +163,7 @@ public class TestRetryCacheWithHA {
     FSNamesystem fsn0 = cluster.getNamesystem(0);
     LightWeightCache<CacheEntry, CacheEntry> cacheSet = 
         (LightWeightCache<CacheEntry, CacheEntry>) fsn0.getRetryCache().getCacheSet();
-    assertEquals(24, cacheSet.size());
+    assertEquals(25, cacheSet.size());
     
     Map<CacheEntry, CacheEntry> oldEntries = 
         new HashMap<CacheEntry, CacheEntry>();
@@ -184,7 +184,7 @@ public class TestRetryCacheWithHA {
     FSNamesystem fsn1 = cluster.getNamesystem(1);
     cacheSet = (LightWeightCache<CacheEntry, CacheEntry>) fsn1
         .getRetryCache().getCacheSet();
-    assertEquals(24, cacheSet.size());
+    assertEquals(25, cacheSet.size());
     iter = cacheSet.iterator();
     while (iter.hasNext()) {
       CacheEntry entry = iter.next();
@@ -438,7 +438,8 @@ public class TestRetryCacheWithHA {
 
     @Override
     void invoke() throws Exception {
-      lbk = client.getNamenode().append(fileName, client.getClientName());
+      lbk = client.getNamenode().append(fileName, client.getClientName(),
+          new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
     }
     
     // check if the inode of the file is under construction
@@ -701,7 +702,8 @@ public class TestRetryCacheWithHA {
       final Path filePath = new Path(file);
       DFSTestUtil.createFile(dfs, filePath, BlockSize, DataNodes, 0);
       // append to the file and leave the last block under construction
-      out = this.client.append(file, BlockSize, null, null);
+      out = this.client.append(file, BlockSize, EnumSet.of(CreateFlag.APPEND),
+          null, null);
       byte[] appendContent = new byte[100];
       new Random().nextBytes(appendContent);
       out.write(appendContent);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2156e38d/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
index dce3f47..da8c190 100644
Binary files a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored and b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored differ


Mime
View raw message