hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r400112 - in /lucene/hadoop/trunk: ./ bin/ conf/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/ src/test/org/apache/hadoop/dfs/
Date Fri, 05 May 2006 17:04:24 GMT
Author: cutting
Date: Fri May  5 10:04:21 2006
New Revision: 400112

URL: http://svn.apache.org/viewcvs?rev=400112&view=rev
Log:
HADOOP-96.  Logging improvements.  Contributed by Hairong Kuang.

Added:
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/bin/hadoop
    lucene/hadoop/trunk/bin/hadoop-daemon.sh
    lucene/hadoop/trunk/bin/hadoop-daemons.sh
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/LogFormatter.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri May  5 10:04:21 2006
@@ -188,6 +188,12 @@
     status of reduce tasks or completed jobs.  Also fixes the progress
     meter so that failed tasks are subtracted. (omalley via cutting)
 
+49. HADOOP-96.  Logging improvements.  Log files are now separate from
+    standard output and standard error files.  Logs are now rolled.
+    Logging of all DFS state changes can be enabled, to facilitate
+    debugging.  (Hairong Kuang via cutting)
+
+
 Release 0.1.1 - 2006-04-08
 
  1. Added CHANGES.txt, logging all significant changes to Hadoop.  (cutting)

Modified: lucene/hadoop/trunk/bin/hadoop
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/bin/hadoop?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop (original)
+++ lucene/hadoop/trunk/bin/hadoop Fri May  5 10:04:21 2006
@@ -51,7 +51,7 @@
 
 # some directories
 THIS_DIR=`dirname "$THIS"`
-HADOOP_HOME=`cd "$THIS_DIR/.." ; pwd`
+export HADOOP_HOME=`cd "$THIS_DIR/.." ; pwd`
 
 # Allow alternate conf dir location.
 HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$HADOOP_HOME/conf}"
@@ -143,6 +143,8 @@
 # cygwin path translation
 if expr `uname` : 'CYGWIN*' > /dev/null; then
   CLASSPATH=`cygpath -p -w "$CLASSPATH"`
+  HADOOP_HOME=`cygpath -p -w "$HADOOP_HOME"`
+  HADOOP_LOG_DIR=`cygpath -p -w "$HADOOP_LOG_DIR"`
 fi
 
 # run it

Modified: lucene/hadoop/trunk/bin/hadoop-daemon.sh
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/bin/hadoop-daemon.sh?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop-daemon.sh (original)
+++ lucene/hadoop/trunk/bin/hadoop-daemon.sh Fri May  5 10:04:21 2006
@@ -38,7 +38,7 @@
 done
 
 # the root of the Hadoop installation
-HADOOP_HOME=`dirname "$this"`/..
+export HADOOP_HOME=`dirname "$this"`/..
 
 # Allow alternate conf dir location.
 HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-$HADOOP_HOME/conf}"
@@ -49,9 +49,9 @@
 
 # get log directory
 if [ "$HADOOP_LOG_DIR" = "" ]; then
-  HADOOP_LOG_DIR="$HADOOP_HOME/logs"
-  mkdir -p "$HADOOP_LOG_DIR"
+  export HADOOP_LOG_DIR="$HADOOP_HOME/logs"
 fi
+mkdir -p "$HADOOP_LOG_DIR"
 
 if [ "$HADOOP_PID_DIR" = "" ]; then
   HADOOP_PID_DIR=/tmp
@@ -62,7 +62,7 @@
 fi
 
 # some variables
-log=$HADOOP_LOG_DIR/hadoop-$HADOOP_IDENT_STRING-$command-`hostname`.log
+log=$HADOOP_LOG_DIR/hadoop-$HADOOP_IDENT_STRING-$command-`hostname`.out
 pid=$HADOOP_PID_DIR/hadoop-$HADOOP_IDENT_STRING-$command.pid
 
 case $startStop in
@@ -81,9 +81,8 @@
       rsync -a -e ssh --delete --exclude=.svn $HADOOP_MASTER/ "$HADOOP_HOME"
     fi
 
-    cd "$HADOOP_HOME"
     echo starting $command, logging to $log
-    nohup bin/hadoop $command "$@" >& "$log" < /dev/null &
+    nohup $HADOOP_HOME/bin/hadoop $command "$@" >& "$log" < /dev/null &
     echo $! > $pid
     sleep 1; head "$log"
     ;;

Modified: lucene/hadoop/trunk/bin/hadoop-daemons.sh
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/bin/hadoop-daemons.sh?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/bin/hadoop-daemons.sh (original)
+++ lucene/hadoop/trunk/bin/hadoop-daemons.sh Fri May  5 10:04:21 2006
@@ -13,4 +13,6 @@
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
-exec "$bin/slaves.sh" "$bin/hadoop-daemon.sh" "$@"
+HADOOP_HOME="$bin/.."
+
+exec "$bin/slaves.sh" cd "$HADOOP_HOME" \; "$bin/hadoop-daemon.sh" "$@"

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Fri May  5 10:04:21 2006
@@ -7,6 +7,28 @@
 
 <configuration>
 
+<!-- logging properties -->
+
+<property>
+  <name>hadoop.logfile.size</name>
+  <value>10000000</value>
+  <description>The max size of each log file</description>
+</property>
+
+<property>
+  <name>hadoop.logfile.count</name>
+  <value>10</value>
+  <description>The max number of log files</description>
+</property>
+
+<property>
+  <name>dfs.namenode.logging.level</name>
+  <value>info</value>
+  <description>The logging level for dfs namenode. Other values are "dir" (trace
+namespace mutations), "block" (trace block under/over replications and block
+creations/deletions), or "all".</description>
+</property>
+
 <!-- i/o properties -->
 
 <property>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri May  5 10:04:21 2006
@@ -824,7 +824,9 @@
     /**
      */
     public static void main(String args[]) throws IOException {
+        Configuration conf = new Configuration();
         LogFormatter.setShowThreadIDs(true);
-        runAndWait(new Configuration());
+        LogFormatter.initFileHandler(conf, "datanode");
+        runAndWait(conf);
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Fri May  5 10:04:21 2006
@@ -622,8 +622,12 @@
         String pathString = path.toString();
         mkdirs(new Path(pathString).getParent().toString());
         INode newNode = new INode( new File(pathString).getName(), blocks, replication);
-        if( ! unprotectedAddFile(path, newNode) )
-          return false;
+        if( ! unprotectedAddFile(path, newNode) ) {
+           NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
+                    +"failed to add "+path+" with "
+                    +blocks.length+" blocks to the file system" );
+           return false;
+        }
         // add create file record to log
         UTF8 nameReplicationPair[] = new UTF8[] { 
                               path, 
@@ -631,6 +635,8 @@
         logEdit(OP_ADD,
                 new ArrayWritable( UTF8.class, nameReplicationPair ), 
                 new ArrayWritable( Block.class, newNode.blocks ));
+        NameNode.stateChangeLog.fine("DIR* FSDirectory.addFile: "
+                +path+" with "+blocks.length+" blocks is added to the file system" );
         return true;
     }
     
