hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r505421 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/metrics/
Date Fri, 09 Feb 2007 18:58:50 GMT
Author: cutting
Date: Fri Feb  9 10:58:49 2007
New Revision: 505421

URL: http://svn.apache.org/viewvc?view=rev&rev=505421
Log:
HADOOP-954.  Change use of metrics to use callback mechanism.  Contributed by David & Nigel.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java
      - copied, changed from r505360, lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Metrics.java
Removed:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Metrics.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Feb  9 10:58:49 2007
@@ -31,6 +31,10 @@
  5. HADOOP-992.  Fix MiniMR unit tests to use MiniDFS when specified,
     rather than the local FS.  (omalley via cutting)
 
+ 6. HADOOP-954.  Change use of metrics to use callback mechanism.
+    Also rename utility class Metrics to MetricsUtil.
+    (David Bowen & Nigel Daley via cutting)
+
 
 Release 0.11.0 - 2007-02-02
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Fri Feb  9 10:58:49 2007
@@ -22,7 +22,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.util.*;
@@ -34,7 +34,9 @@
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.Updater;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -116,55 +118,72 @@
     long heartBeatInterval;
     private DataStorage storage = null;
     private StatusHttpServer infoServer;
+    private DataNodeMetrics myMetrics = new DataNodeMetrics();
     private static InetSocketAddress nameNodeAddr;
     private static DataNode datanodeObject = null;
