hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r967293 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/
Date Fri, 23 Jul 2010 22:50:51 GMT
Author: suresh
Date: Fri Jul 23 22:50:51 2010
New Revision: 967293

URL: http://svn.apache.org/viewvc?rev=967293&view=rev
Log:
HDFS-1315. Add fsck event to audit log and remove other audit log events corresponding to
FSCK listStatus and open calls. Contributed by Suresh Srinivas.


Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=967293&r1=967292&r2=967293&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Fri Jul 23 22:50:51 2010
@@ -88,6 +88,8 @@ Trunk (unreleased changes)
     HDFS-1302. The HDFS side of the changes corresponding to HADOOP-6861.
     (Jitendra Pandey & Owen O'Malley via ddas)
     
+    HDFS-1315. Add fsck event to audit log and remove other audit log events 
+    corresponding to FSCK listStatus and open calls. (suresh)
 
   OPTIMIZATIONS
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=967293&r1=967292&r2=967293&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Jul 23 22:50:51 2010
@@ -675,7 +675,7 @@ public class FSNamesystem implements FSC
     checkOwner(src);
     dir.setPermission(src, permission);
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
@@ -704,7 +704,7 @@ public class FSNamesystem implements FSC
     }
     dir.setOwner(src, username, group);
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
@@ -719,7 +719,7 @@ public class FSNamesystem implements FSC
   LocatedBlocks getBlockLocations(String clientMachine, String src,
       long offset, long length) throws AccessControlException,
       FileNotFoundException, UnresolvedLinkException, IOException {
-    LocatedBlocks blocks = getBlockLocations(src, offset, length, true);
+    LocatedBlocks blocks = getBlockLocations(src, offset, length, true, true);
     if (blocks != null) {
       //sort the blocks
       DatanodeDescriptor client = host2DataNodeMap.getDatanodeByHost(
@@ -737,7 +737,7 @@ public class FSNamesystem implements FSC
    * @throws FileNotFoundException
    */
   LocatedBlocks getBlockLocations(String src, long offset, long length,
-      boolean doAccessTime) throws FileNotFoundException,
+      boolean doAccessTime, boolean needBlockToken) throws FileNotFoundException,
       UnresolvedLinkException, IOException {
     if (isPermissionEnabled) {
       checkPathAccess(src, FsAction.READ);
@@ -752,8 +752,8 @@ public class FSNamesystem implements FSC
           "Negative length is not supported. File: " + src);
     }
     final LocatedBlocks ret = getBlockLocationsInternal(src,
-        offset, length, doAccessTime);  
-    if (auditLog.isInfoEnabled()) {
+        offset, length, doAccessTime, needBlockToken);  
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "open", src, null, null);
@@ -764,7 +764,8 @@ public class FSNamesystem implements FSC
   private synchronized LocatedBlocks getBlockLocationsInternal(String src,
                                                        long offset, 
                                                        long length,
-                                                       boolean doAccessTime)
+                                                       boolean doAccessTime, 
+                                                       boolean needBlockToken)
       throws FileNotFoundException, UnresolvedLinkException, IOException {
     INodeFile inode = dir.getFileINode(src);
     if (inode == null)
@@ -793,7 +794,9 @@ public class FSNamesystem implements FSC
         LOG.debug("last = " + last);
       }
 