@@ -658,6 +664,8 @@
      * Change the filename
      */
     public boolean renameTo(UTF8 src, UTF8 dst) {
+        NameNode.stateChangeLog.fine("DIR* FSDirectory.renameTo: "
+                +src+" to "+dst );
         waitForReady();
         if (unprotectedRenameTo(src, dst)) {
             logEdit(OP_RENAME, src, dst);
@@ -675,6 +683,8 @@
           String dstStr = dst.toString();
             INode renamedNode = rootDir.getNode(srcStr);
             if (renamedNode == null) {
+                NameNode.stateChangeLog.warning("DIR* FSDirectory.unprotectedRenameTo: "
+                        +"failed to rename "+src+" to "+dst+ " because "+ src+" does not exist" );
                 return false;
             }
             renamedNode.removeNode();
@@ -683,9 +693,13 @@
             }
             // the renamed node can be reused now
             if( rootDir.addNode(dstStr, renamedNode ) == null ) {
+                NameNode.stateChangeLog.warning("DIR* FSDirectory.unprotectedRenameTo: "
+                        +"failed to rename "+src+" to "+dst );
               rootDir.addNode(srcStr, renamedNode); // put it back
               return false;
             }
+            NameNode.stateChangeLog.fine("DIR* FSDirectory.unprotectedRenameTo: "
+                     +src+" is renamed to "+dst );
             return true;
         }
     }
