hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r731807 - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/hdfs/org/apache/hadoop/hdfs/tools/ src/test/org/apache/hadoop/hdfs/ src/test/org/apache/hadoop/hdfs/server/namenode/
Date Tue, 06 Jan 2009 01:37:28 GMT
Author: szetszwo
Date: Mon Jan  5 17:37:27 2009
New Revision: 731807

URL: http://svn.apache.org/viewvc?rev=731807&view=rev
Log:
HADOOP-4268. Change fsck to use ClientProtocol methods for enforcing permissions.  (szetszwo)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/tools/DFSck.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=731807&r1=731806&r2=731807&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Jan  5 17:37:27 2009
@@ -12,6 +12,10 @@
 
   NEW FEATURES
 
+    HADOOP-4268. Change fsck to use ClientProtocol methods so that the
+    corresponding permission requirement for running the ClientProtocol
+    methods will be enforced.  (szetszwo)
+
   IMPROVEMENTS
 
     HADOOP-4936. Improvements to TestSafeMode. (shv)

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=731807&r1=731806&r2=731807&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Mon Jan  5 17:37:27 2009
@@ -17,29 +17,46 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.util.*;
-import java.io.*;
-import org.apache.hadoop.conf.*;
-import org.apache.commons.logging.*;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Map;
+
 import javax.servlet.ServletContext;
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
+
 /**
- * This class is used in Namesystem's jetty to do fsck on namenode.
+ * This class is used in Namesystem's web server to do fsck on namenode.
  */
