hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From x...@apache.org
Subject hadoop git commit: HDFS-11011. Add unit tests for HDFS command 'dfsadmin -set/clrSpaceQuota'. Contributed by Xiaobing Zhou.
Date Tue, 25 Oct 2016 18:24:55 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 262518fa5 -> e0fc749a4


HDFS-11011. Add unit tests for HDFS command 'dfsadmin -set/clrSpaceQuota'. Contributed by Xiaobing Zhou.

(cherry picked from commit 85f6fec0370b3cd94d6c2f19920c0b6d33f127c6)
(cherry picked from commit c2ba4b650168be4fad7ffdb546b8a0d985268393)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e0fc749a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e0fc749a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e0fc749a

Branch: refs/heads/branch-2.8
Commit: e0fc749a4632b8b61f25ecfb724f3c82dc54b994
Parents: 262518f
Author: Xiaoyu Yao <xyao@apache.org>
Authored: Tue Oct 25 11:12:04 2016 -0700
Committer: Xiaoyu Yao <xyao@apache.org>
Committed: Tue Oct 25 11:22:07 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/TestQuota.java  | 2059 +++++++++++-------
 .../apache/hadoop/hdfs/tools/TestDFSAdmin.java  |    3 +
 2 files changed, 1223 insertions(+), 839 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0fc749a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
index 6563dac..e32d830 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
@@ -18,8 +18,12 @@
 package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -28,6 +32,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
 import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.Scanner;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