@@ -738,6 +752,8 @@
      * Remove the file from management, return blocks
      */
     public Block[] delete(UTF8 src) {
+        NameNode.stateChangeLog.fine("DIR* FSDirectory.delete: "
+                +src );
         waitForReady();
         Block[] blocks = unprotectedDelete(src); 
         if( blocks != null )
@@ -751,6 +767,8 @@
         synchronized (rootDir) {
             INode targetNode = rootDir.getNode(src.toString());
             if (targetNode == null) {
+                NameNode.stateChangeLog.warning("DIR* FSDirectory.unprotectedDelete: "
+                        +"failed to remove "+src+" because it does not exist" );
                 return null;
             } else {
                 //
@@ -758,8 +776,12 @@
                 // the blocks underneath the node.
                 //
                 if (! targetNode.removeNode()) {
+                    NameNode.stateChangeLog.warning("DIR* FSDirectory.unprotectedDelete: "
+                            +"failed to remove "+src+" because it does not have a parent" );
                     return null;
                 } else {
+                    NameNode.stateChangeLog.fine("DIR* FSDirectory.unprotectedDelete: "
+                            +src+" is removed" );
                     Vector v = new Vector();
                     targetNode.collectSubtreeBlocks(v);
                     for (Iterator it = v.iterator(); it.hasNext(); ) {
@@ -905,12 +927,17 @@
             String cur = (String) v.elementAt(i);
             INode inserted = unprotectedMkdir(cur);
             if (inserted != null) {
+                NameNode.stateChangeLog.fine("DIR* FSDirectory.mkdirs: "
+                        +"created directory "+cur );
                 logEdit(OP_MKDIR, new UTF8(inserted.computeName()), null);
                 lastSuccess = true;
             } else {
                 lastSuccess = false;
             }
         }
+        if( !lastSuccess )
+            NameNode.stateChangeLog.warning("DIR* FSDirectory.mkdirs: "
+                    +"failed to create directory "+src );
         return lastSuccess;
     }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Fri May  5 10:04:21 2006
@@ -284,7 +284,7 @@
                                     short replication, 
                                     UTF8 clientName 
                                   ) throws IOException {
-      String text = "File " + src 
+      String text = "file " + src 
               + ((clientName != null) ? " on client " + clientName : "")
               + ".\n"
               + "Requested replication " + replication;
@@ -314,31 +314,38 @@
                                             boolean overwrite,
                                             short replication 
                                           ) throws IOException {
+      NameNode.stateChangeLog.fine("DIR* NameSystem.startFile: file "
+            +src+" for "+holder+" at "+clientMachine);
       try {
         if (pendingCreates.get(src) != null) {
-          String msg = "Cannot create file " + src + " for " + holder +
-                       " on " + clientMachine + 
-                       " because pendingCreates is non-null.";
-          throw new NameNode.AlreadyBeingCreatedException(msg);
+           throw new NameNode.AlreadyBeingCreatedException(
+                   "failed to create file " + src + " for " + holder +
+                   " on client " + clientMachine + 
+                   " because pendingCreates is non-null.");
         }
 
-        verifyReplication(src.toString(), replication, clientMachine );
-        
+        try {
+           verifyReplication(src.toString(), replication, clientMachine );
+        } catch( IOException e) {
+            throw new IOException( "failed to create "+e.getMessage());
+        }
         if (!dir.isValidToCreate(src)) {
           if (overwrite) {
             delete(src);
           } else {
-            throw new IOException("Can't create file " + src + 
-                                  ", because the filename is invalid.");
+            throw new IOException("failed to create file " + src 
+                    +" on client " + clientMachine
+                    +" either because the filename is invalid or the file exists");
           }
         }
 
         // Get the array of replication targets 
         DatanodeInfo targets[] = chooseTargets(replication, null, clientMachine);
         if (targets.length < this.minReplication) {
-            throw new IOException("Target-length is " + targets.length +
-                                  ", below MIN_REPLICATION (" + 
-                                  minReplication+ ")");
+            throw new IOException("failed to create file "+src
+                    +" on client " + clientMachine
+                    +" because target-length is " + targets.length 
+                    +", below MIN_REPLICATION (" + minReplication+ ")");
        }
 
         // Reserve space for this pending file
@@ -346,7 +353,8 @@
                            new FileUnderConstruction(replication, 
                                                      holder,
                                                      clientMachine));
-        LOG.fine("Adding " + src + " to pendingCreates for " + holder);
+        NameNode.stateChangeLog.finer( "DIR* NameSystem.startFile: "
+                   +"add "+src+" to pendingCreates for "+holder );
         synchronized (leases) {
             Lease lease = (Lease) leases.get(holder);
             if (lease == null) {
@@ -367,7 +375,8 @@
         results[1] = targets;
         return results;
       } catch (IOException ie) {
-        LOG.warning(ie.getMessage());
+          NameNode.stateChangeLog.warning("DIR* NameSystem.startFile: "
+                  +ie.getMessage());
         throw ie;
       }
     }
@@ -386,6 +395,8 @@
     public synchronized Object[] getAdditionalBlock(UTF8 src, 
                                                     UTF8 clientName
                                                     ) throws IOException {
+        NameNode.stateChangeLog.fine("BLOCK* NameSystem.getAdditionalBlock: file "
+            +src+" for "+clientName);
         FileUnderConstruction pendingFile = 
           (FileUnderConstruction) pendingCreates.get(src);
         // make sure that we still have the lease on this file
@@ -428,6 +439,8 @@
         //
         // Remove the block from the pending creates list
         //
+        NameNode.stateChangeLog.fine("BLOCK* NameSystem.abandonBlock: "
+                +b.getBlockName()+"of file "+src );
         FileUnderConstruction pendingFile = 
           (FileUnderConstruction) pendingCreates.get(src);
         if (pendingFile != null) {
@@ -437,6 +450,10 @@
                 if (cur.compareTo(b) == 0) {
                     pendingCreateBlocks.remove(cur);
                     it.remove();
+                    NameNode.stateChangeLog.finer(
+                             "BLOCK* NameSystem.abandonBlock: "
+                            +b.getBlockName()
+                            +" is removed from pendingCreateBlock and pendingCreates");
                     return true;
                 }
             }
@@ -450,7 +467,7 @@
     public synchronized void abandonFileInProgress(UTF8 src, 
                                                    UTF8 holder
                                                    ) throws IOException {
-      LOG.info("abandoning file in progress on " + src.toString());
+      NameNode.stateChangeLog.fine("DIR* NameSystem.abandonFileInProgress:" + src );
       synchronized (leases) {
         // find the lease
         Lease lease = (Lease) leases.get(holder);
@@ -478,10 +495,12 @@
      * been reported by datanodes and are replicated correctly.
      */
     public synchronized int completeFile(UTF8 src, UTF8 holder) {
+        NameNode.stateChangeLog.fine("DIR* NameSystem.completeFile: " + src + " for " + holder );
         if (dir.getFile(src) != null || pendingCreates.get(src) == null) {
-            LOG.info( "Failed to complete " + src + 
-                      "  because dir.getFile()==" + dir.getFile(src) + 
-                      " and " + pendingCreates.get(src));
+            NameNode.stateChangeLog.warning( "DIR* NameSystem.completeFile: "
+                    + "failed to complete " + src
+                    + " because dir.getFile()==" + dir.getFile(src) 
+                    + " and " + pendingCreates.get(src));
             return OPERATION_FAILED;
         } else if (! checkFileProgress(src)) {
             return STILL_WAITING;
@@ -519,14 +538,14 @@
         // Now we can add the (name,blocks) tuple to the filesystem
         //
         if ( ! dir.addFile(src, pendingBlocks, pendingFile.getReplication())) {
-          System.out.println("AddFile() for " + src + " failed");
           return OPERATION_FAILED;
         }
 
         // The file is no longer pending
         pendingCreates.remove(src);
-        LOG.fine("Removing " + src + " from pendingCreates for " + holder +
-                 ". (complete)");
+        NameNode.stateChangeLog.finer(
+             "DIR* NameSystem.completeFile: " + src
+           + " is removed from pendingCreates");
         for (int i = 0; i < nrBlocks; i++) {
             pendingCreateBlocks.remove(pendingBlocks[i]);
         }
@@ -554,13 +573,11 @@
         for (int i = 0; i < nrBlocks; i++) {
             TreeSet containingNodes = (TreeSet) blocksMap.get(pendingBlocks[i]);
             if (containingNodes.size() < pendingFile.getReplication()) {
+                   NameNode.stateChangeLog.finer(
+                          "DIR* NameSystem.completeFile:"
+                        + pendingBlocks[i].getBlockName()+" has only "+containingNodes.size()
+                        +" replicas so is added to neededReplications");           
                 synchronized (neededReplications) {
-                    LOG.info("Completed file " + src 
-                              + ", at holder " + holder 
-                              + ".  There is/are only " + containingNodes.size() 
-                              + " copies of block " + pendingBlocks[i] 
-                              + ", so replicating up to " 
-                              + pendingFile.getReplication());
                     neededReplications.add(pendingBlocks[i]);
                 }
             }
@@ -577,6 +594,9 @@
           (FileUnderConstruction) pendingCreates.get(src);
         v.getBlocks().add(b);
         pendingCreateBlocks.add(b);
+        NameNode.stateChangeLog.finer("BLOCK* NameSystem.allocateBlock: "
+            +src+ ". "+b.getBlockName()+
+            " is created and added to pendingCreates and pendingCreateBlocks" );      
         return b;
     }
 
@@ -613,6 +633,7 @@
      * Change the indicated filename.
      */
     public boolean renameTo(UTF8 src, UTF8 dst) {
+        NameNode.stateChangeLog.fine("DIR* NameSystem.renameTo: " + src + " to " + dst );
         return dir.renameTo(src, dst);
     }
 
@@ -621,6 +642,7 @@
      * invalidate some blocks that make up the file.
      */
     public synchronized boolean delete(UTF8 src) {
+        NameNode.stateChangeLog.fine("DIR* NameSystem.delete: " + src );
         Block deletedBlocks[] = (Block[]) dir.delete(src);
         if (deletedBlocks != null) {
             for (int i = 0; i < deletedBlocks.length; i++) {
@@ -636,6 +658,8 @@
                             recentInvalidateSets.put(node.getName(), invalidateSet);
                         }
                         invalidateSet.add(b);
+                        NameNode.stateChangeLog.finer("BLOCK* NameSystem.delete: "
+                            + b.getBlockName() + " is added to invalidSet of " + node.getName() );
                     }
                 }
             }
@@ -666,6 +690,7 @@
      * Create all the necessary directories
      */
     public boolean mkdirs(UTF8 src) {
+        NameNode.stateChangeLog.fine("DIR* NameSystem.mkdirs: " + src );
         return dir.mkdirs(src);
     }
 
@@ -897,15 +922,18 @@
       FileUnderConstruction v = 
         (FileUnderConstruction) pendingCreates.remove(src);
       if (v != null) {
-        LOG.info("Removing " + src + " from pendingCreates for " + 
-            holder + " (failure)");
+         NameNode.stateChangeLog.finer(
+                      "DIR* NameSystem.internalReleaseCreate: " + src
+                    + " is removed from pendingCreates for "
+                    + holder + " (failure)");
         for (Iterator it2 = v.getBlocks().iterator(); it2.hasNext(); ) {
           Block b = (Block) it2.next();
           pendingCreateBlocks.remove(b);
         }
       } else {
-        LOG.info("Attempt to release a create lock on " + src.toString()
-                 + " that was not in pendingCreates");
+          NameNode.stateChangeLog.warning("DIR* NameSystem.internalReleaseCreate: "
+                 + "attempt to release a create lock on "+ src.toString()
+                 + " that was not in pedingCreates");
       }
     }
 
@@ -949,8 +977,9 @@
                 DatanodeInfo nodeinfo = (DatanodeInfo) datanodeMap.get(name);
 
                 if (nodeinfo == null) {
-                    LOG.info("Got brand-new heartbeat from " + name);
-                    nodeinfo = new DatanodeInfo(name, capacity, remaining);
+                    NameNode.stateChangeLog.fine("BLOCK* NameSystem.gotHeartbeat: "
+                            +"brand-new heartbeat from "+name );
+                     nodeinfo = new DatanodeInfo(name, capacity, remaining);
                     datanodeMap.put(name, nodeinfo);
                     capacityDiff = capacity;
                     remainingDiff = remaining;
@@ -995,11 +1024,13 @@
             while ((heartbeats.size() > 0) &&
                    ((nodeInfo = (DatanodeInfo) heartbeats.first()) != null) &&
                    (nodeInfo.lastUpdate() < System.currentTimeMillis() - EXPIRE_INTERVAL)) {
-                LOG.info("Lost heartbeat for " + nodeInfo.getName());
-
                 heartbeats.remove(nodeInfo);
+                NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
+                           + "lost heartbeat from " + nodeInfo.getName());
                 synchronized (datanodeMap) {
                     datanodeMap.remove(nodeInfo.getName());
+                    NameNode.stateChangeLog.finer("BLOCK* NameSystem.heartbeatCheck: "
+                            + nodeInfo.getName() + " is removed from datanodeMap");
                 }
                 totalCapacity -= nodeInfo.getCapacity();
                 totalRemaining -= nodeInfo.getRemaining();
@@ -1023,8 +1054,12 @@
      * update the (machine-->blocklist) and (block-->machinelist) tables.
      */
     public synchronized Block[] processReport(Block newReport[], UTF8 name) {
+        NameNode.stateChangeLog.fine("BLOCK* NameSystem.processReport: "
+                +"from "+name+" "+newReport.length+" blocks" );
         DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
         if (node == null) {
+            NameNode.stateChangeLog.severe("BLOCK* NameSystem.processReport: "
+                    +"from "+name+" but can not find its info" );
             throw new IllegalArgumentException("Unexpected exception.  Received block report from node " + name + ", but there is no info for " + name);
         }
 
@@ -1084,8 +1119,9 @@
             Block b = (Block) it.next();
 
             if (! dir.isValidBlock(b) && ! pendingCreateBlocks.contains(b)) {
-                LOG.info("Obsoleting block " + b);
                 obsolete.add(b);
+                NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+                        +"ask "+name+" to delete "+b.getBlockName() );
             }
         }
         return (Block[]) obsolete.toArray(new Block[obsolete.size()]);
@@ -1103,8 +1139,19 @@
         }
         if (! containingNodes.contains(node)) {
             containingNodes.add(node);
+            // 
+            // Hairong: I would prefer to set the level of the next log record
+            // to be finer.
+            // But at startup time so many new blocks come in that they
+            // would take up all the space in the log file,
+            // so I set the level to be finest.
+            //
+            NameNode.stateChangeLog.finest("BLOCK* NameSystem.addStoredBlock: "
+                    +"blockMap updated: "+node.getName()+" is added to "+block.getBlockName() );
         } else {
-            LOG.info("Redundant addStoredBlock request received for block " + block + " on node " + node);
+            NameNode.stateChangeLog.warning("BLOCK* NameSystem.addStoredBlock: "
+                    + "Redundant addStoredBlock request received for " 
+                    + block.getBlockName() + " on " + node.getName());
         }
 
         synchronized (neededReplications) {
@@ -1115,8 +1162,15 @@
             if (containingNodes.size() >= fileReplication ) {
                 neededReplications.remove(block);
                 pendingReplications.remove(block);
-            } else // containingNodes.size() < fileReplication
+                NameNode.stateChangeLog.finest("BLOCK* NameSystem.addStoredBlock: "
+                        +block.getBlockName()+" has "+containingNodes.size()
+                        +" replicas so is removed from neededReplications and pendingReplications" );
+            } else {// containingNodes.size() < fileReplication
                 neededReplications.add(block);
+                NameNode.stateChangeLog.finer("BLOCK* NameSystem.addStoredBlock: "
+                    +block.getBlockName()+" has only "+containingNodes.size()
+                    +" replicas so is added to neededReplications" );
+            }
 
             proccessOverReplicatedBlock( block, fileReplication );
         }
@@ -1161,6 +1215,8 @@
                 excessReplicateMap.put(cur.getName(), excessBlocks);
             }
             excessBlocks.add(b);
+            NameNode.stateChangeLog.finer("BLOCK* NameSystem.chooseExcessReplicates: "
+                    +"("+cur.getName()+", "+b.getBlockName()+") is added to excessReplicateMap" );
 
             //
             // The 'excessblocks' tracks blocks until we get confirmation
@@ -1177,6 +1233,8 @@
                 recentInvalidateSets.put(cur.getName(), invalidateSet);
             }
             invalidateSet.add(b);
+            NameNode.stateChangeLog.finer("BLOCK* NameSystem.chooseExcessReplicates: "
+                    +"("+cur.getName()+", "+b.getBlockName()+") is added to recentInvalidateSets" );
         }
     }
 