-public class FsckServlet extends HttpServlet {
-  @SuppressWarnings("unchecked")
-  public void doGet(HttpServletRequest request,
-                    HttpServletResponse response
-                    ) throws ServletException, IOException {
-    Map<String,String[]> pmap = request.getParameterMap();
-    ServletContext context = getServletContext();
-    NameNode nn = (NameNode) context.getAttribute("name.node");
-    Configuration conf = (Configuration) context.getAttribute("name.conf");
-    NamenodeFsck fscker = new NamenodeFsck(conf, nn, pmap, response);
-    fscker.fsck();
+public class FsckServlet extends DfsServlet {
+  /** for java.io.Serializable */
+  private static final long serialVersionUID = 1L;
+
+  /** Handle fsck request */
+  public void doGet(HttpServletRequest request, HttpServletResponse response
+      ) throws IOException {
+    @SuppressWarnings("unchecked")
+    final Map<String,String[]> pmap = request.getParameterMap();
+    final PrintWriter out = response.getWriter();
+
+    final UnixUserGroupInformation ugi = getUGI(request);
+    UserGroupInformation.setCurrentUser(ugi);
+
+    final ServletContext context = getServletContext();
+    final Configuration conf = new Configuration((Configuration) context.getAttribute("name.conf"));
+    UnixUserGroupInformation.saveToConf(conf,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME, ugi);
+
+    final NameNode nn = (NameNode) context.getAttribute("name.node");
+    final int totalDatanodes = nn.namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE);
+    final short minReplication = nn.namesystem.getMinReplication();
+
+    new NamenodeFsck(conf, nn, nn.getNetworkTopology(), pmap, out,
+        totalDatanodes, minReplication).fsck();
   }
 }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=731807&r1=731806&r2=731807&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Mon Jan  5 17:37:27 2009
@@ -24,24 +24,25 @@
 import java.net.Socket;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeSet;
-import javax.servlet.http.HttpServletResponse;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NodeBase;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.permission.PermissionStatus;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.NodeBase;
 
 /**
  * This class provides rudimentary checking of DFS volumes for errors and
@@ -81,7 +82,11 @@
   /** Delete corrupted files. */
   public static final int FIXING_DELETE = 2;
   
-  private NameNode nn;
+  private final ClientProtocol namenode;
+  private final NetworkTopology networktopology;
+  private final int totalDatanodes;
+  private final short minReplication;
+
   private String lostFound = null;
   private boolean lfInited = false;
   private boolean lfInitedOk = false;
@@ -93,8 +98,8 @@
   private int fixing = FIXING_NONE;
   private String path = "/";
   
-  private Configuration conf;
-  private PrintWriter out;
+  private final Configuration conf;
+  private final PrintWriter out;
   
   /**
    * Filesystem checker.
@@ -104,13 +109,17 @@
    * @param response the object into which  this servelet writes the url contents
    * @throws IOException
    */
-  public NamenodeFsck(Configuration conf,
-                      NameNode nn,
-                      Map<String,String[]> pmap,
-                      HttpServletResponse response) throws IOException {
+  NamenodeFsck(Configuration conf, ClientProtocol namenode,
+      NetworkTopology networktopology, 
+      Map<String,String[]> pmap, PrintWriter out,
+      int totalDatanodes, short minReplication) {
     this.conf = conf;
-    this.nn = nn;
-    this.out = response.getWriter();
+    this.namenode = namenode;
+    this.networktopology = networktopology;
+    this.out = out;
+    this.totalDatanodes = totalDatanodes;
+    this.minReplication = minReplication;
+
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
       if (key.equals("path")) { this.path = pmap.get("path")[0]; }
@@ -126,21 +135,20 @@
   
   /**
    * Check files on DFS, starting from the indicated path.
-   * @throws Exception
    */
-  public void fsck() throws IOException {
+  public void fsck() {
     try {
-      FileStatus[] files = nn.namesystem.dir.getListing(path);
-      FsckResult res = new FsckResult();
-      res.totalRacks = nn.getNetworkTopology().getNumOfRacks();
-      res.totalDatanodes = nn.namesystem.getNumberOfDatanodes(
-          DatanodeReportType.LIVE);
-      res.setReplication((short) conf.getInt("dfs.replication", 3));
+      Result res = new Result(conf);
+
+      final FileStatus[] files = namenode.getListing(path);
       if (files != null) {
         for (int i = 0; i < files.length; i++) {
           check(files[i], res);
         }
         out.println(res);
+        out.println(" Number of data-nodes:\t\t" + totalDatanodes);
+        out.println(" Number of racks:\t\t" + networktopology.getNumOfRacks());
+
         // DFSck client scans for the string HEALTHY/CORRUPT to check the status
         // of file system and return appropriate code. Changing the output string
         // might break testcases. 
@@ -162,13 +170,12 @@
     }
   }
   
-  private void check(FileStatus file, FsckResult res) throws IOException {
-    int minReplication = nn.namesystem.getMinReplication();
+  private void check(FileStatus file, Result res) throws IOException {
     String path = file.getPath().toString();
     boolean isOpen = false;
 
     if (file.isDir()) {
-      FileStatus[] files = nn.namesystem.dir.getListing(path);
+      final FileStatus[] files = namenode.getListing(path);
       if (files == null) {
         return;
       }
@@ -182,7 +189,7 @@
       return;
     }
     long fileLen = file.getLen();
-    LocatedBlocks blocks = nn.namesystem.getBlockLocations(path, 0, fileLen);
+    LocatedBlocks blocks = namenode.getBlockLocations(path, 0, fileLen);
     if (blocks == null) { // the file is deleted
       return;
     }
@@ -247,7 +254,7 @@
       }
       // verify block placement policy
       int missingRacks = ReplicationTargetChooser.verifyBlockPlacement(
-                    lBlk, targetFileReplication, nn.getNetworkTopology());
+                    lBlk, targetFileReplication, networktopology);
       if (missingRacks > 0) {
         res.numMisReplicatedBlocks++;
         misReplicatedPerFile++;
@@ -300,7 +307,7 @@
         break;
       case FIXING_DELETE:
         if (!isOpen)
-          nn.namesystem.deleteInternal(path, false);
+          namenode.delete(path, true);
       }
     }
     if (showFiles) {
@@ -328,9 +335,7 @@
     String target = lostFound + file.getPath();
     String errmsg = "Failed to move " + file.getPath() + " to /lost+found";
     try {
-      PermissionStatus ps = new PermissionStatus(
-          file.getOwner(), file.getGroup(), file.getPermission()); 
-      if (!nn.namesystem.dir.mkdirs(target, ps, false, FSNamesystem.now())) {
+      if (!namenode.mkdirs(target, file.getPermission())) {
         LOG.warn(errmsg);
         return;
       }
@@ -513,21 +518,12 @@
       lfInitedOk = false;
     }
   }
-  
-  /**
-   * @param args
-   */
-  public int run(String[] args) throws Exception {
-    
-    return 0;
-  }
-  
+
   /**
    * FsckResult of checking, plus overall DFS statistics.
-   *
    */
-  public static class FsckResult {
-    private ArrayList<String> missingIds = new ArrayList<String>();
+  private static class Result {
+    private List<String> missingIds = new ArrayList<String>();
     private long missingSize = 0L;
     private long corruptFiles = 0L;
     private long corruptBlocks = 0L;
@@ -537,7 +533,6 @@
     private long numUnderReplicatedBlocks = 0L;
     private long numMisReplicatedBlocks = 0L;  // blocks that do not satisfy block placement policy
     private long numMinReplicatedBlocks = 0L;  // minimally replicatedblocks
-    private int replication = 0;
     private long totalBlocks = 0L;
     private long totalOpenFilesBlocks = 0L;
     private long totalFiles = 0L;
@@ -546,138 +541,34 @@
     private long totalSize = 0L;
     private long totalOpenFilesSize = 0L;
     private long totalReplicas = 0L;
-    private int totalDatanodes = 0;
-    private int totalRacks = 0;
+
+    final short replication;
+    
+    private Result(Configuration conf) {
+      this.replication = (short)conf.getInt("dfs.replication", 3);
+    }
     
     /**
      * DFS is considered healthy if there are no missing blocks.
      */
-    public boolean isHealthy() {
+    boolean isHealthy() {
       return ((missingIds.size() == 0) && (corruptBlocks == 0));
     }
     
     /** Add a missing block name, plus its size. */
-    public void addMissing(String id, long size) {
+    void addMissing(String id, long size) {
       missingIds.add(id);
       missingSize += size;
     }
     
-    /** Return a list of missing block names (as list of Strings). */
-    public ArrayList<String> getMissingIds() {
-      return missingIds;
-    }
-    
-    /** Return total size of missing data, in bytes. */
-    public long getMissingSize() {
-      return missingSize;
-    }
-
-    public void setMissingSize(long missingSize) {
-      this.missingSize = missingSize;
-    }
-    
-    /** Return the number of over-replicated blocks. */
-    public long getExcessiveReplicas() {
-      return excessiveReplicas;
-    }
-    
-    public void setExcessiveReplicas(long overReplicatedBlocks) {
-      this.excessiveReplicas = overReplicatedBlocks;
-    }
-    
     /** Return the actual replication factor. */
-    public float getReplicationFactor() {
+    float getReplicationFactor() {
       if (totalBlocks == 0)
         return 0.0f;
       return (float) (totalReplicas) / (float) totalBlocks;
     }
     
-    /** Return the number of under-replicated blocks. Note: missing blocks are not counted here.*/
-    public long getMissingReplicas() {
-      return missingReplicas;
-    }
-    
-    public void setMissingReplicas(long underReplicatedBlocks) {
-      this.missingReplicas = underReplicatedBlocks;
-    }
-    
-    /** Return total number of directories encountered during this scan. */
-    public long getTotalDirs() {
-      return totalDirs;
-    }
-    
-    public void setTotalDirs(long totalDirs) {
-      this.totalDirs = totalDirs;
-    }
-    
-    /** Return total number of files encountered during this scan. */
-    public long getTotalFiles() {
-      return totalFiles;
-    }
-    
-    public void setTotalFiles(long totalFiles) {
-      this.totalFiles = totalFiles;
-    }
-    
-    /** Return total number of files opened for write encountered during this scan. */
-    public long getTotalOpenFiles() {
-      return totalOpenFiles;
-    }
-
-    /** Set total number of open files encountered during this scan. */
-    public void setTotalOpenFiles(long totalOpenFiles) {
-      this.totalOpenFiles = totalOpenFiles;
-    }
-    
-    /** Return total size of scanned data, in bytes. */
-    public long getTotalSize() {
-      return totalSize;
-    }
-    
-    public void setTotalSize(long totalSize) {
-      this.totalSize = totalSize;
-    }
-    
-    /** Return total size of open files data, in bytes. */
-    public long getTotalOpenFilesSize() {
-      return totalOpenFilesSize;
-    }
-    
-    public void setTotalOpenFilesSize(long totalOpenFilesSize) {
-      this.totalOpenFilesSize = totalOpenFilesSize;
-    }
-    
-    /** Return the intended replication factor, against which the over/under-
-     * replicated blocks are counted. Note: this values comes from the current
-     * Configuration supplied for the tool, so it may be different from the
-     * value in DFS Configuration.
-     */
-    public int getReplication() {
-      return replication;
-    }
-    
-    public void setReplication(int replication) {
-      this.replication = replication;
-    }
-    
-    /** Return the total number of blocks in the scanned area. */
-    public long getTotalBlocks() {
-      return totalBlocks;
-    }
-    
-    public void setTotalBlocks(long totalBlocks) {
-      this.totalBlocks = totalBlocks;
-    }
-    
-    /** Return the total number of blocks held by open files. */
-    public long getTotalOpenFilesBlocks() {
-      return totalOpenFilesBlocks;
-    }
-    
-    public void setTotalOpenFilesBlocks(long totalOpenFilesBlocks) {
-      this.totalOpenFilesBlocks = totalOpenFilesBlocks;
-    }
-    
+    /** {@inheritDoc} */
     public String toString() {
       StringBuffer res = new StringBuffer();
       res.append("Status: " + (isHealthy() ? "HEALTHY" : "CORRUPT"));
@@ -720,18 +611,7 @@
       res.append("\n Corrupt blocks:\t\t" + corruptBlocks);
       res.append("\n Missing replicas:\t\t" + missingReplicas);
       if (totalReplicas > 0)        res.append(" (" + ((float) (missingReplicas * 100) / (float) totalReplicas) + " %)");
-      res.append("\n Number of data-nodes:\t\t" + totalDatanodes);
-      res.append("\n Number of racks:\t\t" + totalRacks);
       return res.toString();
     }
-    
-    /** Return the number of currupted files. */
-    public long getCorruptFiles() {
-      return corruptFiles;
-    }
-    
-    public void setCorruptFiles(long corruptFiles) {
-      this.corruptFiles = corruptFiles;
-    }
   }
 }

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/tools/DFSck.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/tools/DFSck.java?rev=731807&r1=731806&r2=731807&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/tools/DFSck.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/tools/DFSck.java Mon Jan  5 17:37:27 2009
@@ -17,18 +17,22 @@
  */
 package org.apache.hadoop.hdfs.tools;
 
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.io.BufferedReader;
 import java.net.URL;
 import java.net.URLConnection;
 import java.net.URLEncoder;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -57,16 +61,21 @@
  *  
  */
 public class DFSck extends Configured implements Tool {
+  static{
+    Configuration.addDefaultResource("hdfs-default.xml");
+    Configuration.addDefaultResource("hdfs-site.xml");
+  }
+
+  private final UserGroupInformation ugi;
 
-  DFSck() {}
-  
   /**
    * Filesystem checker.
    * @param conf current Configuration
-   * @throws Exception
+   * @throws LoginException if login failed 
    */
-  public DFSck(Configuration conf) throws Exception {
+  public DFSck(Configuration conf) throws LoginException {
     super(conf);
+    this.ugi = UnixUserGroupInformation.login(conf, true);
   }
   
   private String getInfoServer() throws IOException {
@@ -96,13 +105,15 @@
   /**
    * @param args
    */
-  public int run(String[] args) throws Exception {
-    String fsName = getInfoServer();
+  public int run(String[] args) throws IOException {
     if (args.length == 0) {
       printUsage();
       return -1;
     }
-    StringBuffer url = new StringBuffer("http://"+fsName+"/fsck?path=");
+
+    final StringBuffer url = new StringBuffer("http://");
+    url.append(getInfoServer()).append("/fsck?ugi=").append(ugi).append("&path=");
+
     String dir = "/";
     // find top-level dir first
     for (int idx = 0; idx < args.length; idx++) {

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=731807&r1=731806&r2=731807&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/DFSTestUtil.java Mon Jan  5 17:37:27 2009
@@ -23,19 +23,21 @@
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.Random;
-import junit.framework.TestCase;
-import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.io.IOUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation;
 
-/**
- */
-public class DFSTestUtil extends TestCase {
+/** Utilities for HDFS tests */
+public class DFSTestUtil {
   
   private static Random gen = new Random();
   private static String[] dirNames = {
@@ -256,4 +258,16 @@
     in.close();      
     return b.toString();
   }
+
+  static public Configuration getConfigurationWithDifferentUsername(Configuration conf
+      ) throws IOException {
+    final Configuration c = new Configuration(conf);
+    final UserGroupInformation ugi = UserGroupInformation.getCurrentUGI();
+    final String username = ugi.getUserName()+"_XXX";
+    final String[] groups = {ugi.getGroupNames()[0] + "_XXX"};
+    UnixUserGroupInformation.saveToConf(c,
+        UnixUserGroupInformation.UGI_PROPERTY_NAME,
+        new UnixUserGroupInformation(username, groups));
+    return c;
+  }
 }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestFsck.java?rev=731807&r1=731806&r2=731807&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestFsck.java Mon Jan  5 17:37:27 2009
@@ -19,30 +19,30 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.net.InetSocketAddress;
 import java.io.File;
-import java.io.RandomAccessFile;
-import java.lang.Exception;
 import java.io.IOException;
+import java.io.PrintStream;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
 import java.nio.channels.FileChannel;
 import java.util.Random;
 
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.log4j.Level;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.tools.DFSck;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
 
 /**
  * A JUnit test for doing fsck
@@ -122,6 +122,39 @@
     }
   }
 
+  /** Test fsck with permission set on inodes */
+  public void testFsckPermission() throws Exception {
+    final DFSTestUtil util = new DFSTestUtil(getClass().getSimpleName(), 20, 3, 8*1024);
+    final Configuration conf = new Configuration();
+    conf.setLong("dfs.blockreport.intervalMsec", 10000L);
+
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster(conf, 4, true, null);
+
+      final FileSystem fs = cluster.getFileSystem();
+      final String dir = "/dfsck";
+      final Path dirpath = new Path(dir);
+      util.createFiles(fs, dir);
+      util.waitReplication(fs, dir, (short)3);
+      fs.setPermission(dirpath, new FsPermission((short)0700));
+
+      //run DFSck as another user
+      final Configuration c2 = DFSTestUtil.getConfigurationWithDifferentUsername(conf);
+      System.out.println(runFsck(c2, -1, true, dir));
+
+      //set permission and try DFSck again
+      fs.setPermission(dirpath, new FsPermission((short)0777));
+      final String outStr = runFsck(c2, 0, true, dir);
+      System.out.println(outStr);
+      assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
+
+      util.cleanup(fs, dir);
+    } finally {
+      if (cluster != null) { cluster.shutdown(); }
+    }
+  }
+
   public void testFsckMove() throws Exception {
     DFSTestUtil util = new DFSTestUtil("TestFsck", 5, 3, 8*1024);
     MiniDFSCluster cluster = null;



Mime
View raw message