-    private class DataNodeMetrics {
-      private MetricsRecord metricsRecord = null;
-      
-      
-      private long bytesWritten = 0L;
-      private long bytesRead = 0L;
-      private long blocksWritten = 0L;
-      private long blocksRead = 0L;
-      private long blocksReplicated = 0L;
-      private long blocksRemoved = 0L;
+
+    private class DataNodeMetrics implements Updater {
+      private final MetricsRecord metricsRecord;
+      private int bytesWritten = 0;
+      private int bytesRead = 0;
+      private int blocksWritten = 0;
+      private int blocksRead = 0;
+      private int blocksReplicated = 0;
+      private int blocksRemoved = 0;
       
       DataNodeMetrics() {
-        metricsRecord = Metrics.createRecord("dfs", "datanode");
+        MetricsContext context = MetricsUtil.getContext("dfs");
+        metricsRecord = MetricsUtil.createRecord(context, "datanode");
+        context.registerUpdater(this);
+      }
+      
+      /**
+       * Since this object is a registered updater, this method will be called
+       * periodically, e.g. every 5 seconds.
+       */
+      public void doUpdates(MetricsContext unused) {
+        synchronized (this) {
+          metricsRecord.incrMetric("bytes_read", bytesRead);
+          metricsRecord.incrMetric("bytes_written", bytesWritten);
+          metricsRecord.incrMetric("blocks_read", blocksRead);
+          metricsRecord.incrMetric("blocks_written", blocksWritten);
+          metricsRecord.incrMetric("blocks_replicated", blocksReplicated);
+          metricsRecord.incrMetric("blocks_removed", blocksRemoved);
+              
+          bytesWritten = 0;
+          bytesRead = 0;
+          blocksWritten = 0;
+          blocksRead = 0;
+          blocksReplicated = 0;
+          blocksRemoved = 0;
+        }
+        metricsRecord.update();
       }
-      
+
       synchronized void readBytes(int nbytes) {
         bytesRead += nbytes;
-        Metrics.report(metricsRecord, "bytes_read", bytesRead);
       }
       
       synchronized void wroteBytes(int nbytes) {
         bytesWritten += nbytes;
-        Metrics.report(metricsRecord, "bytes_written", bytesWritten);
       }
       
       synchronized void readBlocks(int nblocks) {
         blocksRead += nblocks;
-        Metrics.report(metricsRecord, "blocks_read", blocksRead);
       }
       
       synchronized void wroteBlocks(int nblocks) {
         blocksWritten += nblocks;
-        Metrics.report(metricsRecord, "blocks_written", blocksWritten);
       }
       
       synchronized void replicatedBlocks(int nblocks) {
         blocksReplicated += nblocks;
-        Metrics.report(metricsRecord, "blocks_replicated", blocksReplicated);
       }
       
       synchronized void removedBlocks(int nblocks) {
         blocksRemoved += nblocks;
-        Metrics.report(metricsRecord, "blocks_removed", blocksRemoved);
       }
     }
-    
-    DataNodeMetrics myMetrics = new DataNodeMetrics();
 
     /**
      * Create the DataNode given a configuration and an array of dataDirs.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSDirectory.java Fri Feb  9 10:58:49 2007
@@ -25,7 +25,8 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.MetricsContext;
 
 /*************************************************
  * FSDirectory stores the filesystem directory state.
@@ -223,7 +224,7 @@
                     v.add(blocks[i]);
                 }
             }
-            Metrics.report(metricsRecord, "files_deleted", ++numFilesDeleted);
+            incrDeletedFileCount();
             for (Iterator it = children.values().iterator(); it.hasNext(); ) {
                 INode child = (INode) it.next();
                 child.collectSubtreeBlocks(v);
@@ -307,19 +308,25 @@
     FSImage fsImage;  
     boolean ready = false;
     int namespaceID = 0;    // TODO: move to FSImage class, it belongs there
-    // Metrics members
-    private MetricsRecord metricsRecord = null;
-    private int numFilesDeleted = 0;
+    // Metrics record
+    private MetricsRecord directoryMetrics = null;
     
     /** Access an existing dfs name directory. */
     public FSDirectory(File[] dirs) throws IOException {
       this.fsImage = new FSImage( dirs );
+      initialize();
     }
 
     public FSDirectory(FSImage fsImage) throws IOException {
       this.fsImage = fsImage;
+      initialize();
     }
     
+    private void initialize() {
+      MetricsContext metricsContext = MetricsUtil.getContext("dfs");
+      directoryMetrics = MetricsUtil.createRecord(metricsContext, "FSDirectory");
+    }
+
     void loadFSImage( Configuration conf ) throws IOException {
       fsImage.loadFSImage( conf );
       synchronized (this) {
@@ -327,9 +334,13 @@
         this.notifyAll();
         fsImage.getEditLog().create();
       }
-      metricsRecord = Metrics.createRecord("dfs", "namenode");
     }
 
+    private void incrDeletedFileCount() {
+        directoryMetrics.incrMetric("files_deleted", 1);
+        directoryMetrics.update();
+    }
+    
     /**
      * Shutdown the filestore
      */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Fri Feb  9 10:58:49 2007
@@ -27,9 +27,12 @@
 
 import java.io.*;
 import java.net.*;
+import org.apache.hadoop.dfs.DatanodeProtocol.DataNodeAction;
 
 import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.Updater;
 
 /**********************************************************
  * NameNode serves as both directory namespace manager and
@@ -110,33 +113,52 @@
       fsimage.getEditLog().close();
     }
 
-    private class NameNodeMetrics {
-      private MetricsRecord metricsRecord = null;
-      
-      private long numFilesCreated = 0L;
-      private long numFilesOpened = 0L;
-      private long numFilesRenamed = 0L;
-      private long numFilesListed = 0L;
+    private class NameNodeMetrics implements Updater {
+      private final MetricsRecord metricsRecord;
+      private int numFilesCreated = 0;
+      private int numFilesOpened = 0;
+      private int numFilesRenamed = 0;
+      private int numFilesListed = 0;
       
       NameNodeMetrics() {
-        metricsRecord = Metrics.createRecord("dfs", "namenode");
+        MetricsContext metricsContext = MetricsUtil.getContext("dfs");
+        metricsRecord = MetricsUtil.createRecord(metricsContext, "namenode");
+        metricsContext.registerUpdater(this);
+      }
+      
+      /**
+       * Since this object is a registered updater, this method will be called
+       * periodically, e.g. every 5 seconds.
+       */
+      public void doUpdates(MetricsContext unused) {
+        synchronized (this) {
+          metricsRecord.incrMetric("files_created", numFilesCreated);
+          metricsRecord.incrMetric("files_opened", numFilesOpened);
+          metricsRecord.incrMetric("files_renamed", numFilesRenamed);
+          metricsRecord.incrMetric("files_listed", numFilesListed);
+              
+          numFilesCreated = 0;
+          numFilesOpened = 0;
+          numFilesRenamed = 0;
+          numFilesListed = 0;
+        }
+        metricsRecord.update();
       }
       
       synchronized void createFile() {
-        Metrics.report(metricsRecord, "files_created", ++numFilesCreated);
+        ++numFilesCreated;
       }
       
       synchronized void openFile() {
-        Metrics.report(metricsRecord, "files_opened", ++numFilesOpened);
+        ++numFilesOpened;
       }
       
       synchronized void renameFile() {
-        Metrics.report(metricsRecord, "files_renamed", ++numFilesRenamed);
+        ++numFilesRenamed;
       }
       
       synchronized void listFile(int nfiles) {
         numFilesListed += nfiles;
-        Metrics.report(metricsRecord, "files_listed", numFilesListed);
       }
     }
     

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Feb  9 10:58:49 2007
@@ -31,7 +31,9 @@
 import java.util.*;
 
 import org.apache.hadoop.metrics.MetricsRecord;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.Updater;
 
 /*******************************************************
  * JobTracker is the central location for submitting and 
@@ -372,48 +374,66 @@
         }
     }
 
-    static class JobTrackerMetrics {
+    static class JobTrackerMetrics implements Updater {
       private MetricsRecord metricsRecord = null;
-      
-      private long numMapTasksLaunched = 0L;
-      private long numMapTasksCompleted = 0L;
-      private long numReduceTasksLaunched = 0L;
-      private long numReduceTasksCompleted = 0L;
-      private long numJobsSubmitted = 0L;
-      private long numJobsCompleted = 0L;
+      private int numMapTasksLaunched = 0;
+      private int numMapTasksCompleted = 0;
+      private int numReduceTasksLaunched = 0;
+      private int numReduceTasksCompleted = 0;
+      private int numJobsSubmitted = 0;
+      private int numJobsCompleted = 0;
       
       JobTrackerMetrics() {
-        metricsRecord = Metrics.createRecord("mapred", "jobtracker");
+          MetricsContext context = MetricsUtil.getContext("mapred");
+          metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+          context.registerUpdater(this);
+      }
+      
+      /**
+       * Since this object is a registered updater, this method will be called
+       * periodically, e.g. every 5 seconds.
+       */
+      public void doUpdates(MetricsContext unused) {
+        synchronized (this) {
+          metricsRecord.incrMetric("maps_launched", numMapTasksLaunched);
+          metricsRecord.incrMetric("maps_completed", numMapTasksCompleted);
+          metricsRecord.incrMetric("reduces_launched", numReduceTasksLaunched);
+          metricsRecord.incrMetric("reduces_completed", numReduceTasksCompleted);
+          metricsRecord.incrMetric("jobs_submitted", numJobsSubmitted);
+          metricsRecord.incrMetric("jobs_completed", numJobsCompleted);
+              
+          numMapTasksLaunched = 0;
+          numMapTasksCompleted = 0;
+          numReduceTasksLaunched = 0;
+          numReduceTasksCompleted = 0;
+          numJobsSubmitted = 0;
+          numJobsCompleted = 0;
+        }
+        metricsRecord.update();
       }
       
       synchronized void launchMap() {
-        Metrics.report(metricsRecord, "maps_launched",
-            ++numMapTasksLaunched);
+        ++numMapTasksLaunched;
       }
       
       synchronized void completeMap() {
-        Metrics.report(metricsRecord, "maps_completed",
-            ++numMapTasksCompleted);
+        ++numMapTasksCompleted;
       }
       
       synchronized void launchReduce() {
-        Metrics.report(metricsRecord, "reduces_launched",
-            ++numReduceTasksLaunched);
+        ++numReduceTasksLaunched;
       }
       
       synchronized void completeReduce() {
-        Metrics.report(metricsRecord, "reduces_completed",
-            ++numReduceTasksCompleted);
+        ++numReduceTasksCompleted;
       }
       
       synchronized void submitJob() {
-        Metrics.report(metricsRecord, "jobs_submitted",
-            ++numJobsSubmitted);
+        ++numJobsSubmitted;
       }
       
       synchronized void completeJob() {
-        Metrics.report(metricsRecord, "jobs_completed",
-            ++numJobsCompleted);
+        ++numJobsCompleted;
       }
     }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java Fri Feb  9 10:58:49 2007
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.util.Progressable;
 
 /** The location of a map output file, as passed to a reduce task via the
@@ -104,7 +105,9 @@
    * @param pingee a status object that wants to know when we make progress
    * @param timeout number of ms for connection and read timeout
    * @throws IOException when something goes wrong
+   * @deprecated
    */