@@ -1185,12 +1243,13 @@
      * replication tasks, if the removed block is still valid.
      */
     synchronized void removeStoredBlock(Block block, DatanodeInfo node) {
+        NameNode.stateChangeLog.fine("BLOCK* NameSystem.removeStoredBlock: "
+                +block.getBlockName() + " from "+node.getName() );
         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
         if (containingNodes == null || ! containingNodes.contains(node)) {
             throw new IllegalArgumentException("No machine mapping found for block " + block + ", which should be at node " + node);
         }
         containingNodes.remove(node);
-
         //
         // It's possible that the block was removed because of a datanode
         // failure.  If the block is still valid, check if replication is
@@ -1202,6 +1261,9 @@
             synchronized (neededReplications) {
                 neededReplications.add(block);
             }
+            NameNode.stateChangeLog.finer("BLOCK* NameSystem.removeStoredBlock: "
+                    +block.getBlockName()+" has only "+containingNodes.size()
+                    +" replicas so is added to neededReplications" );
         }
 
         //
@@ -1211,6 +1273,8 @@
         TreeSet excessBlocks = (TreeSet) excessReplicateMap.get(node.getName());
         if (excessBlocks != null) {
             excessBlocks.remove(block);
+            NameNode.stateChangeLog.finer("BLOCK* NameSystem.removeStoredBlock: "
+                    +block.getBlockName()+" is removed from excessBlocks" );
             if (excessBlocks.size() == 0) {
                 excessReplicateMap.remove(node.getName());
             }
@@ -1223,8 +1287,12 @@
     public synchronized void blockReceived(Block block, UTF8 name) {
         DatanodeInfo node = (DatanodeInfo) datanodeMap.get(name);
         if (node == null) {
+            NameNode.stateChangeLog.warning("BLOCK* NameSystem.blockReceived: "
+                    +block.getBlockName()+" is received from an unrecorded node " + name );
             throw new IllegalArgumentException("Unexpected exception.  Got blockReceived message from node " + name + ", but there is no info for " + name);
         }
+        NameNode.stateChangeLog.fine("BLOCK* NameSystem.blockReceived: "
+                +block.getBlockName()+" is received from " + name );
         //
         // Modify the blocks->datanode map
         // 
@@ -1279,11 +1347,20 @@
      */
     public synchronized Block[] blocksToInvalidate(UTF8 sender) {
         Vector invalidateSet = (Vector) recentInvalidateSets.remove(sender);
-        if (invalidateSet != null) {
-            return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]);
-        } else {
+ 
+        if (invalidateSet == null ) // no pending invalidations were recorded for sender
             return null;
+        
+        if(NameNode.stateChangeLog.isLoggable(Level.INFO)) { // build the block list only when it will actually be logged
+            StringBuffer blockList = new StringBuffer();
+            for( int i=0; i<invalidateSet.size(); i++ ) {
+                blockList.append(' ');
+                blockList.append(((Block)invalidateSet.elementAt(i)).getBlockName());
+            }
+            NameNode.stateChangeLog.info("BLOCK* NameSystem.blockToInvalidate: "
+                   +"ask "+sender+" to delete " + blockList );
         }
+        return (Block[]) invalidateSet.toArray(new Block[invalidateSet.size()]); // the set was already removed from recentInvalidateSets above, so it is handed out only once
     }
 
     /**
@@ -1299,7 +1376,7 @@
     public synchronized Object[] pendingTransfers(DatanodeInfo srcNode, int xmitsInProgress) {
         synchronized (neededReplications) {
             Object results[] = null;
-	    int scheduledXfers = 0;
+            int scheduledXfers = 0;
 
             if (neededReplications.size() > 0) {
                 //
@@ -1334,7 +1411,7 @@
                                 // Build items to return
                                 replicateBlocks.add(block);
                                 replicateTargetSets.add(targets);
-				scheduledXfers += targets.length;
+                                scheduledXfers += targets.length;
                             }
                         }
                     }
@@ -1356,14 +1433,22 @@
                         if (containingNodes.size() + targets.length >= dir.getFileByBlock(block).getReplication()) {
                             neededReplications.remove(block);
                             pendingReplications.add(block);
+                            NameNode.stateChangeLog.finer("BLOCK* NameSystem.pendingTransfer: "
+                                    +block.getBlockName()
+                                    +" is removed from neededReplications to pendingReplications" );
                         }
 
-                        LOG.info("Pending transfer (block " 
-                            + block.getBlockName() 
-                            + ") from " + srcNode.getName() 
-                            + " to " + targets[0].getName() 
-                            + (targets.length > 1 ? " and " + (targets.length-1) 
-                                + " more destination(s)" : "" ));
+                        if(NameNode.stateChangeLog.isLoggable(Level.INFO)) {
+                            StringBuffer targetList = new StringBuffer( "datanode(s)");
+                            for(int k=0; k<targets.length; k++) {
+                               targetList.append(' ');
+                               targetList.append(targets[k].getName());
+                            }
+                            NameNode.stateChangeLog.info("BLOCK* NameSystem.pendingTransfer: "
+                                    +"ask "+srcNode.getName()
+                                    +" to replicate "+block.getBlockName()
+                                    +" to "+targetList);
+                        }
                     }
 
                     //

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Fri May  5 10:04:21 2006
@@ -57,6 +57,7 @@
  **********************************************************/
 public class NameNode implements ClientProtocol, DatanodeProtocol, FSConstants {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.dfs.NameNode");
+    public static final Logger stateChangeLog = LogFormatter.getLogger( "org.apache.hadoop.dfs.StateChange");
 
     private FSNamesystem namesystem;
     private Server server;
@@ -182,7 +183,9 @@
                                boolean overwrite,
                                short replication
     ) throws IOException {
-        Object results[] = namesystem.startFile(new UTF8(src), 
+       stateChangeLog.fine("*DIR* NameNode.create: file "
+            +src+" for "+clientName+" at "+clientMachine);
+       Object results[] = namesystem.startFile(new UTF8(src), 
                                                 new UTF8(clientName), 
                                                 new UTF8(clientMachine), 
                                                 overwrite,
@@ -202,6 +205,8 @@
      */
     public LocatedBlock addBlock(String src, 
                                  String clientName) throws IOException {
+        stateChangeLog.fine("*BLOCK* NameNode.addBlock: file "
+            +src+" for "+clientName);
         UTF8 src8 = new UTF8(src);
         UTF8 client8 = new UTF8(clientName);
         Object[] results = namesystem.getAdditionalBlock(src8, client8);
@@ -216,8 +221,12 @@
      * to prevent weird heartbeat race conditions.
      */
     public void reportWrittenBlock(LocatedBlock lb) throws IOException {
-        Block b = lb.getBlock();
+        Block b = lb.getBlock();        
         DatanodeInfo targets[] = lb.getLocations();
+        stateChangeLog.fine("*BLOCK* NameNode.reportWrittenBlock"
+                +": " + b.getBlockName() +" is written to "
+                +targets.length + " locations" );
+
         for (int i = 0; i < targets.length; i++) {
             namesystem.blockReceived(b, targets[i].getName());
         }
@@ -227,6 +236,8 @@
      * The client needs to give up on the block.
      */
     public void abandonBlock(Block b, String src) throws IOException {
+        stateChangeLog.fine("*BLOCK* NameNode.abandonBlock: "
+                +b.getBlockName()+" of file "+src );
         if (! namesystem.abandonBlock(b, new UTF8(src))) {
             throw new IOException("Cannot abandon block during write to " + src);
         }
@@ -235,11 +246,13 @@
      */
     public void abandonFileInProgress(String src, 
                                       String holder) throws IOException {
+        stateChangeLog.fine("*DIR* NameNode.abandonFileInProgress: " + src + " for " + holder ); // FINE: logged when dfs.namenode.logging.level is dir, block or all
         namesystem.abandonFileInProgress(new UTF8(src), new UTF8(holder));
     }
     /**
      */
     public boolean complete(String src, String clientName) throws IOException {
+        stateChangeLog.fine("*DIR* NameNode.complete: " + src + " for " + clientName );
         int returnCode = namesystem.completeFile(new UTF8(src), new UTF8(clientName));
         if (returnCode == STILL_WAITING) {
             return false;
@@ -269,12 +282,14 @@
     /**
      */
     public boolean rename(String src, String dst) throws IOException {
+        stateChangeLog.fine("*DIR* NameNode.rename: " + src + " to " + dst ); // FINE: logged when dfs.namenode.logging.level is dir, block or all
         return namesystem.renameTo(new UTF8(src), new UTF8(dst));
     }
 
     /**
      */
     public boolean delete(String src) throws IOException {
+        stateChangeLog.fine("*DIR* NameNode.delete: " + src ); // FINE: logged when dfs.namenode.logging.level is dir, block or all
         return namesystem.delete(new UTF8(src));
     }
 
@@ -293,6 +308,7 @@
     /**
      */
     public boolean mkdirs(String src) throws IOException {
+        stateChangeLog.fine("*DIR* NameNode.mkdirs: " + src ); // FINE: logged when dfs.namenode.logging.level is dir, block or all
         return namesystem.mkdirs(new UTF8(src));
     }
 
@@ -404,6 +420,8 @@
     }
 
     public Block[] blockReport(String sender, Block blocks[]) {
+        stateChangeLog.fine("*BLOCK* NameNode.blockReport: "
+                +"from "+sender+" "+blocks.length+" blocks" );
         if( firstBlockReportTime==0)
               firstBlockReportTime=System.currentTimeMillis();
 
@@ -411,6 +429,8 @@
      }
 
     public void blockReceived(String sender, Block blocks[]) {
+        stateChangeLog.fine("*BLOCK* NameNode.blockReceived: "
+                +"from "+sender+" "+blocks.length+" blocks." );
         for (int i = 0; i < blocks.length; i++) {
             namesystem.blockReceived(blocks[i], new UTF8(sender));
         }
@@ -441,6 +461,19 @@
           System.err.println("Formatted "+dir);
           System.exit(0);
         }
+            
+        LogFormatter.initFileHandler( conf, "namenode" );
+        LogFormatter.setShowThreadIDs(true);
+        String confLevel = conf.get("dfs.namenode.logging.level", "info");
+        Level level;
+        if( confLevel.equals( "dir"))
+                level=Level.FINE;
+        else if( confLevel.equals( "block"))
+                level=Level.FINER;
+        else if( confLevel.equals( "all"))
+                level=Level.FINEST;
+        else level=Level.INFO;
+        stateChangeLog.setLevel( level);
 
         NameNode namenode = new NameNode(conf);
         namenode.join();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri May  5 10:04:21 2006
@@ -1015,6 +1015,8 @@
           System.exit(-1);
         }
 
-        startTracker(new Configuration());
+        Configuration conf=new Configuration();
+        LogFormatter.initFileHandler( conf, "jobtracker" );
+        startTracker(conf);
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri May  5 10:04:21 2006
@@ -839,7 +839,9 @@
             System.exit(-1);
         }
 
-        TaskTracker tt = new TaskTracker(new JobConf());
+        JobConf conf=new JobConf();
+        LogFormatter.initFileHandler( conf, "tasktracker" );
+        TaskTracker tt = new TaskTracker(conf);
         tt.run();
     }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/LogFormatter.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/LogFormatter.java?rev=400112&r1=400111&r2=400112&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/LogFormatter.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/LogFormatter.java Fri May  5 10:04:21 2006
@@ -18,9 +18,12 @@
 
 import java.util.logging.*;
 import java.io.*;
+import java.net.InetAddress;
 import java.text.*;
 import java.util.Date;
 
+import org.apache.hadoop.conf.Configuration;
+
 /** Prints just the date and the log message. */
 
 public class LogFormatter extends Formatter {
@@ -34,7 +37,7 @@
 
   private static boolean showTime = true;
   private static boolean showThreadIDs = false;
-
+  
   // install when this class is loaded
   static {
     Handler[] handlers = LogFormatter.getLogger("").getHandlers();
@@ -44,6 +47,63 @@
     }
   }
 
+  /**
+   * Redirects all logging of this JVM to a size-rolled log file.
+   *
+   * <p>The log directory is the HADOOP_LOG_DIR environment variable, else
+   * HADOOP_HOME/logs, else the current working directory (the "user.dir"
+   * system property).  If the chosen directory cannot be created, or exists
+   * but is not a directory, the working directory is used instead.  The file
+   * is named <code>hadoop-&lt;user&gt;-&lt;opName&gt;-&lt;host&gt;.log</code>,
+   * where host is the local host name up to its first '.', or "localhost"
+   * if the local host cannot be resolved.
+   *
+   * <p>The file rolls at <code>hadoop.logfile.size</code> bytes (default
+   * 10000000), keeping <code>hadoop.logfile.count</code> files (default 10).
+   * Every handler attached to the root logger (name "") is removed, so
+   * afterwards log records go only to this file.
+   *
+   * @param conf configuration supplying the roll size and file count
+   * @param opName daemon name embedded in the file name, e.g. "namenode"
+   * @return the log file name handed to {@link FileHandler}.  When
+   *         <code>hadoop.logfile.count</code> is greater than 1, FileHandler
+   *         appends a generation suffix (".0", ".1", ...) to the name on disk.
+   * @throws IOException if the log file cannot be opened
+   */
+  public static String initFileHandler( Configuration conf, String opName )
+      throws IOException {
+    String workingDir = System.getProperty("user.dir");
+    String logDir = System.getenv("HADOOP_LOG_DIR");
+    if (logDir == null) {
+      String hadoopHome = System.getenv("HADOOP_HOME");
+      if (hadoopHome == null) {
+        logDir = workingDir;
+      } else {
+        logDir = hadoopHome + File.separator + "logs";
+      }
+    }
+
+    // Fall back to the working directory when the chosen one is unusable.
+    if (!logDir.equals(workingDir)) {
+      File logDirFile = new File(logDir);
+      if (!logDirFile.exists()) {
+        if (!logDirFile.mkdirs()) {
+          logDir = workingDir;
+        }
+      } else if (!logDirFile.isDirectory()) {
+        logDir = workingDir;
+      }
+    }
+
+    String hostname;
+    try {
+      hostname = InetAddress.getLocalHost().getHostName();
+      int index = hostname.indexOf('.');
+      if (index != -1) {
+        hostname = hostname.substring(0, index);
+      }
+    } catch (java.net.UnknownHostException e) {
+      hostname = "localhost";
+    }
+
+    String logFile = logDir + File.separator + "hadoop-" + System.getProperty("user.name")
+        + "-" + opName + "-" + hostname + ".log";
+
+    int logFileSize = conf.getInt("hadoop.logfile.size", 10000000);
+    int logFileCount = conf.getInt("hadoop.logfile.count", 10);
+
+    // FileHandler interprets its first argument as a pattern in which '%'
+    // starts an escape (%t, %h, %g, %u, %%).  Escape literal '%' characters
+    // so a directory, user or host name containing one is used verbatim.
+    String pattern = logFile.replace("%", "%%");
+    FileHandler fh = new FileHandler(pattern, logFileSize, logFileCount, false);
+    fh.setFormatter(new LogFormatter());
+    // Accept FINEST and above; filtering is left to each logger's own level.
+    fh.setLevel(Level.FINEST);
+
+    Logger rootLogger = LogFormatter.getLogger("");
+    // Logged before the handlers are swapped, so this line goes to the
+    // previously installed handlers (e.g. the console), not to the file.
+    rootLogger.info("directing logs to directory " + logDir);
+
+    Handler[] handlers = rootLogger.getHandlers();
+    for (int i = 0; i < handlers.length; i++) {
+      rootLogger.removeHandler(handlers[i]);
+    }
+    rootLogger.addHandler(fh);
+
+    return logFile;
+  }
+      
   /** Gets a logger and, as a side effect, installs this as the default
    * formatter. */
   public static Logger getLogger(String name) {
@@ -77,8 +137,11 @@
     
     // the thread id
     if (showThreadIDs) {
-      buffer.append(" ");
-      buffer.append(record.getThreadID());
+      buffer.append(" 0x");
+      String threadID = Integer.toHexString(record.getThreadID());
+      for (int i = 0; i < 8 - threadID.length(); i++) 
+        buffer.append('0');
+      buffer.append(threadID);
     }
 
     // handle SEVERE specially

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java?rev=400112&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/ClusterTestDFSNamespaceLogging.java Fri May  5 10:04:21 2006
@@ -0,0 +1,467 @@
+/**
+ * Copyright 2005 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.dfs;
+
+import junit.framework.TestCase;
+import junit.framework.AssertionFailedError;
+import org.apache.hadoop.fs.FSOutputStream;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.util.LogFormatter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.NameNode;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Test DFS logging
+ * make sure that any namespace mutations are logged.
+ * @author Hairong Kuang
+ */
+public class ClusterTestDFSNamespaceLogging extends TestCase implements FSConstants {
+  private static final Logger LOG =
+      LogFormatter.getLogger("org.apache.hadoop.dfs.ClusterTestDFS");
+
+  private static Configuration conf = new Configuration();
+
+  /**
+   * all DFS test files go under this base directory
+   */
+  private static String baseDirSpecified=conf.get("test.dfs.data", "/tmp/test-dfs");;
+
+  /**
+   * base dir as File
+   */
+  private static File baseDir=new File(baseDirSpecified);
+  
+  /**
+   * name node port
+   */
+  int nameNodePort = conf.getInt("dfs.namenode.port", 9020);
+  
+  /** DFS client, datanodes, and namenode
+   */
+  DFSClient dfsClient;
+  ArrayList dataNodeDaemons = new ArrayList();
+  NameNode nameNodeDaemon;
+  
+  /** Log header length
+   */
+  private static final int DIR_LOG_HEADER_LEN = 30;
+  private static final int BLOCK_LOG_HEADER_LEN = 32;
+  /** DFS block size
+   */
+  private static final int BLOCK_SIZE = 32*1000*1000;
+  
+  /** Buffer size
+   */
+  private static final int BUFFER_SIZE = 4096;
+
+  private BufferedReader logfh;
+  private String logFile;
+  
+  protected void setUp() throws Exception {
+    super.setUp();
+    conf.setBoolean("test.dfs.same.host.targets.allowed", true);
+  }
+
+ /**
+  * Remove old files from temp area used by this test case and be sure
+  * base temp directory can be created.
+  */
+  protected void prepareTempFileSpace() {
+    if (baseDir.exists()) {
+      try { // start from a blank state
+        FileUtil.fullyDelete(baseDir);
+      } catch (Exception ignored) {
+      }
+    }
+    baseDir.mkdirs();
+    if (!baseDir.isDirectory()) {
+      throw new RuntimeException("Value of root directory property" 
+          + "test.dfs.data for dfs test is not a directory: "
+          + baseDirSpecified);
+    }
+  }
+
+  /**
+   * Pseudo Distributed FS Test.
+   * Test DFS by running all the necessary daemons in one process.
+   *
+   * @throws Exception
+   */
+  public void testFsPseudoDistributed() throws Exception {
+	  // test on a small cluster with 3 data nodes
+	  testFsPseudoDistributed(3);
+  }
+  
+  private void testFsPseudoDistributed( int datanodeNum ) throws Exception {
+    try {
+      prepareTempFileSpace();
+
+      configureDFS();
+      startDFS(datanodeNum);
+
+      if( logfh == null )
+        try {
+          logfh = new BufferedReader( new FileReader( logFile ) );
+        } catch (FileNotFoundException e) {
+          // TODO Auto-generated catch block
+          throw new AssertionFailedError("Log file does not exist: "+logFile);
+        }
+    
+      // create a directory
+      try {
+        dfsClient.mkdirs( new UTF8( "/data") );
+        assertMkdirs( "/data", false );
+      } catch ( IOException ioe ) {
+      	ioe.printStackTrace();
+      }
+       
+      try {
+        dfsClient.mkdirs( new UTF8( "data") );
+        assertMkdirs( "data", true );
+      } catch ( IOException ioe ) {
+       	ioe.printStackTrace();
+      }
+      
+      //
+      // create a file with 1 data block
+      try {
+        createFile("/data/xx", 1);
+        assertCreate( "/data/xx", 1, false );
+      } catch( IOException ioe ) {
+    	assertCreate( "/data/xx", 1, true );
+      }
+    
+      // create a file with 2 data blocks
+      try {
+        createFile("/data/yy",BLOCK_SIZE+1);
+        assertCreate( "/data/yy", BLOCK_SIZE+1, false );
+      } catch( IOException ioe ) {
+    	assertCreate( "/data/yy", BLOCK_SIZE+1, true );
+      }
+
+      // create an existing file
+      try {
+        createFile("/data/xx", 2);
+        assertCreate( "/data/xx", 2, false );
+      } catch( IOException ioe ) {
+      	assertCreate( "/data/xx", 2, true );
+      }
+    
+      // delete the file
+      try {
+        dfsClient.delete( new UTF8("/data/yy") );
+        assertDelete("/data/yy", false);
+      } catch( IOException ioe ) {
+        ioe.printStackTrace();
+      }
+
+    
+      // rename the file
+      try {
+        dfsClient.rename( new UTF8("/data/xx"), new UTF8("/data/yy") );
+        assertRename( "/data/xx", "/data/yy", false );
+      } catch( IOException ioe ) {
+      	ioe.printStackTrace();
+      }
+
+      try {
+        dfsClient.delete(new UTF8("/data/xx"));
+        assertDelete("/data/xx", true);
+      } catch(IOException ioe) {
+    	ioe.printStackTrace();
+      }
+      
+      try {
+        dfsClient.rename( new UTF8("/data/xx"), new UTF8("/data/yy") );    
+        assertRename( "/data/xx", "/data/yy", true );
+      } catch( IOException ioe) {
+    	ioe.printStackTrace();
+      }
+        
+    } catch (AssertionFailedError afe) {
+      afe.printStackTrace();
+      throw afe;
+    } catch (Throwable t) {
+      msg("Unexpected exception_a: " + t);
+      t.printStackTrace();
+    } finally {
+      shutdownDFS();
+
+    }
+  }
+
+  private void createFile( String filename, long fileSize ) throws IOException { 
+    //
+    //           write filesize of data to file
+    //
+    byte[] buffer = new byte[BUFFER_SIZE];
+    UTF8 testFileName = new UTF8(filename); // hardcode filename
+    FSOutputStream nos;
+	nos = dfsClient.create(testFileName, false);
+    try {
+      for (long nBytesWritten = 0L;
+                nBytesWritten < fileSize;
+                nBytesWritten += buffer.length) {
+        if ((nBytesWritten + buffer.length) > fileSize) {
+          int pb = (int) (fileSize - nBytesWritten);
+          byte[] bufferPartial = new byte[pb];
+          for( int i=0; i<pb; i++) {
+            bufferPartial[i]='a';
+          }
+          nos.write(buffer);
+        } else {
+          for( int i=0; i<buffer.length;i++) {
+            buffer[i]='a';
+          }
+          nos.write(buffer);
+        }
+      }
+    } finally {
+      nos.flush();
+      nos.close();
+    }
+  }
+
+  private void assertMkdirs( String fileName, boolean failed ) {
+	  assertHasLogged("NameNode.mkdirs: " +fileName, DIR_LOG_HEADER_LEN+1);
+	  assertHasLogged("NameSystem.mkdirs: "+fileName, DIR_LOG_HEADER_LEN);
+	  if( failed )
+		assertHasLogged("FSDirectory.mkdirs: "
+        			+"failed to create directory "+fileName, DIR_LOG_HEADER_LEN);
+	  else
+	    assertHasLogged( "FSDirectory.mkdirs: created directory "+fileName, DIR_LOG_HEADER_LEN);
+  }
+  
+  private void assertCreate( String fileName, int filesize, boolean failed ) {
+	  assertHasLogged("NameNode.create: file "+fileName, DIR_LOG_HEADER_LEN+1);
+	  assertHasLogged("NameSystem.startFile: file "+fileName, DIR_LOG_HEADER_LEN);
+	  if( failed ) {
+		assertHasLogged("NameSystem.startFile: "
+            		  +"failed to create file " + fileName, DIR_LOG_HEADER_LEN);
+	  } else {
+	    assertHasLogged("NameSystem.allocateBlock: "+fileName, BLOCK_LOG_HEADER_LEN);
+	    int blockNum = (filesize/BLOCK_SIZE*BLOCK_SIZE==filesize)?
+		  filesize/BLOCK_SIZE : 1+filesize/BLOCK_SIZE;
+	    for( int i=1; i<blockNum; i++) {
+		  assertHasLogged("NameNode.addBlock: file "+fileName, BLOCK_LOG_HEADER_LEN+1);
+		  assertHasLogged("NameSystem.getAdditionalBlock: file "+fileName, BLOCK_LOG_HEADER_LEN);
+		  assertHasLogged("NameSystem.allocateBlock: "+fileName, BLOCK_LOG_HEADER_LEN);
+	    }
+	    assertHasLogged("NameNode.complete: "+fileName, DIR_LOG_HEADER_LEN+1);
+	    assertHasLogged("NameSystem.completeFile: "+fileName, DIR_LOG_HEADER_LEN);
+	    assertHasLogged("FSDirectory.addFile: "+fileName+" with "
+			  +blockNum+" blocks is added to the file system", DIR_LOG_HEADER_LEN);
+	    assertHasLogged("NameSystem.completeFile: "+fileName
+			  +" is removed from pendingCreates", DIR_LOG_HEADER_LEN);
+	  }
+  }
+  
+  private void assertDelete( String fileName, boolean failed ) {
+	  assertHasLogged("NameNode.delete: "+fileName, DIR_LOG_HEADER_LEN+1);
+      assertHasLogged("NameSystem.delete: "+fileName, DIR_LOG_HEADER_LEN);
+      assertHasLogged("FSDirectory.delete: "+fileName, DIR_LOG_HEADER_LEN);
+      if( failed )
+        assertHasLogged("FSDirectory.unprotectedDelete: "
+            +"failed to remove "+fileName, DIR_LOG_HEADER_LEN );
+      else
+        assertHasLogged("FSDirectory.unprotectedDelete: "
+            +fileName+" is removed", DIR_LOG_HEADER_LEN);
+  }
+  
+  private void assertRename( String src, String dst, boolean failed ) {
+	  assertHasLogged("NameNode.rename: "+src+" to "+dst, DIR_LOG_HEADER_LEN+1);
+	  assertHasLogged("NameSystem.renameTo: "+src+" to "+dst, DIR_LOG_HEADER_LEN );
+	  assertHasLogged("FSDirectory.renameTo: "+src+" to "+dst, DIR_LOG_HEADER_LEN );
+	  if( failed )
+		assertHasLogged("FSDirectory.unprotectedRenameTo: "
+                         +"failed to rename "+src+" to "+dst, DIR_LOG_HEADER_LEN);
+	  else
+	    assertHasLogged("FSDirectory.unprotectedRenameTo: "
+                       +src+" is renamed to "+dst, DIR_LOG_HEADER_LEN );
+  }
+  
+  private void assertHasLogged( String target, int headerLen ) {
+	  String line;
+	  boolean notFound = true;
+	  try {
+	      while( notFound && (line=logfh.readLine()) != null ) {
+		      if(line.length()>headerLen && line.startsWith(target, headerLen))
+			      notFound = false;
+	      }
+	  } catch(java.io.IOException e) {
+		  throw new AssertionFailedError("error reading the log file");
+	  }
+	  if(notFound) {
+		  throw new AssertionFailedError(target+" not logged");
+	  }
+  }
+
+  //
+  //     modify config for test
+  //
+  private void configureDFS() throws IOException {
+	// set given config param to override other config settings
+	conf.setInt("test.dfs.block_size", BLOCK_SIZE);
+	// verify that config changed
+	assertTrue(BLOCK_SIZE == conf.getInt("test.dfs.block_size", 2)); // 2 is an intentional obviously-wrong block size
+	// downsize for testing (just to save resources)
+	conf.setInt("dfs.namenode.handler.count", 3);
+	conf.setLong("dfs.blockreport.intervalMsec", 50*1000L);
+	conf.setLong("dfs.datanode.startupMsec", 15*1000L);
+	conf.setInt("dfs.replication", 2);
+	//System.setProperty("HADOOP_LOG_DIR", baseDirSpecified+"/logs");
+	conf.setInt("hadoop.logfile.count", 1);
+	conf.setInt("hadoop.logfile.size", 1000000000);
+
+	// logging configuration for namenode
+    logFile = LogFormatter.initFileHandler( conf, "namenode" );
+    LogFormatter.setShowThreadIDs(true);
+    NameNode.stateChangeLog.setLevel( Level.FINEST);
+  }
+  
+  private void startDFS( int dataNodeNum) throws IOException {
+    //
+    //          start a NameNode
+    String nameNodeSocketAddr = "localhost:" + nameNodePort;
+    conf.set("fs.default.name", nameNodeSocketAddr);
+    
+    String nameFSDir = baseDirSpecified + "/name";
+	conf.set("dfs.name.dir", nameFSDir);
+	
+    NameNode.format(conf);
+    
+    nameNodeDaemon = new NameNode(new File(nameFSDir), nameNodePort, conf);
+
+     //
+      //        start DataNodes
+      //
+      for (int i = 0; i < dataNodeNum; i++) {
+        // uniquely config real fs path for data storage for this datanode
+        String dataDir = baseDirSpecified + "/datanode" + i;
+        conf.set("dfs.data.dir", dataDir);
+        DataNode dn = DataNode.makeInstanceForDir(dataDir, conf);
+        if (dn != null) {
+          dataNodeDaemons.add(dn);
+          (new Thread(dn, "DataNode" + i + ": " + dataDir)).start();
+        }
+      }
+	         
+      assertTrue("incorrect datanodes for test to continue",
+            (dataNodeDaemons.size() == dataNodeNum));
+      //
+      //          wait for datanodes to report in
+      try {
+        awaitQuiescence();
+      } catch( InterruptedException e) {
+    	  e.printStackTrace();
+      }
+      
+      //  act as if namenode is a remote process
+      dfsClient = new DFSClient(new InetSocketAddress("localhost", nameNodePort), conf);
+  }
+
+  private void shutdownDFS() {
+      // shutdown client
+      if (dfsClient != null) {
+        try {
+          msg("close down subthreads of DFSClient");
+          dfsClient.close();
+        } catch (Exception ignored) { }
+        msg("finished close down of DFSClient");
+      }
+
+      //
+      // shut down datanode daemons (this takes advantage of being same-process)
+      msg("begin shutdown of all datanode daemons" );
+
+      for (int i = 0; i < dataNodeDaemons.size(); i++) {
+        DataNode dataNode = (DataNode) dataNodeDaemons.get(i);
+        try {
+          dataNode.shutdown();
+        } catch (Exception e) {
+           msg("ignoring exception during (all) datanode shutdown, e=" + e);
+        }
+      }
+      msg("finished shutdown of all datanode daemons");
+      
+      // shutdown namenode
+      msg("begin shutdown of namenode daemon");
+      try {
+        nameNodeDaemon.stop();
+      } catch (Exception e) {
+        msg("ignoring namenode shutdown exception=" + e);
+      }
+      msg("finished shutdown of namenode daemon");
+  }
+  
+  /** Wait for the DFS datanodes to become quiescent.
+   * The initial implementation is to sleep for some fixed amount of time,
+   * but a better implementation would be to really detect when distributed
+   * operations are completed.
+   * @throws InterruptedException
+   */
+  private void awaitQuiescence() throws InterruptedException {
+    // ToDo: Need observer pattern, not static sleep
+    // Doug suggested that the block report interval could be made shorter
+    //   and then observing that would be a good way to know when an operation
+    //   was complete (quiescence detect).
+    sleepAtLeast(30000);
+  }
+
+  private void msg(String s) {
+    //System.out.println(s);
+    LOG.info(s);
+  }
+
+  public static void sleepAtLeast(int tmsec) {
+    long t0 = System.currentTimeMillis();
+    long t1 = t0;
+    long tslept = t1 - t0;
+    while (tmsec > tslept) {
+      try {
+        long tsleep = tmsec - tslept;
+        Thread.sleep(tsleep);
+        t1 = System.currentTimeMillis();
+      }  catch (InterruptedException ie) {
+        t1 = System.currentTimeMillis();
+      }
+      tslept = t1 - t0;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    String usage = "Usage: ClusterTestDFSNameSpaceChangeLogging (no args)";
+    if (args.length != 0) {
+      System.err.println(usage);
+      System.exit(-1);
+    }
+    String[] testargs = {"org.apache.hadoop.dfs.ClusterTestDFSNameSpaceChangeLogging"};
+    junit.textui.TestRunner.main(testargs);
+  }
+
+}



Mime
View raw message