hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r718229 - in /hadoop/core/trunk: ./ src/core/org/apache/hadoop/fs/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Mon, 17 Nov 2008 12:23:16 GMT
Author: ddas
Date: Mon Nov 17 04:23:15 2008
New Revision: 718229

URL: http://svn.apache.org/viewvc?rev=718229&view=rev
Log:
HADOOP-4188. Removes task's dependency on concrete filesystems. Contributed by Sharad Agarwal.

Removed:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task_FileSystemCounter.properties
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
    hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Nov 17 04:23:15 2008
@@ -30,6 +30,9 @@
 
     HADOOP-4628. Move Hive into a standalone subproject. (omalley)
 
+    HADOOP-4188. Removes task's dependency on concrete filesystems.
+    (Sharad Agarwal via ddas)
+
   NEW FEATURES
 
     HADOOP-4575. Add a proxy service for relaying HsftpFileSystem requests.

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FileSystem.java Mon Nov 17 04:23:15 2008
@@ -68,11 +68,15 @@
   private static final Map<Class<? extends FileSystem>, Statistics> 
     statisticsTable =
       new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
+  
+  /** Recording statistics per FileSystem URI scheme */
+  private static final Map<String, Statistics> statsByUriScheme = 
+    new HashMap<String, Statistics>();
 
   /**
    * The statistics for this file system.
    */
-  protected final Statistics statistics;
+  protected Statistics statistics;
 
   /**
    * A cache of files that should be deleted when filsystem is closed
@@ -1365,6 +1369,7 @@
     }
     FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
     fs.initialize(uri, conf);
+    statsByUriScheme.put(uri.getScheme(), fs.statistics);
     return fs;
   }
 
@@ -1516,7 +1521,16 @@
   }
   
   /**
+   * Get the Map of Statistics object indexed by URI Scheme.
+   * @return a Map having a key as URI scheme and value as Statistics object
+   */
+  public static synchronized Map<String, Statistics> getStatistics() {
+    return statsByUriScheme;
+  }
+  
+  /**
    * Get the statistics for a particular file system
+   * @deprecated Consider using {@link #getStatistics()} instead.
    * @param cls the class to lookup
    * @return a statistics object
    */

Modified: hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java (original)
+++ hadoop/core/trunk/src/core/org/apache/hadoop/fs/FilterFileSystem.java Mon Nov 17 04:23:15 2008
@@ -52,6 +52,7 @@
   
   public FilterFileSystem(FileSystem fs) {
     this.fs = fs;
+    this.statistics = fs.statistics;
   }
 
   /** Called after a new FileSystem instance is constructed.

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Mon Nov 17 04:23:15 2008
@@ -23,8 +23,10 @@
 import java.io.IOException;
 import java.text.NumberFormat;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -32,13 +34,9 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RawLocalFileSystem;
-import org.apache.hadoop.fs.kfs.KosmosFileSystem;
-import org.apache.hadoop.fs.s3.S3FileSystem;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Text;
@@ -75,13 +73,18 @@
   
   /**
    * Counters to measure the usage of the different file systems.
+   * Always return the String array with two elements. First one is the name of  
+   * BYTES_READ counter and second one is of the BYTES_WRITTEN counter.
    */