-      if(isBlockTokenEnabled) setBlockTokens(locatedblocks);
+      if(isBlockTokenEnabled && needBlockToken) {
+        setBlockTokens(locatedblocks);
+      }
 
       if (last.isComplete()) {
         return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
@@ -957,7 +960,7 @@ public class FSNamesystem implements FSC
     getEditLog().logSync();
    
     
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(target, false);
       logAuditEvent(UserGroupInformation.getLoginUser(),
                     Server.getRemoteIp(),
@@ -985,7 +988,7 @@ public class FSNamesystem implements FSC
     INodeFile inode = dir.getFileINode(src);
     if (inode != null) {
       dir.setTimes(src, inode, mtime, atime, true);
-      if (auditLog.isInfoEnabled()) {
+      if (auditLog.isInfoEnabled() && isExternalInvocation()) {
         final HdfsFileStatus stat = dir.getFileInfo(src, false);
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
@@ -1007,7 +1010,7 @@ public class FSNamesystem implements FSC
     }
     createSymlinkInternal(target, link, dirPerms, createParent);
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(link, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
@@ -1063,7 +1066,7 @@ public class FSNamesystem implements FSC
     throws IOException, UnresolvedLinkException {
     boolean status = setReplicationInternal(src, replication);
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled()) {
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "setReplication", src, null, null);
@@ -1148,7 +1151,7 @@ public class FSNamesystem implements FSC
     startFileInternal(src, permissions, holder, clientMachine, flag,
         createParent, replication, blockSize);
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
@@ -1407,7 +1410,7 @@ public class FSNamesystem implements FSC
       }
     }
 
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "append", src, null, null);
@@ -1695,7 +1698,7 @@ public class FSNamesystem implements FSC
     throws IOException, UnresolvedLinkException {
     boolean status = renameToInternal(src, dst);
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled()) {
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(dst, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
@@ -1738,7 +1741,7 @@ public class FSNamesystem implements FSC
       throws IOException, UnresolvedLinkException {
     renameToInternal(src, dst, options);
     getEditLog().logSync();
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       StringBuilder cmd = new StringBuilder("rename options=");
       for (Rename option : options) {
         cmd.append(option.value()).append(" ");
@@ -1787,7 +1790,7 @@ public class FSNamesystem implements FSC
         NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
       }
       boolean status = deleteInternal(src, true);
-      if (status && auditLog.isInfoEnabled()) {
+      if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
                       "delete", src, null, null);
@@ -1895,7 +1898,7 @@ public class FSNamesystem implements FSC
       boolean createParent) throws IOException, UnresolvedLinkException {
     boolean status = mkdirsInternal(src, permissions, createParent);
     getEditLog().logSync();
-    if (status && auditLog.isInfoEnabled()) {
+    if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
       final HdfsFileStatus stat = dir.getFileInfo(src, false);
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
@@ -2268,7 +2271,7 @@ public class FSNamesystem implements FSC
         checkTraverse(src);
       }
     }
-    if (auditLog.isInfoEnabled()) {
+    if (auditLog.isInfoEnabled() && isExternalInvocation()) {
       logAuditEvent(UserGroupInformation.getCurrentUser(),
                     Server.getRemoteIp(),
                     "listStatus", src, null, null);
@@ -4679,4 +4682,24 @@ public class FSNamesystem implements FSC
     }
     return authMethod;
   }
+  
+  /**
+   * If the remote IP for namenode method invocation is null, then the
+   * invocation is internal to the namenode. Client invoked methods are invoked
+   * over RPC and always have address != null.
+   */
+  private boolean isExternalInvocation() {
+    return Server.getRemoteIp() != null;
+  }
+  
+  /**
+   * Log fsck event in the audit log 
+   */
+  void logFsckEvent(String src, InetAddress remoteAddress) throws IOException {
+    if (auditLog.isInfoEnabled()) {
+      logAuditEvent(UserGroupInformation.getCurrentUser(),
+                    remoteAddress,
+                    "fsck", src, null, null);
+    }
+  }
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java?rev=967293&r1=967292&r2=967293&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FsckServlet.java Fri Jul 23 22:50:51 2010
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.net.InetAddress;
 import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 
@@ -46,6 +47,8 @@ public class FsckServlet extends DfsServ
     @SuppressWarnings("unchecked")
     final Map<String,String[]> pmap = request.getParameterMap();
     final PrintWriter out = response.getWriter();
+    final InetAddress remoteAddress = 
+      InetAddress.getByName(request.getRemoteAddr());
     final Configuration conf = 
       (Configuration) getServletContext().getAttribute(JspHelper.CURRENT_CONF);
 
@@ -64,7 +67,7 @@ public class FsckServlet extends DfsServ
           final short minReplication = namesystem.getMinReplication();
 
           new NamenodeFsck(conf, nn, nn.getNetworkTopology(), pmap, out,
-              totalDatanodes, minReplication).fsck();
+              totalDatanodes, minReplication, remoteAddress).fsck();
           
           return null;
         }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=967293&r1=967292&r2=967293&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Jul 23 22:50:51 2010
@@ -719,19 +719,6 @@ public class NameNode implements Namenod
                                         src, offset, length);
   }
   