@@ -46,14 +52,91 @@ import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.hdfs.web.WebHdfsConstants;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
 import org.junit.Assert;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
 
 /** A class for testing quota-related commands */
 public class TestQuota {
   
+  private static Configuration conf = null;
+  private static final ByteArrayOutputStream OUT_STREAM = new ByteArrayOutputStream();
+  private static final ByteArrayOutputStream ERR_STREAM = new ByteArrayOutputStream();
+  private static final PrintStream OLD_OUT = System.out;
+  private static final PrintStream OLD_ERR = System.err;
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem dfs;
+  private static FileSystem webhdfs;
+  /* set a smaller block size so that we can test with smaller space quotas */
+  private static final int DEFAULT_BLOCK_SIZE = 512;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    conf = new HdfsConfiguration();
+    conf.set(
+        MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+        GenericTestUtils.getTestDir("my-test-quota").getAbsolutePath());
+    conf.setInt("dfs.content-summary.limit", 4);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
+    /*
+     * Make it relinquish locks. When run serially, the result should be
+     * identical.
+     */
+    conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
+    restartCluster();
+
+    dfs = (DistributedFileSystem) cluster.getFileSystem();
+    redirectStream();
+
+    final String nnAddr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+    final String webhdfsuri = WebHdfsConstants.WEBHDFS_SCHEME + "://" + nnAddr;
+    System.out.println("webhdfsuri=" + webhdfsuri);
+    webhdfs = new Path(webhdfsuri).getFileSystem(conf);
+  }
+
+  private static void restartCluster() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster.waitActive();
+  }
+
+  private static void redirectStream() {
+    System.setOut(new PrintStream(OUT_STREAM));
+    System.setErr(new PrintStream(ERR_STREAM));
+  }
+
+  private static void resetStream() {
+    OUT_STREAM.reset();
+    ERR_STREAM.reset();
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    try {
+      System.out.flush();
+      System.err.flush();
+    } finally {
+      System.setOut(OLD_OUT);
+      System.setErr(OLD_ERR);
+    }
+
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+
+    resetStream();
+  }
+
   private void runCommand(DFSAdmin admin, boolean expectError, String... args) 
                          throws Exception {
     runCommand(admin, args, expectError);
@@ -91,473 +174,459 @@ public class TestQuota {
    */
   @Test
   public void testQuotaCommands() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    // set a smaller block size so that we can test with smaller 
-    // Space quotas
-    final int DEFAULT_BLOCK_SIZE = 512;
-    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
-    // Make it relinquish locks. When run serially, the result should
-    // be identical.
-    conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    final FileSystem fs = cluster.getFileSystem();
-    assertTrue("Not a HDFS: "+fs.getUri(),
-                fs instanceof DistributedFileSystem);
-    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
     DFSAdmin admin = new DFSAdmin(conf);
+    final Path dir = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(dir));
+
+    final int fileLen = 1024;
+    final short replication = 5;
+    final long spaceQuota = fileLen * replication * 15 / 8;
+
+    // 1: create a directory test and set its quota to be 3
+    final Path parent = new Path(dir, "test");
+    assertTrue(dfs.mkdirs(parent));
+    String[] args = new String[]{"-setQuota", "3", parent.toString()};
+    runCommand(admin, args, false);
+
+    //try setting space quota with a 'binary prefix'
+    runCommand(admin, false, "-setSpaceQuota", "2t", parent.toString());
+    assertEquals(2L<<40, dfs.getContentSummary(parent).getSpaceQuota());
+
+    // set diskspace quota to 10000
+    runCommand(admin, false, "-setSpaceQuota",
+               Long.toString(spaceQuota), parent.toString());
+
+    // 2: create directory /test/data0
+    final Path childDir0 = new Path(parent, "data0");
+    assertTrue(dfs.mkdirs(childDir0));
+
+    // 3: create a file /test/datafile0
+    final Path childFile0 = new Path(parent, "datafile0");
+    DFSTestUtil.createFile(dfs, childFile0, fileLen, replication, 0);
     
+    // 4: count -q /test
+    ContentSummary c = dfs.getContentSummary(parent);
+    compareQuotaUsage(c, dfs, parent);
+    assertEquals(c.getFileCount()+c.getDirectoryCount(), 3);
+    assertEquals(c.getQuota(), 3);
+    assertEquals(c.getSpaceConsumed(), fileLen*replication);
+    assertEquals(c.getSpaceQuota(), spaceQuota);
+
+    // 5: count -q /test/data0
+    c = dfs.getContentSummary(childDir0);
+    compareQuotaUsage(c, dfs, childDir0);
+    assertEquals(c.getFileCount()+c.getDirectoryCount(), 1);
+    assertEquals(c.getQuota(), -1);
+    // check disk space consumed
+    c = dfs.getContentSummary(parent);
+    compareQuotaUsage(c, dfs, parent);
+    assertEquals(c.getSpaceConsumed(), fileLen*replication);
+
+    // 6: create a directory /test/data1
+    final Path childDir1 = new Path(parent, "data1");
+    boolean hasException = false;
     try {
-      final int fileLen = 1024;
-      final short replication = 5;
-      final long spaceQuota = fileLen * replication * 15 / 8;
-
-      // 1: create a directory /test and set its quota to be 3
-      final Path parent = new Path("/test");
-      assertTrue(dfs.mkdirs(parent));
-      String[] args = new String[]{"-setQuota", "3", parent.toString()};
-      runCommand(admin, args, false);
-
-      //try setting space quota with a 'binary prefix'
-      runCommand(admin, false, "-setSpaceQuota", "2t", parent.toString());
-      assertEquals(2L<<40, dfs.getContentSummary(parent).getSpaceQuota());
-      
-      // set diskspace quota to 10000 
-      runCommand(admin, false, "-setSpaceQuota", 
-                 Long.toString(spaceQuota), parent.toString());
-      
-      // 2: create directory /test/data0
-      final Path childDir0 = new Path(parent, "data0");
-      assertTrue(dfs.mkdirs(childDir0));
+      assertFalse(dfs.mkdirs(childDir1));
+    } catch (QuotaExceededException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
 
-      // 3: create a file /test/datafile0
-      final Path childFile0 = new Path(parent, "datafile0");
-      DFSTestUtil.createFile(fs, childFile0, fileLen, replication, 0);
-      
-      // 4: count -q /test
-      ContentSummary c = dfs.getContentSummary(parent);
-      compareQuotaUsage(c, dfs, parent);
-      assertEquals(c.getFileCount()+c.getDirectoryCount(), 3);
-      assertEquals(c.getQuota(), 3);
-      assertEquals(c.getSpaceConsumed(), fileLen*replication);
-      assertEquals(c.getSpaceQuota(), spaceQuota);
-      
-      // 5: count -q /test/data0
-      c = dfs.getContentSummary(childDir0);
-      compareQuotaUsage(c, dfs, childDir0);
-      assertEquals(c.getFileCount()+c.getDirectoryCount(), 1);
-      assertEquals(c.getQuota(), -1);
-      // check disk space consumed
-      c = dfs.getContentSummary(parent);
-      compareQuotaUsage(c, dfs, parent);
-      assertEquals(c.getSpaceConsumed(), fileLen*replication);
-
-      // 6: create a directory /test/data1
-      final Path childDir1 = new Path(parent, "data1");
-      boolean hasException = false;
-      try {
-        assertFalse(dfs.mkdirs(childDir1));
-      } catch (QuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-      
-      OutputStream fout;
-      
-      // 7: create a file /test/datafile1
-      final Path childFile1 = new Path(parent, "datafile1");
-      hasException = false;
-      try {
-        fout = dfs.create(childFile1);
-      } catch (QuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-      
-      // 8: clear quota /test
-      runCommand(admin, new String[]{"-clrQuota", parent.toString()}, false);
-      c = dfs.getContentSummary(parent);
-      compareQuotaUsage(c, dfs, parent);
-      assertEquals(c.getQuota(), -1);
-      assertEquals(c.getSpaceQuota(), spaceQuota);
-      
-      // 9: clear quota /test/data0
-      runCommand(admin, new String[]{"-clrQuota", childDir0.toString()}, false);
-      c = dfs.getContentSummary(childDir0);
-      compareQuotaUsage(c, dfs, childDir0);
-      assertEquals(c.getQuota(), -1);
-      
-      // 10: create a file /test/datafile1
-      fout = dfs.create(childFile1, replication);
-      
-      // 10.s: but writing fileLen bytes should result in an quota exception
-      try {
-        fout.write(new byte[fileLen]);
-        fout.close();
-        Assert.fail();
-      } catch (QuotaExceededException e) {
-        IOUtils.closeStream(fout);
-      }
-      
-      //delete the file
-      dfs.delete(childFile1, false);
-      
-      // 9.s: clear diskspace quota
-      runCommand(admin, false, "-clrSpaceQuota", parent.toString());
-      c = dfs.getContentSummary(parent);
-      compareQuotaUsage(c, dfs, parent);
-      assertEquals(c.getQuota(), -1);
-      assertEquals(c.getSpaceQuota(), -1);       
-      
-      // now creating childFile1 should succeed
-      DFSTestUtil.createFile(dfs, childFile1, fileLen, replication, 0);
-      
-      // 11: set the quota of /test to be 1
-      // HADOOP-5872 - we can set quota even if it is immediately violated 
-      args = new String[]{"-setQuota", "1", parent.toString()};
-      runCommand(admin, args, false);
-      runCommand(admin, false, "-setSpaceQuota",  // for space quota
-                 Integer.toString(fileLen), args[2]);
-      
-      // 12: set the quota of /test/data0 to be 1
-      args = new String[]{"-setQuota", "1", childDir0.toString()};
-      runCommand(admin, args, false);
-      
-      // 13: not able create a directory under data0
-      hasException = false;
-      try {
-        assertFalse(dfs.mkdirs(new Path(childDir0, "in")));
-      } catch (QuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-      c = dfs.getContentSummary(childDir0);
-      compareQuotaUsage(c, dfs, childDir0);
-      assertEquals(c.getDirectoryCount()+c.getFileCount(), 1);
-      assertEquals(c.getQuota(), 1);
-      
-      // 14a: set quota on a non-existent directory
-      Path nonExistentPath = new Path("/test1");
-      assertFalse(dfs.exists(nonExistentPath));
-      args = new String[]{"-setQuota", "1", nonExistentPath.toString()};
-      runCommand(admin, args, true);
-      runCommand(admin, true, "-setSpaceQuota", "1g", // for space quota
-                 nonExistentPath.toString());
-      
-      // 14b: set quota on a file
-      assertTrue(dfs.isFile(childFile0));
-      args[1] = childFile0.toString();
-      runCommand(admin, args, true);
-      // same for space quota
-      runCommand(admin, true, "-setSpaceQuota", "1t", args[1]);
-      
-      // 15a: clear quota on a file
-      args[0] = "-clrQuota";
-      runCommand(admin, args, true);
-      runCommand(admin, true, "-clrSpaceQuota", args[1]);
-      
-      // 15b: clear quota on a non-existent directory
-      args[1] = nonExistentPath.toString();
-      runCommand(admin, args, true);
-      runCommand(admin, true, "-clrSpaceQuota", args[1]);
-      
-      // 16a: set the quota of /test to be 0
-      args = new String[]{"-setQuota", "0", parent.toString()};
-      runCommand(admin, args, true);
-      runCommand(admin, false, "-setSpaceQuota", "0", args[2]);
-      
-      // 16b: set the quota of /test to be -1
-      args[1] = "-1";
-      runCommand(admin, args, true);
-      runCommand(admin, true, "-setSpaceQuota", args[1], args[2]);
-      
-      // 16c: set the quota of /test to be Long.MAX_VALUE+1
-      args[1] = String.valueOf(Long.MAX_VALUE+1L);
-      runCommand(admin, args, true);
-      runCommand(admin, true, "-setSpaceQuota", args[1], args[2]);
-      
-      // 16d: set the quota of /test to be a non integer
-      args[1] = "33aa1.5";
-      runCommand(admin, args, true);
-      runCommand(admin, true, "-setSpaceQuota", args[1], args[2]);
-      
-      // 16e: set space quota with a value larger than Long.MAX_VALUE
-      runCommand(admin, true, "-setSpaceQuota", 
-                 (Long.MAX_VALUE/1024/1024 + 1024) + "m", args[2]);
-      
-      // 17:  setQuota by a non-administrator
-      final String username = "userxx";
-      UserGroupInformation ugi = 
-        UserGroupInformation.createUserForTesting(username, 
-                                                  new String[]{"groupyy"});
-      
-      final String[] args2 = args.clone(); // need final ref for doAs block
-      ugi.doAs(new PrivilegedExceptionAction<Object>() {
-        @Override
-        public Object run() throws Exception {
-          assertEquals("Not running as new user", username, 
-              UserGroupInformation.getCurrentUser().getShortUserName());
-          DFSAdmin userAdmin = new DFSAdmin(conf);
-          
-          args2[1] = "100";
-          runCommand(userAdmin, args2, true);
-          runCommand(userAdmin, true, "-setSpaceQuota", "1g", args2[2]);
-          
-          // 18: clrQuota by a non-administrator
-          String[] args3 = new String[] {"-clrQuota", parent.toString()};
-          runCommand(userAdmin, args3, true);
-          runCommand(userAdmin, true, "-clrSpaceQuota",  args3[1]); 
-          
-          return null;
-        }
-      });
-
-      // 19: clrQuota on the root directory ("/") should fail
-      runCommand(admin, true, "-clrQuota", "/");
-
-      // 20: setQuota on the root directory ("/") should succeed
-      runCommand(admin, false, "-setQuota", "1000000", "/");
-
-      runCommand(admin, true, "-clrQuota", "/");
-      runCommand(admin, false, "-clrSpaceQuota", "/");
-      runCommand(admin, new String[]{"-clrQuota", parent.toString()}, false);
-      runCommand(admin, false, "-clrSpaceQuota", parent.toString());
-
-
-      // 2: create directory /test/data2
-      final Path childDir2 = new Path(parent, "data2");
-      assertTrue(dfs.mkdirs(childDir2));
-
-
-      final Path childFile2 = new Path(childDir2, "datafile2");
-      final Path childFile3 = new Path(childDir2, "datafile3");
-      final long spaceQuota2 = DEFAULT_BLOCK_SIZE * replication;
-      final long fileLen2 = DEFAULT_BLOCK_SIZE;
-      // set space quota to a real low value 
-      runCommand(admin, false, "-setSpaceQuota", Long.toString(spaceQuota2), childDir2.toString());
-      // clear space quota
-      runCommand(admin, false, "-clrSpaceQuota", childDir2.toString());
-      // create a file that is greater than the size of space quota
-      DFSTestUtil.createFile(fs, childFile2, fileLen2, replication, 0);
-
-      // now set space quota again. This should succeed
-      runCommand(admin, false, "-setSpaceQuota", Long.toString(spaceQuota2), childDir2.toString());
-
-      hasException = false;
-      try {
-        DFSTestUtil.createFile(fs, childFile3, fileLen2, replication, 0);
-      } catch (DSQuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
+    OutputStream fout;
+
+    // 7: create a file /test/datafile1
+    final Path childFile1 = new Path(parent, "datafile1");
+    hasException = false;
+    try {
+      fout = dfs.create(childFile1);
+    } catch (QuotaExceededException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
 
-      // now test the same for root
-      final Path childFile4 = new Path("/", "datafile2");
-      final Path childFile5 = new Path("/", "datafile3");
+    // 8: clear quota /test
+    runCommand(admin, new String[]{"-clrQuota", parent.toString()}, false);
+    c = dfs.getContentSummary(parent);
+    compareQuotaUsage(c, dfs, parent);
+    assertEquals(c.getQuota(), -1);
+    assertEquals(c.getSpaceQuota(), spaceQuota);
 
-      runCommand(admin, true, "-clrQuota", "/");
-      runCommand(admin, false, "-clrSpaceQuota", "/");
-      // set space quota to a real low value 
-      runCommand(admin, false, "-setSpaceQuota", Long.toString(spaceQuota2), "/");
-      runCommand(admin, false, "-clrSpaceQuota", "/");
-      DFSTestUtil.createFile(fs, childFile4, fileLen2, replication, 0);
-      runCommand(admin, false, "-setSpaceQuota", Long.toString(spaceQuota2), "/");
+    // 9: clear quota /test/data0
+    runCommand(admin, new String[]{"-clrQuota", childDir0.toString()}, false);
+    c = dfs.getContentSummary(childDir0);
+    compareQuotaUsage(c, dfs, childDir0);
+    assertEquals(c.getQuota(), -1);
 
-      hasException = false;
-      try {
-        DFSTestUtil.createFile(fs, childFile5, fileLen2, replication, 0);
-      } catch (DSQuotaExceededException e) {
-        hasException = true;
+    // 10: create a file /test/datafile1
+    fout = dfs.create(childFile1, replication);
+
+    // 10.s: but writing fileLen bytes should result in a quota exception
+    try {
+      fout.write(new byte[fileLen]);
+      fout.close();
+      Assert.fail();
+    } catch (QuotaExceededException e) {
+      IOUtils.closeStream(fout);
+    }
+
+    //delete the file
+    dfs.delete(childFile1, false);
+
+    // 9.s: clear diskspace quota
+    runCommand(admin, false, "-clrSpaceQuota", parent.toString());
+    c = dfs.getContentSummary(parent);
+    compareQuotaUsage(c, dfs, parent);
+    assertEquals(c.getQuota(), -1);
+    assertEquals(c.getSpaceQuota(), -1);
+
+    // now creating childFile1 should succeed
+    DFSTestUtil.createFile(dfs, childFile1, fileLen, replication, 0);
+
+    // 11: set the quota of /test to be 1
+    // HADOOP-5872 - we can set quota even if it is immediately violated
+    args = new String[]{"-setQuota", "1", parent.toString()};
+    runCommand(admin, args, false);
+    runCommand(admin, false, "-setSpaceQuota",  // for space quota
+               Integer.toString(fileLen), args[2]);
+
+    // 12: set the quota of /test/data0 to be 1
+    args = new String[]{"-setQuota", "1", childDir0.toString()};
+    runCommand(admin, args, false);
+
+    // 13: not able to create a directory under data0
+    hasException = false;
+    try {
+      assertFalse(dfs.mkdirs(new Path(childDir0, "in")));
+    } catch (QuotaExceededException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
+    c = dfs.getContentSummary(childDir0);
+    compareQuotaUsage(c, dfs, childDir0);
+    assertEquals(c.getDirectoryCount()+c.getFileCount(), 1);
+    assertEquals(c.getQuota(), 1);
+
+    // 14a: set quota on a non-existent directory
+    Path nonExistentPath = new Path(dir, "test1");
+    assertFalse(dfs.exists(nonExistentPath));
+    args = new String[]{"-setQuota", "1", nonExistentPath.toString()};
+    runCommand(admin, args, true);
+    runCommand(admin, true, "-setSpaceQuota", "1g", // for space quota
+               nonExistentPath.toString());
+
+    // 14b: set quota on a file
+    assertTrue(dfs.isFile(childFile0));
+    args[1] = childFile0.toString();
+    runCommand(admin, args, true);
+    // same for space quota
+    runCommand(admin, true, "-setSpaceQuota", "1t", args[1]);
+
+    // 15a: clear quota on a file
+    args[0] = "-clrQuota";
+    runCommand(admin, args, true);
+    runCommand(admin, true, "-clrSpaceQuota", args[1]);
+
+    // 15b: clear quota on a non-existent directory
+    args[1] = nonExistentPath.toString();
+    runCommand(admin, args, true);
+    runCommand(admin, true, "-clrSpaceQuota", args[1]);
+
+    // 16a: set the quota of /test to be 0
+    args = new String[]{"-setQuota", "0", parent.toString()};
+    runCommand(admin, args, true);
+    runCommand(admin, false, "-setSpaceQuota", "0", args[2]);
+
+    // 16b: set the quota of /test to be -1
+    args[1] = "-1";
+    runCommand(admin, args, true);
+    runCommand(admin, true, "-setSpaceQuota", args[1], args[2]);
+
+    // 16c: set the quota of /test to be Long.MAX_VALUE+1
+    args[1] = String.valueOf(Long.MAX_VALUE+1L);
+    runCommand(admin, args, true);
+    runCommand(admin, true, "-setSpaceQuota", args[1], args[2]);
+
+    // 16d: set the quota of /test to be a non integer
+    args[1] = "33aa1.5";
+    runCommand(admin, args, true);
+    runCommand(admin, true, "-setSpaceQuota", args[1], args[2]);
+
+    // 16e: set space quota with a value larger than Long.MAX_VALUE
+    runCommand(admin, true, "-setSpaceQuota",
+               (Long.MAX_VALUE/1024/1024 + 1024) + "m", args[2]);
+
+    // 17:  setQuota by a non-administrator
+    final String username = "userxx";
+    UserGroupInformation ugi =
+      UserGroupInformation.createUserForTesting(username,
+                                                new String[]{"groupyy"});
+
+    final String[] args2 = args.clone(); // need final ref for doAs block
+    ugi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        assertEquals("Not running as new user", username,
+            UserGroupInformation.getCurrentUser().getShortUserName());
+        DFSAdmin userAdmin = new DFSAdmin(conf);
+
+        args2[1] = "100";
+        runCommand(userAdmin, args2, true);
+        runCommand(userAdmin, true, "-setSpaceQuota", "1g", args2[2]);
+
+        // 18: clrQuota by a non-administrator
+        String[] args3 = new String[] {"-clrQuota", parent.toString()};
+        runCommand(userAdmin, args3, true);
+        runCommand(userAdmin, true, "-clrSpaceQuota",  args3[1]);
+
+        return null;
       }
-      assertTrue(hasException);
+    });
 
-      assertEquals(5, cluster.getNamesystem().getFSDirectory().getYieldCount());
-    } finally {
-      cluster.shutdown();
+    // 19: clrQuota on the root directory ("/") should fail
+    runCommand(admin, true, "-clrQuota", "/");
+
+    // 20: setQuota on the root directory ("/") should succeed
+    runCommand(admin, false, "-setQuota", "1000000", "/");
+
+    runCommand(admin, true, "-clrQuota", "/");
+    runCommand(admin, false, "-clrSpaceQuota", "/");
+    runCommand(admin, new String[]{"-clrQuota", parent.toString()}, false);
+    runCommand(admin, false, "-clrSpaceQuota", parent.toString());
+
+
+    // 2: create directory /test/data2
+    final Path childDir2 = new Path(parent, "data2");
+    assertTrue(dfs.mkdirs(childDir2));
+
+
+    final Path childFile2 = new Path(childDir2, "datafile2");
+    final Path childFile3 = new Path(childDir2, "datafile3");
+    final long spaceQuota2 = DEFAULT_BLOCK_SIZE * replication;
+    final long fileLen2 = DEFAULT_BLOCK_SIZE;
+    // set space quota to a real low value
+    runCommand(admin, false, "-setSpaceQuota", Long.toString(spaceQuota2), childDir2.toString());
+    // clear space quota
+    runCommand(admin, false, "-clrSpaceQuota", childDir2.toString());
+    // create a file that is greater than the size of space quota
+    DFSTestUtil.createFile(dfs, childFile2, fileLen2, replication, 0);
+
+    // now set space quota again. This should succeed
+    runCommand(admin, false, "-setSpaceQuota", Long.toString(spaceQuota2), childDir2.toString());
+
+    hasException = false;
+    try {
+      DFSTestUtil.createFile(dfs, childFile3, fileLen2, replication, 0);
+    } catch (DSQuotaExceededException e) {
+      hasException = true;
     }
+    assertTrue(hasException);
+
+    // now test the same for root
+    final Path childFile4 = new Path(dir, "datafile2");
+    final Path childFile5 = new Path(dir, "datafile3");
+
+    runCommand(admin, true, "-clrQuota", "/");
+    runCommand(admin, false, "-clrSpaceQuota", "/");
+    // set space quota to a real low value
+    runCommand(admin, false, "-setSpaceQuota", Long.toString(spaceQuota2), "/");
+    runCommand(admin, false, "-clrSpaceQuota", "/");
+    DFSTestUtil.createFile(dfs, childFile4, fileLen2, replication, 0);
+    runCommand(admin, false, "-setSpaceQuota", Long.toString(spaceQuota2), "/");
+
+    hasException = false;
+    try {
+      DFSTestUtil.createFile(dfs, childFile5, fileLen2, replication, 0);
+    } catch (DSQuotaExceededException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
+
+    assertEquals(5, cluster.getNamesystem().getFSDirectory().getYieldCount());
+
+    /*
+     * clear space quota for root, otherwise other tests may fail due to
+     * insufficient space quota.
+     */
+    runCommand(admin, false, "-clrSpaceQuota", "/");
   }
   
   /** Test commands that change the size of the name space:
    *  mkdirs, rename, and delete */
   @Test
   public void testNamespaceCommands() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    // Make it relinquish locks. When run serially, the result should
-    // be identical.
-    conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final Path parent = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(parent));
+
+    // 1: create directory nqdir0/qdir1/qdir20/nqdir30
+    assertTrue(dfs.mkdirs(new Path(parent, "nqdir0/qdir1/qdir20/nqdir30")));
+
+    // 2: set the quota of nqdir0/qdir1 to be 6
+    final Path quotaDir1 = new Path(parent, "nqdir0/qdir1");
+    dfs.setQuota(quotaDir1, 6, HdfsConstants.QUOTA_DONT_SET);
+    ContentSummary c = dfs.getContentSummary(quotaDir1);
+    compareQuotaUsage(c, dfs, quotaDir1);
+    assertEquals(c.getDirectoryCount(), 3);
+    assertEquals(c.getQuota(), 6);
+
+    // 3: set the quota of nqdir0/qdir1/qdir20 to be 7
+    final Path quotaDir2 = new Path(parent, "nqdir0/qdir1/qdir20");
+    dfs.setQuota(quotaDir2, 7, HdfsConstants.QUOTA_DONT_SET);
+    c = dfs.getContentSummary(quotaDir2);
+    compareQuotaUsage(c, dfs, quotaDir2);
+    assertEquals(c.getDirectoryCount(), 2);
+    assertEquals(c.getQuota(), 7);
+
+    // 4: Create directory nqdir0/qdir1/qdir21 and set its quota to 2
+    final Path quotaDir3 = new Path(parent, "nqdir0/qdir1/qdir21");
+    assertTrue(dfs.mkdirs(quotaDir3));
+    dfs.setQuota(quotaDir3, 2, HdfsConstants.QUOTA_DONT_SET);
+    c = dfs.getContentSummary(quotaDir3);
+    compareQuotaUsage(c, dfs, quotaDir3);
+    assertEquals(c.getDirectoryCount(), 1);
+    assertEquals(c.getQuota(), 2);
+
+    // 5: Create directory nqdir0/qdir1/qdir21/nqdir32
+    Path tempPath = new Path(quotaDir3, "nqdir32");
+    assertTrue(dfs.mkdirs(tempPath));
+    c = dfs.getContentSummary(quotaDir3);
+    compareQuotaUsage(c, dfs, quotaDir3);
+    assertEquals(c.getDirectoryCount(), 2);
+    assertEquals(c.getQuota(), 2);
+
+    // 6: Create directory nqdir0/qdir1/qdir21/nqdir33
+    tempPath = new Path(quotaDir3, "nqdir33");
+    boolean hasException = false;
+    try {
+      assertFalse(dfs.mkdirs(tempPath));
+    } catch (NSQuotaExceededException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
+    c = dfs.getContentSummary(quotaDir3);
+    compareQuotaUsage(c, dfs, quotaDir3);
+    assertEquals(c.getDirectoryCount(), 2);
+    assertEquals(c.getQuota(), 2);
+
+    // 7: Create directory nqdir0/qdir1/qdir20/nqdir31
+    tempPath = new Path(quotaDir2, "nqdir31");
+    assertTrue(dfs.mkdirs(tempPath));
+    c = dfs.getContentSummary(quotaDir2);
+    compareQuotaUsage(c, dfs, quotaDir2);
+    assertEquals(c.getDirectoryCount(), 3);
+    assertEquals(c.getQuota(), 7);
+    c = dfs.getContentSummary(quotaDir1);
+    compareQuotaUsage(c, dfs, quotaDir1);
+    assertEquals(c.getDirectoryCount(), 6);
+    assertEquals(c.getQuota(), 6);
+
+    // 8: Create directory nqdir0/qdir1/qdir20/nqdir33
+    tempPath = new Path(quotaDir2, "nqdir33");
+    hasException = false;
+    try {
+      assertFalse(dfs.mkdirs(tempPath));
+    } catch (NSQuotaExceededException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
+
+    // 9: Move nqdir0/qdir1/qdir21/nqdir32 nqdir0/qdir1/qdir20/nqdir30
+    tempPath = new Path(quotaDir2, "nqdir30");
+    dfs.rename(new Path(quotaDir3, "nqdir32"), tempPath);
+    c = dfs.getContentSummary(quotaDir2);
+    compareQuotaUsage(c, dfs, quotaDir2);
+    assertEquals(c.getDirectoryCount(), 4);
+    assertEquals(c.getQuota(), 7);
+    c = dfs.getContentSummary(quotaDir1);
+    compareQuotaUsage(c, dfs, quotaDir1);
+    assertEquals(c.getDirectoryCount(), 6);
+    assertEquals(c.getQuota(), 6);
+
+    // 10: Move nqdir0/qdir1/qdir20/nqdir30 to nqdir0/qdir1/qdir21
+    hasException = false;
+    try {
+      assertFalse(dfs.rename(tempPath, quotaDir3));
+    } catch (NSQuotaExceededException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
+    assertTrue(dfs.exists(tempPath));
+    assertFalse(dfs.exists(new Path(quotaDir3, "nqdir30")));
     
+    // 10.a: Rename nqdir0/qdir1/qdir20/nqdir30 to nqdir0/qdir1/qdir21/nqdir32
+    hasException = false;
     try {
-      // 1: create directory /nqdir0/qdir1/qdir20/nqdir30
-      assertTrue(dfs.mkdirs(new Path("/nqdir0/qdir1/qdir20/nqdir30")));
-
-      // 2: set the quota of /nqdir0/qdir1 to be 6
-      final Path quotaDir1 = new Path("/nqdir0/qdir1");
-      dfs.setQuota(quotaDir1, 6, HdfsConstants.QUOTA_DONT_SET);
-      ContentSummary c = dfs.getContentSummary(quotaDir1);
-      compareQuotaUsage(c, dfs, quotaDir1);
-      assertEquals(c.getDirectoryCount(), 3);
-      assertEquals(c.getQuota(), 6);
-
-      // 3: set the quota of /nqdir0/qdir1/qdir20 to be 7
-      final Path quotaDir2 = new Path("/nqdir0/qdir1/qdir20");
-      dfs.setQuota(quotaDir2, 7, HdfsConstants.QUOTA_DONT_SET);
-      c = dfs.getContentSummary(quotaDir2);
-      compareQuotaUsage(c, dfs, quotaDir2);
-      assertEquals(c.getDirectoryCount(), 2);
-      assertEquals(c.getQuota(), 7);
-
-      // 4: Create directory /nqdir0/qdir1/qdir21 and set its quota to 2
-      final Path quotaDir3 = new Path("/nqdir0/qdir1/qdir21");
-      assertTrue(dfs.mkdirs(quotaDir3));
-      dfs.setQuota(quotaDir3, 2, HdfsConstants.QUOTA_DONT_SET);
-      c = dfs.getContentSummary(quotaDir3);
-      compareQuotaUsage(c, dfs, quotaDir3);
-      assertEquals(c.getDirectoryCount(), 1);
-      assertEquals(c.getQuota(), 2);
-
-      // 5: Create directory /nqdir0/qdir1/qdir21/nqdir32
-      Path tempPath = new Path(quotaDir3, "nqdir32");
-      assertTrue(dfs.mkdirs(tempPath));
-      c = dfs.getContentSummary(quotaDir3);
-      compareQuotaUsage(c, dfs, quotaDir3);
-      assertEquals(c.getDirectoryCount(), 2);
-      assertEquals(c.getQuota(), 2);
-
-      // 6: Create directory /nqdir0/qdir1/qdir21/nqdir33
-      tempPath = new Path(quotaDir3, "nqdir33");
-      boolean hasException = false;
-      try {
-        assertFalse(dfs.mkdirs(tempPath));
-      } catch (NSQuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-      c = dfs.getContentSummary(quotaDir3);
-      compareQuotaUsage(c, dfs, quotaDir3);
-      assertEquals(c.getDirectoryCount(), 2);
-      assertEquals(c.getQuota(), 2);
-
-      // 7: Create directory /nqdir0/qdir1/qdir20/nqdir31
-      tempPath = new Path(quotaDir2, "nqdir31");
-      assertTrue(dfs.mkdirs(tempPath));
-      c = dfs.getContentSummary(quotaDir2);
-      compareQuotaUsage(c, dfs, quotaDir2);
-      assertEquals(c.getDirectoryCount(), 3);
-      assertEquals(c.getQuota(), 7);
-      c = dfs.getContentSummary(quotaDir1);
-      compareQuotaUsage(c, dfs, quotaDir1);
-      assertEquals(c.getDirectoryCount(), 6);
-      assertEquals(c.getQuota(), 6);
-
-      // 8: Create directory /nqdir0/qdir1/qdir20/nqdir33
-      tempPath = new Path(quotaDir2, "nqdir33");
-      hasException = false;
-      try {
-        assertFalse(dfs.mkdirs(tempPath));
-      } catch (NSQuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-
-      // 9: Move /nqdir0/qdir1/qdir21/nqdir32 /nqdir0/qdir1/qdir20/nqdir30
-      tempPath = new Path(quotaDir2, "nqdir30");
-      dfs.rename(new Path(quotaDir3, "nqdir32"), tempPath);
-      c = dfs.getContentSummary(quotaDir2);
-      compareQuotaUsage(c, dfs, quotaDir2);
-      assertEquals(c.getDirectoryCount(), 4);
-      assertEquals(c.getQuota(), 7);
-      c = dfs.getContentSummary(quotaDir1);
-      compareQuotaUsage(c, dfs, quotaDir1);
-      assertEquals(c.getDirectoryCount(), 6);
-      assertEquals(c.getQuota(), 6);
-
-      // 10: Move /nqdir0/qdir1/qdir20/nqdir30 to /nqdir0/qdir1/qdir21
-      hasException = false;
-      try {
-        assertFalse(dfs.rename(tempPath, quotaDir3));
-      } catch (NSQuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-      assertTrue(dfs.exists(tempPath));
-      assertFalse(dfs.exists(new Path(quotaDir3, "nqdir30")));
-      
-      // 10.a: Rename /nqdir0/qdir1/qdir20/nqdir30 to /nqdir0/qdir1/qdir21/nqdir32
-      hasException = false;
-      try {
-        assertFalse(dfs.rename(tempPath, new Path(quotaDir3, "nqdir32")));
-      } catch (QuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-      assertTrue(dfs.exists(tempPath));
-      assertFalse(dfs.exists(new Path(quotaDir3, "nqdir32")));
-
-      // 11: Move /nqdir0/qdir1/qdir20/nqdir30 to /nqdir0
-      assertTrue(dfs.rename(tempPath, new Path("/nqdir0")));
-      c = dfs.getContentSummary(quotaDir2);
-      compareQuotaUsage(c, dfs, quotaDir2);
-      assertEquals(c.getDirectoryCount(), 2);
-      assertEquals(c.getQuota(), 7);
-      c = dfs.getContentSummary(quotaDir1);
-      compareQuotaUsage(c, dfs, quotaDir1);
-      assertEquals(c.getDirectoryCount(), 4);
-      assertEquals(c.getQuota(), 6);
-
-      // 12: Create directory /nqdir0/nqdir30/nqdir33
-      assertTrue(dfs.mkdirs(new Path("/nqdir0/nqdir30/nqdir33")));
-
-      // 13: Move /nqdir0/nqdir30 /nqdir0/qdir1/qdir20/qdir30
-      hasException = false;
-      try {
-        assertFalse(dfs.rename(new Path("/nqdir0/nqdir30"), tempPath));
-      } catch (NSQuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-
-      // 14: Move /nqdir0/qdir1/qdir21 /nqdir0/qdir1/qdir20
-      assertTrue(dfs.rename(quotaDir3, quotaDir2));
-      c = dfs.getContentSummary(quotaDir1);
-      compareQuotaUsage(c, dfs, quotaDir1);
-      assertEquals(c.getDirectoryCount(), 4);
-      assertEquals(c.getQuota(), 6);
-      c = dfs.getContentSummary(quotaDir2);
-      compareQuotaUsage(c, dfs, quotaDir2);
-      assertEquals(c.getDirectoryCount(), 3);
-      assertEquals(c.getQuota(), 7);
-      tempPath = new Path(quotaDir2, "qdir21");
-      c = dfs.getContentSummary(tempPath);
-      compareQuotaUsage(c, dfs, tempPath);
-      assertEquals(c.getDirectoryCount(), 1);
-      assertEquals(c.getQuota(), 2);
-
-      // 15: Delete /nqdir0/qdir1/qdir20/qdir21
-      dfs.delete(tempPath, true);
-      c = dfs.getContentSummary(quotaDir2);
-      compareQuotaUsage(c, dfs, quotaDir2);
-      assertEquals(c.getDirectoryCount(), 2);
-      assertEquals(c.getQuota(), 7);
-      c = dfs.getContentSummary(quotaDir1);
-      compareQuotaUsage(c, dfs, quotaDir1);
-      assertEquals(c.getDirectoryCount(), 3);
-      assertEquals(c.getQuota(), 6);
-
-      // 16: Move /nqdir0/qdir30 /nqdir0/qdir1/qdir20
-      assertTrue(dfs.rename(new Path("/nqdir0/nqdir30"), quotaDir2));
-      c = dfs.getContentSummary(quotaDir2);
-      compareQuotaUsage(c, dfs, quotaDir2);
-      assertEquals(c.getDirectoryCount(), 5);
-      assertEquals(c.getQuota(), 7);
-      c = dfs.getContentSummary(quotaDir1);
-      compareQuotaUsage(c, dfs, quotaDir1);
-      assertEquals(c.getDirectoryCount(), 6);
-      assertEquals(c.getQuota(), 6);
-      assertEquals(14, cluster.getNamesystem().getFSDirectory().getYieldCount());
-    } finally {
-      cluster.shutdown();
+      assertFalse(dfs.rename(tempPath, new Path(quotaDir3, "nqdir32")));
+    } catch (QuotaExceededException e) {
+      hasException = true;
     }
+    assertTrue(hasException);
+    assertTrue(dfs.exists(tempPath));
+    assertFalse(dfs.exists(new Path(quotaDir3, "nqdir32")));
+
+    // 11: Move nqdir0/qdir1/qdir20/nqdir30 to nqdir0
+    assertTrue(dfs.rename(tempPath, new Path(parent, "nqdir0")));
+    c = dfs.getContentSummary(quotaDir2);
+    compareQuotaUsage(c, dfs, quotaDir2);
+    assertEquals(c.getDirectoryCount(), 2);
+    assertEquals(c.getQuota(), 7);
+    c = dfs.getContentSummary(quotaDir1);
+    compareQuotaUsage(c, dfs, quotaDir1);
+    assertEquals(c.getDirectoryCount(), 4);
+    assertEquals(c.getQuota(), 6);
+
+    // 12: Create directory nqdir0/nqdir30/nqdir33
+    assertTrue(dfs.mkdirs(new Path(parent, "nqdir0/nqdir30/nqdir33")));
+
+    // 13: Move nqdir0/nqdir30 to nqdir0/qdir1/qdir20/nqdir30
+    hasException = false;
+    try {
+      assertFalse(dfs.rename(new Path(parent, "nqdir0/nqdir30"), tempPath));
+    } catch (NSQuotaExceededException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
+
+    // 14: Move nqdir0/qdir1/qdir21 nqdir0/qdir1/qdir20
+    assertTrue(dfs.rename(quotaDir3, quotaDir2));
+    c = dfs.getContentSummary(quotaDir1);
+    compareQuotaUsage(c, dfs, quotaDir1);
+    assertEquals(c.getDirectoryCount(), 4);
+    assertEquals(c.getQuota(), 6);
+    c = dfs.getContentSummary(quotaDir2);
+    compareQuotaUsage(c, dfs, quotaDir2);
+    assertEquals(c.getDirectoryCount(), 3);
+    assertEquals(c.getQuota(), 7);
+    tempPath = new Path(quotaDir2, "qdir21");
+    c = dfs.getContentSummary(tempPath);
+    compareQuotaUsage(c, dfs, tempPath);
+    assertEquals(c.getDirectoryCount(), 1);
+    assertEquals(c.getQuota(), 2);
+
+    // 15: Delete nqdir0/qdir1/qdir20/qdir21
+    dfs.delete(tempPath, true);
+    c = dfs.getContentSummary(quotaDir2);
+    compareQuotaUsage(c, dfs, quotaDir2);
+    assertEquals(c.getDirectoryCount(), 2);
+    assertEquals(c.getQuota(), 7);
+    c = dfs.getContentSummary(quotaDir1);
+    compareQuotaUsage(c, dfs, quotaDir1);
+    assertEquals(c.getDirectoryCount(), 3);
+    assertEquals(c.getQuota(), 6);
+
+    // 16: Move nqdir0/nqdir30 to nqdir0/qdir1/qdir20
+    assertTrue(dfs.rename(new Path(parent, "nqdir0/nqdir30"), quotaDir2));
+    c = dfs.getContentSummary(quotaDir2);
+    compareQuotaUsage(c, dfs, quotaDir2);
+    assertEquals(c.getDirectoryCount(), 5);
+    assertEquals(c.getQuota(), 7);
+    c = dfs.getContentSummary(quotaDir1);
+    compareQuotaUsage(c, dfs, quotaDir1);
+    assertEquals(c.getDirectoryCount(), 6);
+    assertEquals(c.getQuota(), 6);
   }
   
   /**
@@ -568,270 +637,256 @@ public class TestQuota {
    */
   @Test
   public void testSpaceCommands() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    // set a smaller block size so that we can test with smaller 
-    // diskspace quotas
-    conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
-    // Make it relinquish locks. When run serially, the result should
-    // be identical.
-    conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    final FileSystem fs = cluster.getFileSystem();
-    assertTrue("Not a HDFS: "+fs.getUri(),
-                fs instanceof DistributedFileSystem);
-    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
-
+    final Path parent = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(parent));
+
+    int fileLen = 1024;
+    short replication = 3;
+    int fileSpace = fileLen * replication;
+
+    // create directory nqdir0/qdir1/qdir20/nqdir30
+    assertTrue(dfs.mkdirs(new Path(parent, "nqdir0/qdir1/qdir20/nqdir30")));
+
+    // set the quota of nqdir0/qdir1 to 4 * fileSpace
+    final Path quotaDir1 = new Path(parent, "nqdir0/qdir1");
+    dfs.setQuota(quotaDir1, HdfsConstants.QUOTA_DONT_SET, 4 * fileSpace);
+    ContentSummary c = dfs.getContentSummary(quotaDir1);
+    compareQuotaUsage(c, dfs, quotaDir1);
+    assertEquals(c.getSpaceQuota(), 4 * fileSpace);
+
+    // set the quota of nqdir0/qdir1/qdir20 to 6 * fileSpace
+    final Path quotaDir20 = new Path(parent, "nqdir0/qdir1/qdir20");
+    dfs.setQuota(quotaDir20, HdfsConstants.QUOTA_DONT_SET, 6 * fileSpace);
+    c = dfs.getContentSummary(quotaDir20);
+    compareQuotaUsage(c, dfs, quotaDir20);
+    assertEquals(c.getSpaceQuota(), 6 * fileSpace);
+
+    // Create nqdir0/qdir1/qdir21 and set its space quota to 2 * fileSpace
+    final Path quotaDir21 = new Path(parent, "nqdir0/qdir1/qdir21");
+    assertTrue(dfs.mkdirs(quotaDir21));
+    dfs.setQuota(quotaDir21, HdfsConstants.QUOTA_DONT_SET, 2 * fileSpace);
+    c = dfs.getContentSummary(quotaDir21);
+    compareQuotaUsage(c, dfs, quotaDir21);
+    assertEquals(c.getSpaceQuota(), 2 * fileSpace);
+
+    // 5: Create directory nqdir0/qdir1/qdir21/nqdir32
+    Path tempPath = new Path(quotaDir21, "nqdir32");
+    assertTrue(dfs.mkdirs(tempPath));
+
+    // create a file under nqdir32/fileDir
+    DFSTestUtil.createFile(dfs, new Path(tempPath, "fileDir/file1"), fileLen,
+                           replication, 0);
+    c = dfs.getContentSummary(quotaDir21);
+    compareQuotaUsage(c, dfs, quotaDir21);
+    assertEquals(c.getSpaceConsumed(), fileSpace);
+
+    // Create a larger file nqdir0/qdir1/qdir21/nqdir33/file2
+    boolean hasException = false;
     try {
-      int fileLen = 1024;
-      short replication = 3;
-      int fileSpace = fileLen * replication;
-      
-      // create directory /nqdir0/qdir1/qdir20/nqdir30
-      assertTrue(dfs.mkdirs(new Path("/nqdir0/qdir1/qdir20/nqdir30")));
-
-      // set the quota of /nqdir0/qdir1 to 4 * fileSpace 
-      final Path quotaDir1 = new Path("/nqdir0/qdir1");
-      dfs.setQuota(quotaDir1, HdfsConstants.QUOTA_DONT_SET, 4 * fileSpace);
-      ContentSummary c = dfs.getContentSummary(quotaDir1);
-      compareQuotaUsage(c, dfs, quotaDir1);
-      assertEquals(c.getSpaceQuota(), 4 * fileSpace);
-      
-      // set the quota of /nqdir0/qdir1/qdir20 to 6 * fileSpace 
-      final Path quotaDir20 = new Path("/nqdir0/qdir1/qdir20");
-      dfs.setQuota(quotaDir20, HdfsConstants.QUOTA_DONT_SET, 6 * fileSpace);
-      c = dfs.getContentSummary(quotaDir20);
-      compareQuotaUsage(c, dfs, quotaDir20);
-      assertEquals(c.getSpaceQuota(), 6 * fileSpace);
-
-      // Create /nqdir0/qdir1/qdir21 and set its space quota to 2 * fileSpace
-      final Path quotaDir21 = new Path("/nqdir0/qdir1/qdir21");
-      assertTrue(dfs.mkdirs(quotaDir21));
-      dfs.setQuota(quotaDir21, HdfsConstants.QUOTA_DONT_SET, 2 * fileSpace);
-      c = dfs.getContentSummary(quotaDir21);
-      compareQuotaUsage(c, dfs, quotaDir21);
-      assertEquals(c.getSpaceQuota(), 2 * fileSpace);
-
-      // 5: Create directory /nqdir0/qdir1/qdir21/nqdir32
-      Path tempPath = new Path(quotaDir21, "nqdir32");
-      assertTrue(dfs.mkdirs(tempPath));
-      
-      // create a file under nqdir32/fileDir
-      DFSTestUtil.createFile(dfs, new Path(tempPath, "fileDir/file1"), fileLen, 
-                             replication, 0);
-      c = dfs.getContentSummary(quotaDir21);
-      compareQuotaUsage(c, dfs, quotaDir21);
-      assertEquals(c.getSpaceConsumed(), fileSpace);
-      
-      // Create a larger file /nqdir0/qdir1/qdir21/nqdir33/
-      boolean hasException = false;
-      try {
-        DFSTestUtil.createFile(dfs, new Path(quotaDir21, "nqdir33/file2"), 
-                               2*fileLen, replication, 0);
-      } catch (DSQuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-      // delete nqdir33
-      assertTrue(dfs.delete(new Path(quotaDir21, "nqdir33"), true));
-      c = dfs.getContentSummary(quotaDir21);
-      compareQuotaUsage(c, dfs, quotaDir21);
-      assertEquals(c.getSpaceConsumed(), fileSpace);
-      assertEquals(c.getSpaceQuota(), 2*fileSpace);
-
-      // Verify space before the move:
-      c = dfs.getContentSummary(quotaDir20);
-      compareQuotaUsage(c, dfs, quotaDir20);
-      assertEquals(c.getSpaceConsumed(), 0);
-      
-      // Move /nqdir0/qdir1/qdir21/nqdir32 /nqdir0/qdir1/qdir20/nqdir30
-      Path dstPath = new Path(quotaDir20, "nqdir30");
-      Path srcPath = new Path(quotaDir21, "nqdir32");
-      assertTrue(dfs.rename(srcPath, dstPath));
-      
-      // verify space after the move
-      c = dfs.getContentSummary(quotaDir20);
-      assertEquals(c.getSpaceConsumed(), fileSpace);
-      // verify space for its parent
-      c = dfs.getContentSummary(quotaDir1);
-      compareQuotaUsage(c, dfs, quotaDir1);
-      assertEquals(c.getSpaceConsumed(), fileSpace);
-      // verify space for source for the move
-      c = dfs.getContentSummary(quotaDir21);
-      compareQuotaUsage(c, dfs, quotaDir21);
-      assertEquals(c.getSpaceConsumed(), 0);
-      
-      final Path file2 = new Path(dstPath, "fileDir/file2");
-      int file2Len = 2 * fileLen;
-      // create a larger file under /nqdir0/qdir1/qdir20/nqdir30
-      DFSTestUtil.createFile(dfs, file2, file2Len, replication, 0);
-      
-      c = dfs.getContentSummary(quotaDir20);
-      assertEquals(c.getSpaceConsumed(), 3 * fileSpace);
-      c = dfs.getContentSummary(quotaDir21);
-      compareQuotaUsage(c, dfs, quotaDir21);
-      assertEquals(c.getSpaceConsumed(), 0);
-      
-      // Reverse: Move /nqdir0/qdir1/qdir20/nqdir30 to /nqdir0/qdir1/qdir21/
-      hasException = false;
-      try {
-        assertFalse(dfs.rename(dstPath, srcPath));
-      } catch (DSQuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-      
-      // make sure no intermediate directories left by failed rename
-      assertFalse(dfs.exists(srcPath));
-      // directory should exist
-      assertTrue(dfs.exists(dstPath));
-            
-      // verify space after the failed move
-      c = dfs.getContentSummary(quotaDir20);
-      assertEquals(c.getSpaceConsumed(), 3 * fileSpace);
-      c = dfs.getContentSummary(quotaDir21);
-      compareQuotaUsage(c, dfs, quotaDir21);
-      assertEquals(c.getSpaceConsumed(), 0);
-      
-      // Test Append :
-      
-      // verify space quota
-      c = dfs.getContentSummary(quotaDir1);
-      compareQuotaUsage(c, dfs, quotaDir1);
-      assertEquals(c.getSpaceQuota(), 4 * fileSpace);
-      
-      // verify space before append;
-      c = dfs.getContentSummary(dstPath);
-      compareQuotaUsage(c, dfs, dstPath);
-      assertEquals(c.getSpaceConsumed(), 3 * fileSpace);
-      
-      OutputStream out = dfs.append(file2);
-      // appending 1 fileLen should succeed
-      out.write(new byte[fileLen]);
+      DFSTestUtil.createFile(dfs, new Path(quotaDir21, "nqdir33/file2"),
+                             2*fileLen, replication, 0);
+    } catch (DSQuotaExceededException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
+    // delete nqdir33
+    assertTrue(dfs.delete(new Path(quotaDir21, "nqdir33"), true));
+    c = dfs.getContentSummary(quotaDir21);
+    compareQuotaUsage(c, dfs, quotaDir21);
+    assertEquals(c.getSpaceConsumed(), fileSpace);
+    assertEquals(c.getSpaceQuota(), 2*fileSpace);
+
+    // Verify space before the move:
+    c = dfs.getContentSummary(quotaDir20);
+    compareQuotaUsage(c, dfs, quotaDir20);
+    assertEquals(c.getSpaceConsumed(), 0);
+
+    // Move nqdir0/qdir1/qdir21/nqdir32 nqdir0/qdir1/qdir20/nqdir30
+    Path dstPath = new Path(quotaDir20, "nqdir30");
+    Path srcPath = new Path(quotaDir21, "nqdir32");
+    assertTrue(dfs.rename(srcPath, dstPath));
+
+    // verify space after the move
+    c = dfs.getContentSummary(quotaDir20);
+    assertEquals(c.getSpaceConsumed(), fileSpace);
+    // verify space for its parent
+    c = dfs.getContentSummary(quotaDir1);
+    compareQuotaUsage(c, dfs, quotaDir1);
+    assertEquals(c.getSpaceConsumed(), fileSpace);
+    // verify space for source for the move
+    c = dfs.getContentSummary(quotaDir21);
+    compareQuotaUsage(c, dfs, quotaDir21);
+    assertEquals(c.getSpaceConsumed(), 0);
+
+    final Path file2 = new Path(dstPath, "fileDir/file2");
+    int file2Len = 2 * fileLen;
+    // create a larger file under nqdir0/qdir1/qdir20/nqdir30
+    DFSTestUtil.createFile(dfs, file2, file2Len, replication, 0);
+
+    c = dfs.getContentSummary(quotaDir20);
+    assertEquals(c.getSpaceConsumed(), 3 * fileSpace);
+    c = dfs.getContentSummary(quotaDir21);
+    compareQuotaUsage(c, dfs, quotaDir21);
+    assertEquals(c.getSpaceConsumed(), 0);
+
+    // Reverse: Move nqdir0/qdir1/qdir20/nqdir30 to nqdir0/qdir1/qdir21/
+    hasException = false;
+    try {
+      assertFalse(dfs.rename(dstPath, srcPath));
+    } catch (DSQuotaExceededException e) {
+      hasException = true;
+    }
+    assertTrue(hasException);
+
+    // make sure no intermediate directories left by failed rename
+    assertFalse(dfs.exists(srcPath));
+    // directory should exist
+    assertTrue(dfs.exists(dstPath));
+
+    // verify space after the failed move
+    c = dfs.getContentSummary(quotaDir20);
+    assertEquals(c.getSpaceConsumed(), 3 * fileSpace);
+    c = dfs.getContentSummary(quotaDir21);
+    compareQuotaUsage(c, dfs, quotaDir21);
+    assertEquals(c.getSpaceConsumed(), 0);
+
+    // Test Append :
+
+    // verify space quota
+    c = dfs.getContentSummary(quotaDir1);
+    compareQuotaUsage(c, dfs, quotaDir1);
+    assertEquals(c.getSpaceQuota(), 4 * fileSpace);
+
+    // verify space before append;
+    c = dfs.getContentSummary(dstPath);
+    compareQuotaUsage(c, dfs, dstPath);
+    assertEquals(c.getSpaceConsumed(), 3 * fileSpace);
+
+    OutputStream out = dfs.append(file2);
+    // appending 1 fileLen should succeed
+    out.write(new byte[fileLen]);
+    out.close();
+
+    file2Len += fileLen; // after append
+
+    // verify space after append;
+    c = dfs.getContentSummary(dstPath);
+    compareQuotaUsage(c, dfs, dstPath);
+    assertEquals(c.getSpaceConsumed(), 4 * fileSpace);
+
+    // now increase the quota for quotaDir1
+    dfs.setQuota(quotaDir1, HdfsConstants.QUOTA_DONT_SET, 5 * fileSpace);
+    // Now, appending more than 1 fileLen should result in an error
+    out = dfs.append(file2);
+    hasException = false;
+    try {
+      out.write(new byte[fileLen + 1024]);
+      out.flush();
       out.close();
-      
-      file2Len += fileLen; // after append
-      
-      // verify space after append;
-      c = dfs.getContentSummary(dstPath);
-      compareQuotaUsage(c, dfs, dstPath);
-      assertEquals(c.getSpaceConsumed(), 4 * fileSpace);
-      
-      // now increase the quota for quotaDir1
-      dfs.setQuota(quotaDir1, HdfsConstants.QUOTA_DONT_SET, 5 * fileSpace);
-      // Now, appending more than 1 fileLen should result in an error
-      out = dfs.append(file2);
-      hasException = false;
-      try {
-        out.write(new byte[fileLen + 1024]);
-        out.flush();
-        out.close();
-      } catch (DSQuotaExceededException e) {
-        hasException = true;
-        IOUtils.closeStream(out);
-      }
-      assertTrue(hasException);
-      
-      file2Len += fileLen; // after partial append
-      
-      // verify space after partial append
-      c = dfs.getContentSummary(dstPath);
-      compareQuotaUsage(c, dfs, dstPath);
-      assertEquals(c.getSpaceConsumed(), 5 * fileSpace);
-      
-      // Test set replication :
-      
-      // first reduce the replication
-      dfs.setReplication(file2, (short)(replication-1));
-      
-      // verify that space is reduced by file2Len
-      c = dfs.getContentSummary(dstPath);
-      compareQuotaUsage(c, dfs, dstPath);
-      assertEquals(c.getSpaceConsumed(), 5 * fileSpace - file2Len);
-      
-      // now try to increase the replication and and expect an error.
-      hasException = false;
-      try {
-        dfs.setReplication(file2, (short)(replication+1));
-      } catch (DSQuotaExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
+    } catch (DSQuotaExceededException e) {
+      hasException = true;
+      IOUtils.closeStream(out);
+    }
+    assertTrue(hasException);
 
-      // verify space consumed remains unchanged.
-      c = dfs.getContentSummary(dstPath);
-      compareQuotaUsage(c, dfs, dstPath);
-      assertEquals(c.getSpaceConsumed(), 5 * fileSpace - file2Len);
-      
-      // now increase the quota for quotaDir1 and quotaDir20
-      dfs.setQuota(quotaDir1, HdfsConstants.QUOTA_DONT_SET, 10 * fileSpace);
-      dfs.setQuota(quotaDir20, HdfsConstants.QUOTA_DONT_SET, 10 * fileSpace);
-      
-      // then increasing replication should be ok.
+    file2Len += fileLen; // after partial append
+
+    // verify space after partial append
+    c = dfs.getContentSummary(dstPath);
+    compareQuotaUsage(c, dfs, dstPath);
+    assertEquals(c.getSpaceConsumed(), 5 * fileSpace);
+
+    // Test set replication :
+
+    // first reduce the replication
+    dfs.setReplication(file2, (short)(replication-1));
+
+    // verify that space is reduced by file2Len
+    c = dfs.getContentSummary(dstPath);
+    compareQuotaUsage(c, dfs, dstPath);
+    assertEquals(c.getSpaceConsumed(), 5 * fileSpace - file2Len);
+
+    // now try to increase the replication and expect an error.
+    hasException = false;
+    try {
       dfs.setReplication(file2, (short)(replication+1));
-      // verify increase in space
-      c = dfs.getContentSummary(dstPath);
-      compareQuotaUsage(c, dfs, dstPath);
-      assertEquals(c.getSpaceConsumed(), 5 * fileSpace + file2Len);
-
-      // Test HDFS-2053 :
-
-      // Create directory /hdfs-2053
-      final Path quotaDir2053 = new Path("/hdfs-2053");
-      assertTrue(dfs.mkdirs(quotaDir2053));
-
-      // Create subdirectories /hdfs-2053/{A,B,C}
-      final Path quotaDir2053_A = new Path(quotaDir2053, "A");
-      assertTrue(dfs.mkdirs(quotaDir2053_A));
-      final Path quotaDir2053_B = new Path(quotaDir2053, "B");
-      assertTrue(dfs.mkdirs(quotaDir2053_B));
-      final Path quotaDir2053_C = new Path(quotaDir2053, "C");
-      assertTrue(dfs.mkdirs(quotaDir2053_C));
-
-      // Factors to vary the sizes of test files created in each subdir.
-      // The actual factors are not really important but they allow us to create
-      // identifiable file sizes per subdir, which helps during debugging.
-      int sizeFactorA = 1;
-      int sizeFactorB = 2;
-      int sizeFactorC = 4;
-
-      // Set space quota for subdirectory C
-      dfs.setQuota(quotaDir2053_C, HdfsConstants.QUOTA_DONT_SET,
-          (sizeFactorC + 1) * fileSpace);
-      c = dfs.getContentSummary(quotaDir2053_C);
-      compareQuotaUsage(c, dfs, quotaDir2053_C);
-      assertEquals(c.getSpaceQuota(), (sizeFactorC + 1) * fileSpace);
-
-      // Create a file under subdirectory A
-      DFSTestUtil.createFile(dfs, new Path(quotaDir2053_A, "fileA"),
-          sizeFactorA * fileLen, replication, 0);
-      c = dfs.getContentSummary(quotaDir2053_A);
-      compareQuotaUsage(c, dfs, quotaDir2053_A);
-      assertEquals(c.getSpaceConsumed(), sizeFactorA * fileSpace);
-
-      // Create a file under subdirectory B
-      DFSTestUtil.createFile(dfs, new Path(quotaDir2053_B, "fileB"),
-          sizeFactorB * fileLen, replication, 0);
-      c = dfs.getContentSummary(quotaDir2053_B);
-      compareQuotaUsage(c, dfs, quotaDir2053_B);
-      assertEquals(c.getSpaceConsumed(), sizeFactorB * fileSpace);
-
-      // Create a file under subdirectory C (which has a space quota)
-      DFSTestUtil.createFile(dfs, new Path(quotaDir2053_C, "fileC"),
-          sizeFactorC * fileLen, replication, 0);
-      c = dfs.getContentSummary(quotaDir2053_C);
-      compareQuotaUsage(c, dfs, quotaDir2053_C);
-      assertEquals(c.getSpaceConsumed(), sizeFactorC * fileSpace);
-
-      // Check space consumed for /hdfs-2053
-      c = dfs.getContentSummary(quotaDir2053);
-      compareQuotaUsage(c, dfs, quotaDir2053);
-      assertEquals(c.getSpaceConsumed(),
-          (sizeFactorA + sizeFactorB + sizeFactorC) * fileSpace);
-
-      assertEquals(28, cluster.getNamesystem().getFSDirectory().getYieldCount());
-    } finally {
-      cluster.shutdown();
+    } catch (DSQuotaExceededException e) {
+      hasException = true;
     }
+    assertTrue(hasException);
+
+    // verify space consumed remains unchanged.
+    c = dfs.getContentSummary(dstPath);
+    compareQuotaUsage(c, dfs, dstPath);
+    assertEquals(c.getSpaceConsumed(), 5 * fileSpace - file2Len);
+
+    // now increase the quota for quotaDir1 and quotaDir20
+    dfs.setQuota(quotaDir1, HdfsConstants.QUOTA_DONT_SET, 10 * fileSpace);
+    dfs.setQuota(quotaDir20, HdfsConstants.QUOTA_DONT_SET, 10 * fileSpace);
+
+    // then increasing replication should be ok.
+    dfs.setReplication(file2, (short)(replication+1));
+    // verify increase in space
+    c = dfs.getContentSummary(dstPath);
+    compareQuotaUsage(c, dfs, dstPath);
+    assertEquals(c.getSpaceConsumed(), 5 * fileSpace + file2Len);
+
+    // Test HDFS-2053 :
+
+    // Create directory hdfs-2053
+    final Path quotaDir2053 = new Path(parent, "hdfs-2053");
+    assertTrue(dfs.mkdirs(quotaDir2053));
+
+    // Create subdirectories hdfs-2053/{A,B,C}
+    final Path quotaDir2053_A = new Path(quotaDir2053, "A");
+    assertTrue(dfs.mkdirs(quotaDir2053_A));
+    final Path quotaDir2053_B = new Path(quotaDir2053, "B");
+    assertTrue(dfs.mkdirs(quotaDir2053_B));
+    final Path quotaDir2053_C = new Path(quotaDir2053, "C");
+    assertTrue(dfs.mkdirs(quotaDir2053_C));
+
+    // Factors to vary the sizes of test files created in each subdir.
+    // The actual factors are not really important but they allow us to create
+    // identifiable file sizes per subdir, which helps during debugging.
+    int sizeFactorA = 1;
+    int sizeFactorB = 2;
+    int sizeFactorC = 4;
+
+    // Set space quota for subdirectory C
+    dfs.setQuota(quotaDir2053_C, HdfsConstants.QUOTA_DONT_SET,
+        (sizeFactorC + 1) * fileSpace);
+    c = dfs.getContentSummary(quotaDir2053_C);
+    compareQuotaUsage(c, dfs, quotaDir2053_C);
+    assertEquals(c.getSpaceQuota(), (sizeFactorC + 1) * fileSpace);
+
+    // Create a file under subdirectory A
+    DFSTestUtil.createFile(dfs, new Path(quotaDir2053_A, "fileA"),
+        sizeFactorA * fileLen, replication, 0);
+    c = dfs.getContentSummary(quotaDir2053_A);
+    compareQuotaUsage(c, dfs, quotaDir2053_A);
+    assertEquals(c.getSpaceConsumed(), sizeFactorA * fileSpace);
+
+    // Create a file under subdirectory B
+    DFSTestUtil.createFile(dfs, new Path(quotaDir2053_B, "fileB"),
+        sizeFactorB * fileLen, replication, 0);
+    c = dfs.getContentSummary(quotaDir2053_B);
+    compareQuotaUsage(c, dfs, quotaDir2053_B);
+    assertEquals(c.getSpaceConsumed(), sizeFactorB * fileSpace);
+
+    // Create a file under subdirectory C (which has a space quota)
+    DFSTestUtil.createFile(dfs, new Path(quotaDir2053_C, "fileC"),
+        sizeFactorC * fileLen, replication, 0);
+    c = dfs.getContentSummary(quotaDir2053_C);
+    compareQuotaUsage(c, dfs, quotaDir2053_C);
+    assertEquals(c.getSpaceConsumed(), sizeFactorC * fileSpace);
+
+    // Check space consumed for hdfs-2053
+    c = dfs.getContentSummary(quotaDir2053);
+    compareQuotaUsage(c, dfs, quotaDir2053);
+    assertEquals(c.getSpaceConsumed(),
+        (sizeFactorA + sizeFactorB + sizeFactorC) * fileSpace);
   }
 
   /**
@@ -839,52 +894,39 @@ public class TestQuota {
    */
   @Test
   public void testQuotaByStorageType() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    // set a smaller block size so that we can test with smaller
-    // diskspace quotas
-    conf.set(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, "512");
-    // Make it relinquish locks. When run serially, the result should
-    // be identical.
-    conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
-    final MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
-    final FileSystem fs = cluster.getFileSystem();
-    assertTrue("Not a HDFS: " + fs.getUri(),
-        fs instanceof DistributedFileSystem);
-    final DistributedFileSystem dfs = (DistributedFileSystem) fs;
-
+    final Path parent = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(parent));
+
+    int fileLen = 1024;
+    short replication = 3;
+    int fileSpace = fileLen * replication;
+
+    final Path quotaDir20 = new Path(parent, "nqdir0/qdir1/qdir20");
+    assertTrue(dfs.mkdirs(quotaDir20));
+    dfs.setQuota(quotaDir20, HdfsConstants.QUOTA_DONT_SET, 6 * fileSpace);
+
+    // Verify DirectoryWithQuotaFeature's storage type usage
+    // is updated properly after deletion.
+    // File creation followed by deletion shouldn't change storage type
+    // usage regardless whether storage policy is set.
+    Path file = new Path(quotaDir20, "fileDir/file1");
+    DFSTestUtil.createFile(dfs, file, fileLen * 3, replication, 0);
+    dfs.delete(file, false);
+    dfs.setStoragePolicy(quotaDir20, HdfsConstants.HOT_STORAGE_POLICY_NAME);
+    dfs.setQuotaByStorageType(quotaDir20, StorageType.DEFAULT,
+        2 * fileSpace);
+    boolean hasException = false;
     try {
-      int fileLen = 1024;
-      short replication = 3;
-      int fileSpace = fileLen * replication;
-
-      final Path quotaDir20 = new Path("/nqdir0/qdir1/qdir20");
-      assertTrue(dfs.mkdirs(quotaDir20));
-      dfs.setQuota(quotaDir20, HdfsConstants.QUOTA_DONT_SET, 6 * fileSpace);
-
-      // Verify DirectoryWithQuotaFeature's storage type usage
-      // is updated properly after deletion.
-      // File creation followed by deletion shouldn't change storage type
-      // usage regardless whether storage policy is set.
-      Path file = new Path(quotaDir20, "fileDir/file1");
       DFSTestUtil.createFile(dfs, file, fileLen * 3, replication, 0);
-      dfs.delete(file, false);
-      dfs.setStoragePolicy(quotaDir20, HdfsConstants.HOT_STORAGE_POLICY_NAME);
-      dfs.setQuotaByStorageType(quotaDir20, StorageType.DEFAULT,
-          2 * fileSpace);
-      boolean hasException = false;
-      try {
-        DFSTestUtil.createFile(dfs, file, fileLen * 3, replication, 0);
-      } catch (QuotaByStorageTypeExceededException e) {
-        hasException = true;
-      }
-      assertTrue(hasException);
-      dfs.delete(file, false);
-      dfs.setQuotaByStorageType(quotaDir20, StorageType.DEFAULT,
-          6 * fileSpace);
-    } finally {
-      cluster.shutdown();
+    } catch (QuotaByStorageTypeExceededException e) {
+      hasException = true;
     }
+    assertTrue(hasException);
+    dfs.delete(file, false);
+    dfs.setQuotaByStorageType(quotaDir20, StorageType.DEFAULT,
+        6 * fileSpace);
   }
 
   private static void checkContentSummary(final ContentSummary expected,
@@ -893,6 +935,66 @@ public class TestQuota {
   }
 
   /**
+   * Test limit cases for setting space quotas.
+   */
+  @Test
+  public void testMaxSpaceQuotas() throws Exception {
+    final Path parent = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(parent));
+
+    final FileSystem fs = cluster.getFileSystem();
+    assertTrue("Not a HDFS: "+fs.getUri(),
+                fs instanceof DistributedFileSystem);
+    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+
+    // create test directory
+    final Path testFolder = new Path(parent, "testFolder");
+    assertTrue(dfs.mkdirs(testFolder));
+
+    // setting namespace quota to Long.MAX_VALUE - 1 should work
+    dfs.setQuota(testFolder, Long.MAX_VALUE - 1, 10);
+    ContentSummary c = dfs.getContentSummary(testFolder);
+    compareQuotaUsage(c, dfs, testFolder);
+    assertTrue("Quota not set properly", c.getQuota() == Long.MAX_VALUE - 1);
+
+    // setting diskspace quota to Long.MAX_VALUE - 1 should work
+    dfs.setQuota(testFolder, 10, Long.MAX_VALUE - 1);
+    c = dfs.getContentSummary(testFolder);
+    compareQuotaUsage(c, dfs, testFolder);
+    assertTrue("Quota not set properly", c.getSpaceQuota() == Long.MAX_VALUE - 1);
+
+    // setting namespace quota to Long.MAX_VALUE should not work + no error
+    dfs.setQuota(testFolder, Long.MAX_VALUE, 10);
+    c = dfs.getContentSummary(testFolder);
+    compareQuotaUsage(c, dfs, testFolder);
+    assertTrue("Quota should not have changed", c.getQuota() == 10);
+
+    // setting diskspace quota to Long.MAX_VALUE should not work + no error
+    dfs.setQuota(testFolder, 10, Long.MAX_VALUE);
+    c = dfs.getContentSummary(testFolder);
+    compareQuotaUsage(c, dfs, testFolder);
+    assertTrue("Quota should not have changed", c.getSpaceQuota() == 10);
+
+    // namespace quota Long.MAX_VALUE + 1 wraps to Long.MIN_VALUE: expect error
+    try {
+      dfs.setQuota(testFolder, Long.MAX_VALUE + 1, 10);
+      fail("Exception not thrown");
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+
+    // diskspace quota Long.MAX_VALUE + 1 wraps to Long.MIN_VALUE: expect error
+    try {
+      dfs.setQuota(testFolder, 10, Long.MAX_VALUE + 1);
+      fail("Exception not thrown");
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+  }
+
+  /**
    * Violate a space quota using files of size < 1 block. Test that block
    * allocation conservatively assumes that for quota checking the entire
    * space of the block is used.
@@ -900,60 +1002,47 @@ public class TestQuota {
   @Test
   public void testBlockAllocationAdjustsUsageConservatively() 
       throws Exception {
-    Configuration conf = new HdfsConfiguration();
-    final int BLOCK_SIZE = 6 * 1024;
-    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setBoolean(HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
-    MiniDFSCluster cluster = 
-      new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
-    cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
-    DFSAdmin admin = new DFSAdmin(conf);
-
-    final String nnAddr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
-    final String webhdfsuri = WebHdfsConstants.WEBHDFS_SCHEME + "://" + nnAddr;
-    System.out.println("webhdfsuri=" + webhdfsuri);
-    final FileSystem webhdfs = new Path(webhdfsuri).getFileSystem(conf);
+    final Path parent = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(parent));
 
+    DFSAdmin admin = new DFSAdmin(conf);
+    Path dir = new Path(parent, "test");
+    Path file1 = new Path(parent, "test/test1");
+    Path file2 = new Path(parent, "test/test2");
+    boolean exceededQuota = false;
+    final int QUOTA_SIZE = 3 * DEFAULT_BLOCK_SIZE; // total space usage including
+                                           // repl.
+    final int FILE_SIZE = DEFAULT_BLOCK_SIZE / 2;
+    ContentSummary c;
+
+    // Create the directory and set the quota
+    assertTrue(dfs.mkdirs(dir));
+    runCommand(admin, false, "-setSpaceQuota", Integer.toString(QUOTA_SIZE),
+         dir.toString());
+
+    // Creating a file should use half the quota
+    DFSTestUtil.createFile(dfs, file1, FILE_SIZE, (short) 3, 1L);
+    DFSTestUtil.waitReplication(dfs, file1, (short) 3);
+    c = dfs.getContentSummary(dir);
+    compareQuotaUsage(c, dfs, dir);
+    checkContentSummary(c, webhdfs.getContentSummary(dir));
+    assertEquals("Quota is half consumed", QUOTA_SIZE / 2,
+                 c.getSpaceConsumed());
+
+    // We cannot create the 2nd file because even though the total space
+    // used by two files (2 * 3 * 512/2) would fit within the quota (3 * 512)
+    // when a block for a file is created the space used is adjusted
+    // conservatively (3 * block size, ie assumes a full block is written)
+    // which will violate the quota (3 * block size) since we've already
+    // used half the quota for the first file.
     try {
-      Path dir = new Path("/test");
-      Path file1 = new Path("/test/test1");
-      Path file2 = new Path("/test/test2");
-      boolean exceededQuota = false;
-      final int QUOTA_SIZE = 3 * BLOCK_SIZE; // total space usage including
-                                             // repl.
-      final int FILE_SIZE = BLOCK_SIZE / 2;
-      ContentSummary c;
-      
-      // Create the directory and set the quota
-      assertTrue(fs.mkdirs(dir));
-      runCommand(admin, false, "-setSpaceQuota", Integer.toString(QUOTA_SIZE),
-	         dir.toString());
-
-      // Creating a file should use half the quota
-      DFSTestUtil.createFile(fs, file1, FILE_SIZE, (short) 3, 1L);
-      DFSTestUtil.waitReplication(fs, file1, (short) 3);
-      c = fs.getContentSummary(dir);
-      compareQuotaUsage(c, fs, dir);
-      checkContentSummary(c, webhdfs.getContentSummary(dir));
-      assertEquals("Quota is half consumed", QUOTA_SIZE / 2,
-                   c.getSpaceConsumed());
-
-      // We can not create the 2nd file because even though the total spaced
-      // used by two files (2 * 3 * 512/2) would fit within the quota (3 * 512)
-      // when a block for a file is created the space used is adjusted
-      // conservatively (3 * block size, ie assumes a full block is written)
-      // which will violate the quota (3 * block size) since we've already 
-      // used half the quota for the first file.
-      try {
-        DFSTestUtil.createFile(fs, file2, FILE_SIZE, (short) 3, 1L);
-      } catch (QuotaExceededException e) {
-        exceededQuota = true;
-      }
-      assertTrue("Quota not exceeded", exceededQuota);
-    } finally {
-      cluster.shutdown();
+      DFSTestUtil.createFile(dfs, file2, FILE_SIZE, (short) 3, 1L);
+    } catch (QuotaExceededException e) {
+      exceededQuota = true;
     }
+    assertTrue("Quota not exceeded", exceededQuota);
  }
 
  /**
@@ -963,26 +1052,30 @@ public class TestQuota {
   */
   @Test
   public void testMultipleFilesSmallerThanOneBlock() throws Exception {
-    Configuration conf = new HdfsConfiguration();
+    final Path parent = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(parent));
+
+    Configuration dfsConf = new HdfsConfiguration();
     final int BLOCK_SIZE = 6 * 1024;
-    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    conf.setBoolean(HdfsClientConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+    dfsConf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     // Make it relinquish locks. When run serially, the result should
     // be identical.
-    conf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
-    MiniDFSCluster cluster = 
-      new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
-    cluster.waitActive();
-    FileSystem fs = cluster.getFileSystem();
-    DFSAdmin admin = new DFSAdmin(conf);
-
-    final String nnAddr = conf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
+    dfsConf.setInt(DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY, 2);
+    MiniDFSCluster dfsCluster =
+      new MiniDFSCluster.Builder(dfsConf).numDataNodes(3).build();
+    dfsCluster.waitActive();
+    FileSystem fs = dfsCluster.getFileSystem();
+    DFSAdmin admin = new DFSAdmin(dfsConf);
+
+    final String nnAddr = dfsConf.get(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY);
     final String webhdfsuri = WebHdfsConstants.WEBHDFS_SCHEME + "://" + nnAddr;
     System.out.println("webhdfsuri=" + webhdfsuri);
-    final FileSystem webhdfs = new Path(webhdfsuri).getFileSystem(conf);
+    final FileSystem webHDFS = new Path(webhdfsuri).getFileSystem(dfsConf);
     
     try {
-      Path dir = new Path("/test");
+      Path dir = new Path(parent, "test");
       boolean exceededQuota = false;
       ContentSummary c;
       // 1kb file
@@ -1006,7 +1099,7 @@ public class TestQuota {
       // need to leave at least 3 * BLOCK_SIZE free space when allocating
       // the last block: (58 * 3 * 1024) (3 * 6 * 1024) = 192kb
       for (int i = 0; i < 59; i++) {
-        Path file = new Path("/test/test"+i);
+        Path file = new Path(parent, "test/test"+i);
         DFSTestUtil.createFile(fs, file, FILE_SIZE, (short) 3, 1L);
         DFSTestUtil.waitReplication(fs, file, (short) 3);
       }
@@ -1014,7 +1107,7 @@ public class TestQuota {
       // Should account for all 59 files (almost QUOTA_SIZE)
       c = fs.getContentSummary(dir);
       compareQuotaUsage(c, fs, dir);
-      checkContentSummary(c, webhdfs.getContentSummary(dir));
+      checkContentSummary(c, webHDFS.getContentSummary(dir));
       assertEquals("Invalid space consumed", 59 * FILE_SIZE * 3,
           c.getSpaceConsumed());
       assertEquals("Invalid space consumed", QUOTA_SIZE - (59 * FILE_SIZE * 3),
@@ -1022,16 +1115,16 @@ public class TestQuota {
 
       // Now check that trying to create another file violates the quota
       try {
-        Path file = new Path("/test/test59");
+        Path file = new Path(parent, "test/test59");
         DFSTestUtil.createFile(fs, file, FILE_SIZE, (short) 3, 1L);
         DFSTestUtil.waitReplication(fs, file, (short) 3);
       } catch (QuotaExceededException e) {
         exceededQuota = true;
       }
       assertTrue("Quota not exceeded", exceededQuota);
-      assertEquals(2, cluster.getNamesystem().getFSDirectory().getYieldCount());
+      assertEquals(2, dfsCluster.getNamesystem().getFSDirectory().getYieldCount());
     } finally {
-      cluster.shutdown();
+      dfsCluster.shutdown();
     }
   }
 
@@ -1061,28 +1154,21 @@ public class TestQuota {
    */
   @Test
   public void testHugeFileCount() throws IOException {
-    MiniDFSCluster cluster = null;
-    Configuration conf = new Configuration();
-    conf.setInt("dfs.content-summary.limit", 4);
-    try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      for (int i = 1; i <= 5; i++) {
-        FSDataOutputStream out =
-            dfs.create(new Path("/Folder1/" + "file" + i),(short)1);
-        out.close();
-      }
-      FSDataOutputStream out = dfs.create(new Path("/Folder2/file6"),(short)1);
+    final Path parent = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(parent));
+
+    for (int i = 1; i <= 5; i++) {
+      FSDataOutputStream out =
+          dfs.create(new Path(parent, "Folder1/" + "file" + i),(short)1);
       out.close();
-      ContentSummary contentSummary = dfs.getContentSummary(new Path("/"));
-      compareQuotaUsage(contentSummary, dfs, new Path("/"));
-      assertEquals(6, contentSummary.getFileCount());
-    } finally {
-      if (cluster != null) {
-        cluster.shutdown();
-        cluster = null;
-      }
     }
+    FSDataOutputStream out = dfs.create(new Path(parent, "Folder2/file6"),(short)1);
+    out.close();
+    ContentSummary contentSummary = dfs.getContentSummary(parent);
+    compareQuotaUsage(contentSummary, dfs, parent);
+    assertEquals(6, contentSummary.getFileCount());
   }
 
   // check the QuotaUsage got from getContentSummary is the same as
@@ -1092,4 +1178,299 @@ public class TestQuota {
     QuotaUsage quotaUsage = fileSystem.getQuotaUsage(filePath);
     assertEquals(fromContentSummary, quotaUsage);
   }
+
+
+  /**
+   * Test to set space quota using a negative number.
+   */
+  @Test(timeout = 30000)
+  public void testSetSpaceQuotaNegativeNumber() throws Exception {
+
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final Path dir = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(dir));
+
+    final List<String> outs = Lists.newArrayList();
+
+    /* set space quota */
+    resetStream();
+    outs.clear();
+    final int ret = ToolRunner.run(
+        dfsAdmin,
+        new String[] {"-setSpaceQuota", "-10", dir.toString()});
+    assertEquals(-1, ret);
+    scanIntoList(ERR_STREAM, outs);
+    assertEquals(
+        "It should be two lines of error messages,"
+        + " the 1st one is about Illegal option,"
+        + " the 2nd one is about SetSpaceQuota usage.",
+        2, outs.size());
+    assertThat(outs.get(0),
+        is(allOf(containsString("setSpaceQuota"),
+            containsString("Illegal option"))));
+  }
+
+  /**
+   * Test to set and clear space quota, regular usage.
+   */
+  @Test(timeout = 30000)
+  public void testSetAndClearSpaceQuotaRegular() throws Exception {
+
+    final Path dir = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(dir));
+
+    /* set space quota */
+    testSetAndClearSpaceQuotaRegularInternal(
+        new String[] {"-setSpaceQuota", "1024", dir.toString()},
+        dir,
+        0,
+        1024);
+
+    /* clear space quota */
+    testSetAndClearSpaceQuotaRegularInternal(
+        new String[] {"-clrSpaceQuota", dir.toString()},
+        dir,
+        0,
+        -1);
+  }
+
+  private void testSetAndClearSpaceQuotaRegularInternal(
+      final String[] args,
+      final Path dir,
+      final int cmdRet,
+      final int spaceQuota) throws Exception {
+
+    resetStream();
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final List<String> outs = Lists.newArrayList();
+
+    final int ret = ToolRunner.run(dfsAdmin, args);
+    assertEquals(cmdRet, ret);
+    final QuotaUsage quotaUsage = dfs.getQuotaUsage(dir);
+    assertEquals(spaceQuota, quotaUsage.getSpaceQuota());
+    scanIntoList(OUT_STREAM, outs);
+    assertTrue(
+        "There should be no output if it runs successfully.",
+        outs.isEmpty());
+  }
+
+  /**
+   * Test to set and clear space quota by storage type.
+   */
+  @Test(timeout = 30000)
+  public void testSetAndClearSpaceQuotaByStorageType() throws Exception {
+
+    final Path dir = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(dir));
+
+    /* set space quota */
+    testSetAndClearSpaceQuotaByStorageTypeInternal(
+        new String[] {
+            "-setSpaceQuota", "2048", "-storageType", "DISK",
+            dir.toString()},
+        dir,
+        0,
+        -1,
+        2048);
+
+    /* clear space quota */
+    testSetAndClearSpaceQuotaByStorageTypeInternal(
+        new String[] {
+            "-clrSpaceQuota", "-storageType", "DISK",
+            dir.toString()},
+        dir,
+        0,
+        -1,
+        -1);
+  }
+
+  private void testSetAndClearSpaceQuotaByStorageTypeInternal(
+      final String[] args,
+      final Path dir,
+      final int cmdRet,
+      final int spaceQuota,
+      final int spaceQuotaByStorageType) throws Exception {
+
+    resetStream();
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final List<String> outs = Lists.newArrayList();
+
+    final int ret = ToolRunner.run(dfsAdmin, args);
+    assertEquals(cmdRet, ret);
+    final QuotaUsage quotaUsage = dfs.getQuotaUsage(dir);
+    assertEquals(spaceQuota, quotaUsage.getSpaceQuota());
+    assertEquals(
+        spaceQuotaByStorageType,
+        quotaUsage.getTypeQuota(StorageType.DISK));
+    scanIntoList(OUT_STREAM, outs);
+    assertTrue(
+        "There should be no output if it runs successfully.",
+        outs.isEmpty());
+  }
+
+  /**
+   * Test to set and clear space quota when directory doesn't exist.
+   */
+  @Test(timeout = 30000)
+  public void testSetAndClearSpaceQuotaDirecotryNotExist() throws Exception {
+    final Path dir = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+
+    /* set space quota */
+    testSetAndClearSpaceQuotaDirecotryNotExistInternal(
+        new String[] {"-setSpaceQuota", "1024", dir.toString()},
+        dir,
+        -1,
+        "setSpaceQuota");
+
+    /* clear space quota */
+    testSetAndClearSpaceQuotaDirecotryNotExistInternal(
+        new String[] {"-clrSpaceQuota", dir.toString()},
+        dir,
+        -1,
+        "clrSpaceQuota");
+  }
+
+  private void testSetAndClearSpaceQuotaDirecotryNotExistInternal(
+      final String[] args,
+      final Path dir,
+      final int cmdRet,
+      final String cmdName) throws Exception {
+
+    resetStream();
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final List<String> outs = Lists.newArrayList();
+
+    final int ret = ToolRunner.run(dfsAdmin, args);
+    assertEquals(cmdRet, ret);
+    scanIntoList(ERR_STREAM, outs);
+    assertEquals(
+        "It should be one line error message like: clrSpaceQuota:"
+            + " Directory does not exist: <full path of XXX directory>",
+        1, outs.size());
+    assertThat(outs.get(0),
+        is(allOf(containsString(cmdName),
+            containsString("does not exist"),
+            containsString(dir.toString()))));
+  }
+
+  /**
+   * Test to set and clear space quota when path is a file.
+   */
+  @Test (timeout = 30000)
+  public void testSetAndClearSpaceQuotaPathIsFile() throws Exception {
+
+    final Path parent = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    final Path file = new Path(parent, "path-is-file");
+    DFSTestUtil.createFile(dfs, file, 1024L, (short) 1L, 0);
+    assertTrue(dfs.isFile(file));
+
+    /* set space quota */
+    testSetAndClearSpaceQuotaPathIsFileInternal(
+        new String[] {"-setSpaceQuota", "1024", file.toString()},
+        file,
+        -1,
+        "setSpaceQuota");
+
+    /* clear space quota */
+    testSetAndClearSpaceQuotaPathIsFileInternal(
+        new String[] {"-clrSpaceQuota", file.toString()},
+        file,
+        -1,
+        "clrSpaceQuota");
+  }
+
+  private void testSetAndClearSpaceQuotaPathIsFileInternal(
+      final String[] args,
+      final Path file,
+      final int cmdRet,
+      final String cmdName) throws Exception {
+
+    resetStream();
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final List<String> outs = Lists.newArrayList();
+
+    final int ret = ToolRunner.run(dfsAdmin, args);
+    assertEquals(cmdRet, ret);
+    scanIntoList(ERR_STREAM, outs);
+    assertEquals(
+        "It should be one line error message like: clrSpaceQuota:"
+            + " <full path of XXX file> is not a directory",
+        1, outs.size());
+    assertThat(outs.get(0),
+        is(allOf(containsString(cmdName),
+            containsString(file.toString()),
+            containsString("Is not a directory"))));
+  }
+
+  /**
+   * Test to set and clear space quota when user has no access right.
+   */
+  @Test(timeout = 30000)
+  public void testSetAndClearSpaceQuotaNoAccess() throws Exception {
+
+    final Path dir = new Path(
+        PathUtils.getTestPath(getClass()),
+        GenericTestUtils.getMethodName());
+    assertTrue(dfs.mkdirs(dir));
+
+    /* set space quota */
+    testSetAndClearSpaceQuotaNoAccessInternal(
+        new String[] {"-setSpaceQuota", "2048", dir.toString()},
+        -1,
+        "setSpaceQuota");
+
+    /* clear space quota */
+    testSetAndClearSpaceQuotaNoAccessInternal(
+        new String[] {"-clrSpaceQuota", dir.toString()},
+        -1,
+        "clrSpaceQuota");
+  }
+
+  private void testSetAndClearSpaceQuotaNoAccessInternal(
+      final String[] args,
+      final int cmdRet,
+      final String cmdName) throws Exception {
+
+    resetStream();
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final List<String> outs = Lists.newArrayList();
+
+    final UserGroupInformation whoever =
+        UserGroupInformation.createUserForTesting(
+            "whoever",
+            new String[] {"whoever_group"});
+
+    final int ret = whoever.doAs(new PrivilegedExceptionAction<Integer>() {
+      @Override
+      public Integer run() throws Exception {
+        return ToolRunner.run(dfsAdmin, args);
+      }
+    });
+    assertEquals(cmdRet, ret);
+    scanIntoList(ERR_STREAM, outs);
+    assertThat(outs.get(0),
+        is(allOf(containsString(cmdName),
+            containsString("Access denied for user whoever"),
+            containsString("Superuser privilege is required"))));
+  }
+
+  private static void scanIntoList(
+      final ByteArrayOutputStream baos,
+      final List<String> list) {
+    final Scanner scanner = new Scanner(baos.toString());
+    while (scanner.hasNextLine()) {
+      list.add(scanner.nextLine());
+    }
+    scanner.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e0fc749a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 4bcca85..b54d3a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -56,6 +56,9 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+/**
+ * set/clrSpaceQuota are tested in {@link org.apache.hadoop.hdfs.TestQuota}.
+ */
 public class TestDFSAdmin {
   private Configuration conf = null;
   private MiniDFSCluster cluster;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message