-  protected static enum FileSystemCounter {
-    LOCAL_READ, LOCAL_WRITE, 
-    HDFS_READ, HDFS_WRITE, 
-    S3_READ, S3_WRITE,
-    KFS_READ, KFSWRITE
+  protected static String[] getFileSystemCounterNames(String uriScheme) {
+    String scheme = uriScheme.toUpperCase();
+    return new String[]{scheme+"_BYTES_READ", scheme+"_BYTES_WRITTEN"};
   }
+  
+  /**
+   * Name of the FileSystem counters' group
+   */
+  protected static final String FILESYSTEM_COUNTER_GROUP = "FileSystemCounters";
 
   ///////////////////////////////////////////////////////////
   // Helper methods to construct task-output paths
@@ -513,15 +516,11 @@
     private FileSystem.Statistics stats;
     private Counters.Counter readCounter = null;
     private Counters.Counter writeCounter = null;
-    private FileSystemCounter read;
-    private FileSystemCounter write;
-
-    FileSystemStatisticUpdater(FileSystemCounter read,
-                               FileSystemCounter write,
-                               Class<? extends FileSystem> cls) {
-      stats = FileSystem.getStatistics(cls);
-      this.read = read;
-      this.write = write;
+    private String[] counterNames;
+    
+    FileSystemStatisticUpdater(String uriScheme, FileSystem.Statistics stats) {
+      this.stats = stats;
+      this.counterNames = getFileSystemCounterNames(uriScheme);
     }
 
     void updateCounters() {
@@ -529,14 +528,16 @@
       long newWriteBytes = stats.getBytesWritten();
       if (prevReadBytes != newReadBytes) {
         if (readCounter == null) {
-          readCounter = counters.findCounter(read);
+          readCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
+              counterNames[0]);
         }
         readCounter.increment(newReadBytes - prevReadBytes);
         prevReadBytes = newReadBytes;
       }
       if (prevWriteBytes != newWriteBytes) {
         if (writeCounter == null) {
-          writeCounter = counters.findCounter(write);
+          writeCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
+              counterNames[1]);
         }
         writeCounter.increment(newWriteBytes - prevWriteBytes);
         prevWriteBytes = newWriteBytes;
@@ -545,31 +546,20 @@
   }
   
   /**
-   * A list of all of the file systems that we want to report on.
+   * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
    */
-  private List<FileSystemStatisticUpdater> statisticUpdaters =
-     new ArrayList<FileSystemStatisticUpdater>();
-  {
-    statisticUpdaters.add
-      (new FileSystemStatisticUpdater(FileSystemCounter.LOCAL_READ,
-                                      FileSystemCounter.LOCAL_WRITE,
-                                      RawLocalFileSystem.class));
-    statisticUpdaters.add
-      (new FileSystemStatisticUpdater(FileSystemCounter.HDFS_READ,
-                                      FileSystemCounter.HDFS_WRITE,
-                                      DistributedFileSystem.class));
-    statisticUpdaters.add
-    (new FileSystemStatisticUpdater(FileSystemCounter.KFS_READ,
-                                    FileSystemCounter.KFSWRITE,
-                                    KosmosFileSystem.class));
-    statisticUpdaters.add
-    (new FileSystemStatisticUpdater(FileSystemCounter.S3_READ,
-                                    FileSystemCounter.S3_WRITE,
-                                    S3FileSystem.class));
-  }
-
+  private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
+     new HashMap<String, FileSystemStatisticUpdater>();
+  
   private synchronized void updateCounters() {
-    for(FileSystemStatisticUpdater updater: statisticUpdaters) {
+    for(Map.Entry<String, FileSystem.Statistics> entry : 
+      FileSystem.getStatistics().entrySet()) {
+      String uriScheme = entry.getKey();
+      FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
+      if(updater==null) {//new FileSystem has been found in the cache
+        updater = new FileSystemStatisticUpdater(uriScheme, entry.getValue());
+        statisticUpdaters.put(uriScheme, updater);
+      }
       updater.updateCounters();
     }
   }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestCounters.java Mon Nov 17 04:23:15 2008
@@ -67,8 +67,7 @@
   }
   
   public void testCounters() throws IOException {
-    Enum[] keysWithResource = {Task.FileSystemCounter.HDFS_READ, 
-                               Task.Counter.MAP_INPUT_BYTES, 
+    Enum[] keysWithResource = {Task.Counter.MAP_INPUT_BYTES, 
                                Task.Counter.MAP_OUTPUT_BYTES};
     
     Enum[] keysWithoutResource = {myCounters.TEST1, myCounters.TEST2};

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Mon Nov 17 04:23:15 2008
@@ -203,9 +203,11 @@
     assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
     Counters counters = result.job.getCounters();
     long hdfsRead = 
-      counters.findCounter(Task.FileSystemCounter.HDFS_READ).getCounter();
+      counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+          Task.getFileSystemCounterNames("hdfs")[0]).getCounter();
     long hdfsWrite = 
-      counters.findCounter(Task.FileSystemCounter.HDFS_WRITE).getCounter();
+      counters.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+          Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
     assertEquals(result.output.length(), hdfsWrite);
     assertEquals(input.length(), hdfsRead);
 

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java?rev=718229&r1=718228&r2=718229&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestReduceFetch.java Mon Nov 17 04:23:15 2008
@@ -35,8 +35,6 @@
 import org.apache.hadoop.mapred.TestMapCollection.FakeIF;
 import org.apache.hadoop.mapred.TestMapCollection.FakeSplit;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import static org.apache.hadoop.mapred.Task.FileSystemCounter.HDFS_WRITE;
-import static org.apache.hadoop.mapred.Task.FileSystemCounter.LOCAL_READ;
 
 public class TestReduceFetch extends TestCase {
 
@@ -107,8 +105,10 @@
     job.set("mapred.job.reduce.input.buffer.percent", "0.0");
     job.setNumMapTasks(3);
     Counters c = runJob(job);
-    final long hdfsWritten = c.findCounter(HDFS_WRITE).getCounter();
-    final long localRead = c.findCounter(LOCAL_READ).getCounter();
+    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("file")[0]).getCounter();
     assertTrue("Expected more bytes read from local (" +
         localRead + ") than written to HDFS (" + hdfsWritten + ")",
         hdfsWritten <= localRead);
@@ -126,8 +126,10 @@
     job.setNumTasksToExecutePerJvm(1);
     job.set("mapred.job.shuffle.merge.percent", "1.0");
     Counters c = runJob(job);
-    final long hdfsWritten = c.findCounter(HDFS_WRITE).getCounter();
-    final long localRead = c.findCounter(LOCAL_READ).getCounter();
+    final long hdfsWritten = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("hdfs")[1]).getCounter();
+    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("file")[0]).getCounter();
     assertTrue("Expected at least 1MB fewer bytes read from local (" +
         localRead + ") than written to HDFS (" + hdfsWritten + ")",
         hdfsWritten >= localRead + 1024 * 1024);
@@ -138,7 +140,8 @@
     job.set("mapred.job.reduce.input.buffer.percent", "1.0");
     job.setNumMapTasks(3);
     Counters c = runJob(job);
-    final long localRead = c.findCounter(LOCAL_READ).getCounter();
+    final long localRead = c.findCounter(Task.FILESYSTEM_COUNTER_GROUP, 
+        Task.getFileSystemCounterNames("file")[0]).getCounter();
     assertTrue("Non-zero read from local: " + localRead, localRead == 0);
   }
 



Mime
View raw message