-  /**
-   * The specification of this method matches that of
-   * {@link getBlockLocations(Path)}
-   * except that it does not update the file's access time.
-   */
-  LocatedBlocks getBlockLocationsNoATime(String src, 
-                                         long offset, 
-                                         long length)
-      throws IOException {
-    myMetrics.numGetBlockLocations.inc();
-    return namesystem.getBlockLocations(src, offset, length, false);
-  }
-  
   private static String getClientMachine() {
     String clientMachine = Server.getRemoteAddress();
     if (clientMachine == null) {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=967293&r1=967292&r2=967293&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Fri Jul 23 22:50:51 2010
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.na
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintWriter;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
@@ -49,6 +50,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * This class provides rudimentary checking of DFS volumes for errors and
@@ -93,6 +95,7 @@ public class NamenodeFsck {
   private final NetworkTopology networktopology;
   private final int totalDatanodes;
   private final short minReplication;
+  private final InetAddress remoteAddress;
 
   private String lostFound = null;
   private boolean lfInited = false;
@@ -113,20 +116,24 @@ public class NamenodeFsck {
    * Filesystem checker.
    * @param conf configuration (namenode config)
    * @param nn namenode that this fsck is going to use
-   * @param pmap key=value[] map that is passed to the http servlet as url parameters
-   * @param response the object into which  this servelet writes the url contents
+   * @param pmap key=value[] map passed to the http servlet as url parameters
+   * @param out output stream to write the fsck output
+   * @param totalDatanodes number of live datanodes
+   * @param minReplication minimum replication
+   * @param remoteAddress source address of the fsck request
    * @throws IOException
    */
   NamenodeFsck(Configuration conf, NameNode namenode,
       NetworkTopology networktopology, 
       Map<String,String[]> pmap, PrintWriter out,
-      int totalDatanodes, short minReplication) {
+      int totalDatanodes, short minReplication, InetAddress remoteAddress) {
     this.conf = conf;
     this.namenode = namenode;
     this.networktopology = networktopology;
     this.out = out;
     this.totalDatanodes = totalDatanodes;
     this.minReplication = minReplication;
+    this.remoteAddress = remoteAddress;
 
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
@@ -148,7 +155,11 @@ public class NamenodeFsck {
   public void fsck() {
     final long startTime = System.currentTimeMillis();
     try {
-      out.println("Namenode FSCK started at " + new Date());
+      String msg = "FSCK started by " + UserGroupInformation.getCurrentUser()
+          + " from " + remoteAddress + " for path " + path + " at " + new Date();
+      LOG.info(msg);
+      out.println(msg);
+      namenode.getNamesystem().logFsckEvent(path, remoteAddress);
 
       final HdfsFileStatus file = namenode.getFileInfo(path);
       if (file != null) {
@@ -182,7 +193,6 @@ public class NamenodeFsck {
       } else {
         out.print("\n\nPath '" + path + "' " + NONEXISTENT_STATUS);
       }
-
     } catch (Exception e) {
       String errMsg = "Fsck on path '" + path + "' " + FAILURE_STATUS;
       LOG.warn(errMsg, e);
@@ -270,7 +280,10 @@ public class NamenodeFsck {
       return;
     }
     long fileLen = file.getLen();
-    LocatedBlocks blocks = namenode.getBlockLocationsNoATime(path, 0, fileLen);
+    // Get block locations without updating the file access time 
+    // and without block access tokens
+    LocatedBlocks blocks = namenode.getNamesystem().getBlockLocations(path, 0,
+        fileLen, false, false);
     if (blocks == null) { // the file is deleted
       return;
     }

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java?rev=967293&r1=967292&r2=967293&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java Fri Jul 23 22:50:51 2010
@@ -38,7 +38,7 @@ public class NameNodeAdapter {
   public static LocatedBlocks getBlockLocations(NameNode namenode,
       String src, long offset, long length) throws IOException {
     return namenode.getNamesystem().getBlockLocations(
-        src, offset, length, false);
+        src, offset, length, false, true);
   }
 
   /**

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java?rev=967293&r1=967292&r2=967293&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java Fri Jul 23 22:50:51 2010
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.RandomAccessFile;
@@ -27,6 +29,7 @@ import java.net.InetSocketAddress;
 import java.nio.channels.FileChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.Random;
+import java.util.regex.Pattern;
 
 import junit.framework.TestCase;
 
@@ -52,11 +55,25 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.RollingFileAppender;
 
 /**
  * A JUnit test for doing fsck
  */
 public class TestFsck extends TestCase {
+  static final String auditLogFile = System.getProperty("test.build.dir",
+      "build/test") + "/audit.log";
+  
+  // Pattern for: 
+  // ugi=name ip=/address cmd=fsck src=/ dst=null perm=null
+  static final Pattern fsckPattern = Pattern.compile(
+      "ugi=.*?\\s" + 
+      "ip=/\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\s" + 
+      "cmd=fsck\\ssrc=\\/\\sdst=null\\s" + 
+      "perm=null");
+  
   static String runFsck(Configuration conf, int expectedErrCode, 
                         boolean checkErrorCode,String... path) 
                         throws Exception {
@@ -91,7 +108,9 @@ public class TestFsck extends TestCase {
       final Path file = new Path(fileName);
       long aTime = fs.getFileStatus(file).getAccessTime();
       Thread.sleep(precision);
+      setupAuditLogs();
       String outStr = runFsck(conf, 0, true, "/");
+      verifyAuditLogs();
       assertEquals(aTime, fs.getFileStatus(file).getAccessTime());
       assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
       System.out.println(outStr);
@@ -117,6 +136,33 @@ public class TestFsck extends TestCase {
     }
   }
 
+  /** Sets up log4j logger for auditlogs */
+  private void setupAuditLogs() throws IOException {
+    File file = new File(auditLogFile);
+    if (file.exists()) {
+      file.delete();
+    }
+    Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
+    logger.setLevel(Level.INFO);
+    PatternLayout layout = new PatternLayout("%m%n");
+    RollingFileAppender appender = new RollingFileAppender(layout, auditLogFile);
+    logger.addAppender(appender);
+  }
+  
+  private void verifyAuditLogs() throws IOException {
+    // Turn off the logs
+    Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
+    logger.setLevel(Level.OFF);
+    
+    // Ensure the audit log contains exactly one entry, the fsck event
+    BufferedReader reader = new BufferedReader(new FileReader(auditLogFile));
+    String line = reader.readLine();
+    assertNotNull(line);
+    assertTrue("Expected fsck event not found in audit log",
+        fsckPattern.matcher(line).matches());
+    assertNull("Unexpected event in audit log", reader.readLine());
+  }
+  
   public void testFsckNonExistent() throws Exception {
     DFSTestUtil util = new DFSTestUtil("TestFsck", 20, 3, 8*1024);
     MiniDFSCluster cluster = null;



Mime
View raw message