+  @Deprecated
   public long getFile(FileSystem fileSys, 
                       Path localFilename, 
                       int reduce,
@@ -179,6 +182,7 @@
    */
   public Path getFile(InMemoryFileSystem inMemFileSys,
                       FileSystem localFileSys,
+                      MetricsRecord shuffleMetrics,
                       Path localFilename, 
                       int reduce,
                       int timeout) throws IOException, InterruptedException {
@@ -224,6 +228,8 @@
           int len = input.read(buffer);
           while (len > 0) {
             totalBytes += len;
+            shuffleMetrics.incrMetric("input_bytes", len);
+            shuffleMetrics.update();
             output.write(buffer, 0 ,len);
             if (currentThread.isInterrupted()) {
               throw new InterruptedException();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Fri Feb  9 10:58:49 2007
@@ -33,11 +33,13 @@
 import org.apache.hadoop.metrics.MetricsRecord;
 
 import org.apache.commons.logging.*;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.mapred.ReduceTask.ValuesIterator;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.Updater;
 
 
 /** A Map task. */
@@ -59,27 +61,25 @@
   }
   
   private class MapTaskMetrics {
-    private MetricsRecord metricsRecord = null;
+    private MetricsRecord mapInputMetrics = null;
+    private MetricsRecord mapOutputMetrics = null;
     
-    private long numInputRecords = 0L;
-    private long numInputBytes = 0L;
-    private long numOutputRecords = 0L;
-    private long numOutputBytes = 0L;
-    
-    MapTaskMetrics(String taskId) {
-      metricsRecord = Metrics.createRecord("mapred", "map", "taskid", taskId);
+    MapTaskMetrics(String user) {
+        MetricsContext context = MetricsUtil.getContext("mapred");
+        mapInputMetrics = MetricsUtil.createRecord(context, "mapInput", "user", user);
+        mapOutputMetrics = MetricsUtil.createRecord(context, "mapOutput", "user", user);
     }
     
-    synchronized void mapInput(long numBytes) {
-      Metrics.report(metricsRecord, "input_records", ++numInputRecords);
-      numInputBytes += numBytes;
-      Metrics.report(metricsRecord, "input_bytes", numInputBytes);
+    synchronized void mapInput(int numBytes) {
+        mapInputMetrics.incrMetric("input_records", 1);
+        mapInputMetrics.incrMetric("input_bytes", numBytes);
+        mapInputMetrics.update();
     }
     
-    synchronized void mapOutput(long numBytes) {
-      Metrics.report(metricsRecord, "output_records", ++numOutputRecords);
-      numOutputBytes += numBytes;
-      Metrics.report(metricsRecord, "output_bytes", numOutputBytes);
+    synchronized void mapOutput(int numBytes) {
+        mapOutputMetrics.incrMetric("output_records", 1);
+        mapOutputMetrics.incrMetric("output_bytes", numBytes);
+        mapOutputMetrics.update();
     }
     
   }
@@ -96,7 +96,6 @@
                  int partition, InputSplit split) {
     super(jobId, jobFile, tipId, taskId, partition);
     this.split = split;
-    myMetrics = new MapTaskMetrics(taskId);
   }
 
   public boolean isMapTask() {
@@ -134,13 +133,14 @@
     split = new FileSplit();
     split.readFields(in);
     if (myMetrics == null) {
-        myMetrics = new MapTaskMetrics(getTaskId());
+        myMetrics = new MapTaskMetrics("unknown");
     }
   }
 
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
 
+    myMetrics = new MapTaskMetrics(job.getUser());
     Reporter reporter = getReporter(umbilical, getProgress());
 
     MapOutputBuffer collector = new MapOutputBuffer(umbilical, job, reporter);
@@ -164,7 +164,7 @@
           setProgress(getProgress());
           long beforePos = getPos();
           boolean ret = rawIn.next(key, value);
-          myMetrics.mapInput(getPos() - beforePos);
+          myMetrics.mapInput((int)(getPos() - beforePos));
           return ret;
         }
         public long getPos() throws IOException { return rawIn.getPos(); }
@@ -326,7 +326,7 @@
         int partNumber = partitioner.getPartition(key, value, partitions);
         sortImpl[partNumber].addKeyValue(keyOffset, keyLength, valLength);
 
-        myMetrics.mapOutput(keyValBuffer.getLength() - keyOffset);
+        myMetrics.mapOutput((int)(keyValBuffer.getLength() - keyOffset));
 
         //now check whether we need to spill to disk
         long totalMem = 0;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Feb  9 10:58:49 2007
@@ -21,7 +21,7 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.fs.*;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.util.*;
 
 import java.io.*;
@@ -44,21 +44,22 @@
   }
 
   private class ReduceTaskMetrics {
-    private MetricsRecord metricsRecord = null;
+    private final MetricsRecord inputMetrics, outputMetrics;
     
-    private long numInputRecords = 0L;
-    private long numOutputRecords = 0L;
-    
-    ReduceTaskMetrics(String taskId) {
-      metricsRecord = Metrics.createRecord("mapred", "reduce", "taskid", taskId);
+    ReduceTaskMetrics(String user) {
+        MetricsContext context = MetricsUtil.getContext("mapred");
+        inputMetrics = MetricsUtil.createRecord(context, "reduceInput", "user", user);
+        outputMetrics = MetricsUtil.createRecord(context, "reduceOutput", "user", user);
     }
     
     synchronized void reduceInput() {
-      Metrics.report(metricsRecord, "input_records", ++numInputRecords);
+        inputMetrics.incrMetric("input_records", 1);
+        inputMetrics.update();
     }
     
     synchronized void reduceOutput() {
-      Metrics.report(metricsRecord, "output_records", ++numOutputRecords);
+        outputMetrics.incrMetric("output_records", 1);
+        outputMetrics.update();
     }
   }
   
@@ -84,7 +85,6 @@
                     int partition, int numMaps) {
     super(jobId, jobFile, tipId, taskId, partition);
     this.numMaps = numMaps;
-    myMetrics = new ReduceTaskMetrics(taskId);
   }
 
   public TaskRunner createRunner(TaskTracker tracker) throws IOException {
@@ -116,7 +116,7 @@
 
     numMaps = in.readInt();
     if (myMetrics == null) {
-        myMetrics = new ReduceTaskMetrics(getTaskId());
+        myMetrics = new ReduceTaskMetrics("unknown");
     }
   }
 
@@ -224,6 +224,7 @@
 
   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException {
+    myMetrics = new ReduceTaskMetrics(job.getUser());
     Class valueClass = job.getMapOutputValueClass();
     Reducer reducer = (Reducer)ReflectionUtils.newInstance(
                                   job.getReducerClass(), job);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Fri Feb  9 10:58:49 2007
@@ -24,6 +24,9 @@
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.util.*;
 
 import java.io.*;
@@ -116,6 +119,11 @@
   private MapOutputCopier[] copiers = null;
   
   /**
+   * The threads for fetching the files.
+   */
+  private MetricsRecord shuffleMetrics = null;
+  
+  /**
    * the minimum interval between jobtracker polls
    */
   private static final long MIN_POLL_INTERVAL = 5000;
@@ -275,8 +283,8 @@
       // a working filename that will be unique to this attempt
       Path tmpFilename = new Path(finalFilename + "-" + id);
       // this copies the map output file
-      tmpFilename = loc.getFile(inMemFileSys, localFileSys, tmpFilename,
-                               reduceTask.getPartition(),
+      tmpFilename = loc.getFile(inMemFileSys, localFileSys, shuffleMetrics,
+                               tmpFilename, reduceTask.getPartition(),
                                STALLED_COPY_TIMEOUT);
       if (tmpFilename == null)
         throw new IOException("File " + finalFilename + "-" + id + 
@@ -384,6 +392,10 @@
     this.uniqueHosts = new HashSet();
     
     this.lastPollTime = 0;
+
+    MetricsContext metricsContext = MetricsUtil.getContext("mapred");
+    this.shuffleMetrics = MetricsUtil.createRecord(
+      metricsContext, "shuffleInput", "user", conf.getUser());
   }
 
   /** Assemble all of the map output files */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Feb  9 10:58:49 2007
@@ -21,7 +21,7 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.ipc.*;
-import org.apache.hadoop.metrics.Metrics;
+import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -38,6 +38,7 @@
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
+import org.apache.hadoop.metrics.MetricsContext;
 
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.net.DNS;
@@ -125,17 +126,16 @@
     private class TaskTrackerMetrics {
       private MetricsRecord metricsRecord = null;
       
-      private long totalTasksCompleted = 0L;
-      
       TaskTrackerMetrics() {
-        metricsRecord = Metrics.createRecord("mapred", "tasktracker");
+          MetricsContext context = MetricsUtil.getContext("mapred");
+          metricsRecord = MetricsUtil.createRecord(context, "tasktracker");
       }
       
       synchronized void completeTask() {
         if (metricsRecord != null) {
-          metricsRecord.setMetric("tasks_completed", ++totalTasksCompleted);
+          metricsRecord.incrMetric("tasks_completed", 1);
           metricsRecord.setMetric("maps_running", mapTotal);
-          metricsRecord.setMetric("reduce_running", reduceTotal);
+          metricsRecord.setMetric("reduces_running", reduceTotal);
           metricsRecord.update();
         }
       }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java?view=diff&rev=505421&r1=505420&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/ContextFactory.java Fri Feb  9 10:58:49 2007
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Properties;
 import org.apache.hadoop.metrics.spi.AbstractMetricsContext;
+import org.apache.hadoop.metrics.spi.NullContext;
 
 /**
  * Factory class for creating MetricsContext objects.  To obtain an instance
@@ -43,9 +44,14 @@
     
     private static ContextFactory theFactory = null;
     
-    // private Map<String,Object> attributeMap = new HashMap<String,Object>();
-    private Map attributeMap = new HashMap();
-    private Map<String,MetricsContext> contextMap = new HashMap<String,MetricsContext>();
+    private Map<String,Object> attributeMap = new HashMap<String,Object>();
+    private Map<String,AbstractMetricsContext> contextMap = 
+            new HashMap<String,AbstractMetricsContext>();
+    
+    // Used only when contexts, or the ContextFactory itself, cannot be
+    // created.
+    private static Map<String,MetricsContext> nullContextMap = 
+            new HashMap<String,MetricsContext>();
     
     /** Creates a new instance of ContextFactory */
     protected ContextFactory() {
@@ -116,18 +122,31 @@
     public synchronized MetricsContext getContext(String contextName) 
         throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException
     {
-        if (contextMap.containsKey(contextName)) return contextMap.get(contextName);
-        String classNameAttribute = contextName + CONTEXT_CLASS_SUFFIX;
-        String className = (String) getAttribute(classNameAttribute);
-        if (className == null) {
-            className = DEFAULT_CONTEXT_CLASSNAME;
+        AbstractMetricsContext metricsContext = contextMap.get(contextName);
+        if (metricsContext == null) {
+            String classNameAttribute = contextName + CONTEXT_CLASS_SUFFIX;
+            String className = (String) getAttribute(classNameAttribute);
+            if (className == null) {
+                className = DEFAULT_CONTEXT_CLASSNAME;
+            }
+            Class contextClass = Class.forName(className);
+            metricsContext = (AbstractMetricsContext) contextClass.newInstance();
+            metricsContext.init(contextName, this);
+            contextMap.put(contextName, metricsContext);
         }
-        Class contextClass = Class.forName(className);
-        AbstractMetricsContext metricsContext = 
-                (AbstractMetricsContext) contextClass.newInstance();
-        metricsContext.init(contextName, this);
-        contextMap.put(contextName, metricsContext);
         return metricsContext;
+    }
+    
+    /**
+     * Returns a "null" context - one which does nothing.
+     */
+    public static synchronized MetricsContext getNullContext(String contextName) {
+        MetricsContext nullContext = nullContextMap.get(contextName);
+        if (nullContext == null) {
+            nullContext = new NullContext();
+            nullContextMap.put(contextName, nullContext);
+        }
+        return nullContext;
     }
     
     /**

Copied: lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java (from r505360,
lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Metrics.java)
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java?view=diff&rev=505421&p1=lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Metrics.java&r1=505360&p2=lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java&r2=505421
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/Metrics.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/metrics/MetricsUtil.java Fri Feb  9 10:58:49
2007
@@ -25,82 +25,92 @@
 
 /**
  * Utility class to simplify creation and reporting of hadoop metrics.
+ * This class makes the simplifying assumption that each metrics record has
+ * exactly one tag, which defaults to being the hostName.
+ *
  * For examples of usage, see {@link org.apache.hadoop.dfs.DataNode}.
  * @see org.apache.hadoop.metrics.MetricsRecord
  * @see org.apache.hadoop.metrics.MetricsContext
  * @see org.apache.hadoop.metrics.ContextFactory
  * @author Milind Bhandarkar
  */
-public class Metrics {
-  private static final Log LOG =
-      LogFactory.getLog("org.apache.hadoop.util.MetricsUtil");
-  
-  /**
-   * Don't allow creation of a new instance of Metrics
-   */
-  private Metrics() {}
-  
-  /**
-   * Utility method to create and return
-   * a new tagged metrics record instance within the
-   * given <code>contextName</code>.
-   * If exception is thrown while creating the record for any reason, it is
-   * logged, and a null record is returned.
-   * @param contextName name of the context
-   * @param recordName the name of the record
-   * @param tagName name of the tag field of metrics record
-   * @param tagValue value of the tag field
-   * @return newly created metrics record
-   */
-  public static MetricsRecord createRecord(String contextName, String recordName,
-      String tagName, String tagValue) {
-    try {
-      MetricsContext metricsContext =
-          ContextFactory.getFactory().getContext(contextName);
-      if (!metricsContext.isMonitoring()) {metricsContext.startMonitoring();}
-      MetricsRecord metricsRecord = metricsContext.createRecord(recordName);
-      metricsRecord.setTag(tagName, tagValue);
-      return metricsRecord;
-    } catch (Throwable ex) {
-      LOG.warn("Could not create metrics record with context:"+contextName, ex);
-      return null;
+public class MetricsUtil {
+    
+    private static final Log LOG =
+        LogFactory.getLog("org.apache.hadoop.util.MetricsUtil");
+
+    /**
+     * Don't allow creation of a new instance of MetricsUtil
+     */
+    private MetricsUtil() {}
+    
+    /**
+     * Utility method to return the named context.
+     * If the desired context cannot be created for any reason, the exception
+     * is logged, and a null context is returned.
+     */
+    public static MetricsContext getContext(String contextName) {
+        MetricsContext metricsContext;
+        try {
+            metricsContext = ContextFactory.getFactory().getContext(contextName);
+            if (!metricsContext.isMonitoring()) {
+                metricsContext.startMonitoring();
+            }
+        } catch (Exception ex) {
+            LOG.error("Unable to create metrics context " + contextName, ex);
+            metricsContext = ContextFactory.getNullContext(contextName);
+        }
+        return metricsContext;
     }
-  }
-  
-  /**
-   * Utility method to create and return new metrics record instance within the
-   * given <code>contextName</code>. This record is tagged with hostname.
-   * If exception is thrown while creating the record due to any reason, it is
-   * logged, and a null record is returned.
-   * @param contextName name of the context
-   * @param recordName name of the record
-   * @return newly created metrics record
-   */
-  public static MetricsRecord createRecord(String contextName,
-      String recordName) {
-    String hostname = null;
-    try {
-      hostname = InetAddress.getLocalHost().getHostName();
-    } catch (UnknownHostException ex) {
-      LOG.info("Cannot get hostname", ex);
-      hostname = "unknown";
+    
+    /**
+     * Utility method to create and return a new metrics record instance 
+     * with the given name and the specified tag.
+     *
+     * @param context the context
+     * @param recordName the name of the record
+     * @param tagName name of the tag field of metrics record
+     * @param tagValue value of the tag field
+     * @return newly created metrics record
+     */
+    public static MetricsRecord createRecord(MetricsContext context, 
+                                             String recordName,
+                                             String tagName, 
+                                             String tagValue) 
+    {
+        MetricsRecord metricsRecord = context.createRecord(recordName);
+        metricsRecord.setTag(tagName, tagValue);
+        return metricsRecord;
     }
-    return createRecord(contextName, recordName, "hostname", hostname);
-  }
-  
-  /**
-   * Sets the named metric to the specified value in the given metrics record.
-   * Updates the table of buffered data which is to be sent periodically.
-   *
-   * @param record record for which the metric is updated
-   * @param metricName name of the metric
-   * @param metricValue new value of the metric
-   */
-  public static void report(MetricsRecord record, String metricName,
-      long metricValue) {
-    if (record != null) {
-      record.setMetric(metricName, metricValue);
-      record.update();
+
+    /**
+     * Utility method to create and return a new metrics record instance within the
+     * given context. This record is tagged with the host name.
+     *
+     * @param context the context
+     * @param recordName name of the record
+     * @return newly created metrics record
+     */
+    public static MetricsRecord createRecord(MetricsContext context, 
+                                            String recordName) 
+    {
+        return createRecord(context, recordName, "hostName", getHostName());
     }
-  }
+    
+    /**
+     * Returns the host name.  If the host name is unobtainable, logs the
+     * exception and returns "unknown".
+     */
+    private static String getHostName() {
+        String hostName = null;
+        try {
+            hostName = InetAddress.getLocalHost().getHostName();
+        } 
+        catch (UnknownHostException ex) {
+            LOG.info("Unable to obtain hostName", ex);
+            hostName = "unknown";
+        }
+        return hostName;
+    }
+
 }



Mime
View raw message