hadoop-common-commits mailing list archives

From omal...@apache.org
Subject svn commit: r644599 - in /hadoop/core/trunk: ./ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/kfs/ src/java/org/apache/hadoop/fs/s3/ src/java/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Fri, 04 Apr 2008 03:31:30 GMT
Author: omalley
Date: Thu Apr  3 20:31:29 2008
New Revision: 644599

URL: http://svn.apache.org/viewvc?rev=644599&view=rev
Log:
HADOOP-3001. Add job counters that measure the number of bytes
read and written to HDFS, S3, KFS, and local file systems.
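
Once a job completes, these counters come back through the normal Counters
API. A minimal sketch of reading them, mirroring the assertions added to
TestMiniMRWithDFS below (FileSystemCounter is protected, so this assumes
code living in the org.apache.hadoop.mapred package):

    Counters counters = result.job.getCounters();
    long hdfsRead =
      counters.findCounter(Task.FileSystemCounter.HDFS_READ).getCounter();
    long hdfsWrite =
      counters.findCounter(Task.FileSystemCounter.HDFS_WRITE).getCounter();
    // for the word-count job in the test below, these line up with the
    // job's input and output sizes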

Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task_FileSystemCounter.properties
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
    hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Apr  3 20:31:29 2008
@@ -108,6 +108,9 @@
     HADOOP-2551. More environment variables like HADOOP_NAMENODE_OPTS
     for better control of HADOOP_OPTS for each component. (rangadi)
 
+    HADOOP-3001. Add job counters that measure the number of bytes
+    read and written to HDFS, S3, KFS, and local file systems. (omalley)
+
   IMPROVEMENTS
 
     HADOOP-2655. Copy on write for data and metadata files in the 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Thu Apr  3 20:31:29 2008
@@ -70,6 +70,7 @@
   private short defaultReplication;
   private SocketFactory socketFactory;
   private int socketTimeout;
+  private FileSystem.Statistics stats;
     
   /**
    * A map from name -> DFSOutputStream of files that are currently being
@@ -158,9 +159,11 @@
   /** 
    * Create a new DFSClient connected to the given namenode server.
    */
-  public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf)
+  public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
+                   FileSystem.Statistics stats)
     throws IOException {
     this.conf = conf;
+    this.stats = stats;
     this.socketTimeout = conf.getInt("dfs.socket.timeout", 
                                      FSConstants.READ_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
@@ -186,6 +189,12 @@
     this.leaseChecker.start();
   }
 
+  public DFSClient(InetSocketAddress nameNodeAddr, 
+                   Configuration conf) throws IOException {
+    this(nameNodeAddr, conf, 
+         FileSystem.getStatistics(DistributedFileSystem.class));
+  }
+
   private void checkOpen() throws IOException {
     if (!clientRunning) {
       IOException result = new IOException("Filesystem closed");
@@ -326,15 +335,17 @@
   }
 
   public DFSInputStream open(String src) throws IOException {
-    return open(src, conf.getInt("io.file.buffer.size", 4096), true);
+    return open(src, conf.getInt("io.file.buffer.size", 4096), true, null);
   }
+
   /**
    * Create an input stream that obtains a nodelist from the
    * namenode, and then reads from all the right places.  Creates
    * inner subclass of InputStream that does the right out-of-band
    * work.
    */
-  DFSInputStream open(String src, int buffersize, boolean verifyChecksum
+  DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
+                      FileSystem.Statistics stats
       ) throws IOException {
     checkOpen();
     //    Get block info from namenode
@@ -1026,6 +1037,7 @@
     private Block currentBlock = null;
     private long pos = 0;
     private long blockEnd = -1;
+
     /* XXX Use of ConcurrentHashMap is a temp fix. Need to fix 
      * parallel accesses to DFSInputStream (through pthreads) properly */
     private ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = 
@@ -1039,7 +1051,7 @@
     }
     
     DFSInputStream(String src, int buffersize, boolean verifyChecksum
-        ) throws IOException {
+                   ) throws IOException {
       this.verifyChecksum = verifyChecksum;
       this.buffersize = buffersize;
       this.src = src;
@@ -1309,6 +1321,9 @@
              // got an EOS from the reader though we expect more data on it.
               throw new IOException("Unexpected EOS from the reader");
             }
+            if (stats != null && result != -1) {
+              stats.incrementBytesRead(result);
+            }
             return result;
           } catch (ChecksumException ce) {
             throw ce;            
@@ -1456,6 +1471,9 @@
         offset += bytesToRead;
       }
       assert remaining == 0 : "Wrong number of bytes read.";
+      if (stats != null) {
+        stats.incrementBytesRead(realLen);
+      }
       return realLen;
     }
      

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Thu Apr  3 20:31:29 2008
@@ -66,7 +66,8 @@
     if (host == null || port == -1) {
       throw new IOException("Incomplete HDFS URI, no host/port: "+ uri);
     }
-    this.dfs = new DFSClient(new InetSocketAddress(host, port), conf);
+    this.dfs = new DFSClient(new InetSocketAddress(host, port), conf,
+                             statistics);
     this.uri = URI.create("hdfs://"+host+":"+port);
     this.workingDir = getHomeDirectory();
   }
@@ -128,7 +129,7 @@
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
     try {
       return new DFSClient.DFSDataInputStream(
-          dfs.open(getPathName(f), bufferSize, verifyChecksum));
+          dfs.open(getPathName(f), bufferSize, verifyChecksum, statistics));
     } catch(RemoteException e) {
       if (IOException.class.getName().equals(e.getClassName()) &&
           e.getMessage().startsWith(
@@ -147,8 +148,10 @@
     int bufferSize, short replication, long blockSize,
     Progressable progress) throws IOException {
 
-    return new FSDataOutputStream(dfs.create(getPathName(f), permission,
-        overwrite, replication, blockSize, progress, bufferSize));
+    return new FSDataOutputStream
+       (dfs.create(getPathName(f), permission,
+                   overwrite, replication, blockSize, progress, bufferSize),
+        statistics);
   }
 
   public boolean setReplication(Path src, 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/ChecksumFileSystem.java Thu Apr  3 20:31:29 2008
@@ -272,7 +272,7 @@
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
     return new FSDataInputStream(
-        new ChecksumFSInputChecker(this, f, bufferSize) );
+        new ChecksumFSInputChecker(this, f, bufferSize));
   }
 
   /**
@@ -358,8 +358,11 @@
     if (parent != null && !mkdirs(parent)) {
       throw new IOException("Mkdirs failed to create " + parent);
     }
-    return new FSDataOutputStream(new ChecksumFSOutputSummer(
-        this, f, overwrite, bufferSize, replication, blockSize, progress));
+    return new FSDataOutputStream
+            (new ChecksumFSOutputSummer
+                (this, f, overwrite, bufferSize, replication, 
+                 blockSize, progress),
+             null);
   }
 
   /**

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FSDataOutputStream.java Thu Apr  3 20:31:29 2008
@@ -26,20 +26,29 @@
   private OutputStream wrappedStream;
 
   private static class PositionCache extends FilterOutputStream {
+    private FileSystem.Statistics statistics;
     long position;
 
-    public PositionCache(OutputStream out) throws IOException {
+    public PositionCache(OutputStream out, 
+                         FileSystem.Statistics stats) throws IOException {
       super(out);
+      statistics = stats;
     }
 
     public void write(int b) throws IOException {
       out.write(b);
       position++;
+      if (statistics != null) {
+        statistics.incrementBytesWritten(1);
+      }
     }
     
     public void write(byte b[], int off, int len) throws IOException {
       out.write(b, off, len);
       position += len;                            // update position
+      if (statistics != null) {
+        statistics.incrementBytesWritten(len);
+      }
     }
       
     public long getPos() throws IOException {
@@ -51,9 +60,14 @@
     }
   }
 
-  public FSDataOutputStream(OutputStream out)
+  @Deprecated
+  public FSDataOutputStream(OutputStream out) throws IOException {
+    this(out, null);
+  }
+
+  public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats)
     throws IOException {
-    super(new PositionCache(out));
+    super(new PositionCache(out, stats));
     wrappedStream = out;
   }
   
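
The write-side accounting is a decorator: every FSDataOutputStream now routes
through PositionCache, which bumps the shared Statistics object alongside the
position it was already tracking. A minimal sketch of the effect, assuming
the usual java.io and org.apache.hadoop.fs imports (ByteArrayOutputStream is
just a stand-in sink):

    FileSystem.Statistics stats = new FileSystem.Statistics();
    FSDataOutputStream out =
        new FSDataOutputStream(new ByteArrayOutputStream(), stats);
    out.write(new byte[42]);          // counted by PositionCache
    out.close();
    // stats.getBytesWritten() == 42

The old one-argument constructor survives as @Deprecated and delegates with a
null Statistics; the same keep-the-old-signature pattern repeats below in
KFSImpl, KFSInputStream, and S3InputStream.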

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/FileSystem.java Thu Apr  3 20:31:29 2008
@@ -20,6 +20,7 @@
 import java.io.*;
 import java.net.*;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.*;
@@ -57,8 +58,17 @@
 
   /** FileSystem cache */
   private static final Cache CACHE = new Cache();
+  /** Recording statistics per FileSystem class */
+  private static final Map<Class<? extends FileSystem>, Statistics> 
+    statisticsTable =
+      new IdentityHashMap<Class<? extends FileSystem>, Statistics>();
 
   /**
+   * The statistics for this file system.
+   */
+  protected final Statistics statistics;
+  
+  /**
    * Parse the cmd-line args, starting at i.  Remove consumed args
    * from array.  We expect param in the form:
    * '-local | -dfs <namenode:port>'
@@ -268,6 +278,7 @@
 
   protected FileSystem() {
     super(null);
+    statistics = getStatistics(this.getClass());
   }
 
   /** Check that a Path belongs to this FileSystem. */
@@ -1370,6 +1381,72 @@
       public String toString() {
         return username + "@" + scheme + "://" + authority;        
       }
+    }
+  }
+  
+  public static final class Statistics {
+    private AtomicLong bytesRead = new AtomicLong();
+    private AtomicLong bytesWritten = new AtomicLong();
+    
+    /**
+     * Increment the bytes read in the statistics
+     * @param newBytes the additional bytes read
+     */
+    public void incrementBytesRead(long newBytes) {
+      bytesRead.getAndAdd(newBytes);
+    }
+    
+    /**
+     * Increment the bytes written in the statistics
+     * @param newBytes the additional bytes written
+     */
+    public void incrementBytesWritten(long newBytes) {
+      bytesWritten.getAndAdd(newBytes);
+    }
+    
+    /**
+     * Get the total number of bytes read
+     * @return the number of bytes
+     */
+    public long getBytesRead() {
+      return bytesRead.get();
+    }
+    
+    /**
+     * Get the total number of bytes written
+     * @return the number of bytes
+     */
+    public long getBytesWritten() {
+      return bytesWritten.get();
+    }
+    
+    public String toString() {
+      return bytesRead + " bytes read and " + bytesWritten + 
+             " bytes written";
+    }
+  }
+  
+  /**
+   * Get the statistics for a particular file system
+   * @param cls the class to lookup
+   * @return a statistics object
+   */
+  public static synchronized 
+  Statistics getStatistics(Class<? extends FileSystem> cls) {
+    Statistics result = statisticsTable.get(cls);
+    if (result == null) {
+      result = new Statistics();
+      statisticsTable.put(cls, result);
+    }
+    return result;
+  }
+  
+  public static synchronized
+  void printStatistics() throws IOException {
+    for (Map.Entry<Class<? extends FileSystem>, Statistics> pair: 
+            statisticsTable.entrySet()) {
+      System.out.println("  FileSystem " + pair.getKey().getName() + 
+                         ": " + pair.getValue());
     }
   }
 }
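
Statistics is keyed by the concrete FileSystem class in an IdentityHashMap,
so every instance of a given file system in the JVM shares one pair of
AtomicLong counters, and increments are thread-safe without locking. A rough
usage sketch (note printStatistics() declares IOException):

    FileSystem.Statistics dfsStats =
        FileSystem.getStatistics(DistributedFileSystem.class);
    long before = dfsStats.getBytesRead();
    // ... perform some HDFS reads ...
    long delta = dfsStats.getBytesRead() - before;
    FileSystem.printStatistics();     // one line per file system class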

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/InMemoryFileSystem.java Thu Apr  3 20:31:29 2008
@@ -223,7 +223,8 @@
       // map) until close is called on the outputstream that this method is
       // going to return
       // Create an output stream out of data byte array
-      return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr));
+      return new FSDataOutputStream(new InMemoryOutputStream(f, fAttr), 
+                                    statistics);
     }
 
     public void close() throws IOException {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/RawLocalFileSystem.java Thu Apr  3 20:31:29 2008
@@ -65,6 +65,36 @@
     setConf(conf);
   }
   
+  class TrackingFileInputStream extends FileInputStream {
+    public TrackingFileInputStream(File f) throws IOException {
+      super(f);
+    }
+    
+    public int read() throws IOException {
+      int result = super.read();
+      if (result != -1) {
+        statistics.incrementBytesRead(1);
+      }
+      return result;
+    }
+    
+    public int read(byte[] data) throws IOException {
+      int result = super.read(data);
+      if (result != -1) {
+        statistics.incrementBytesRead(result);
+      }
+      return result;
+    }
+    
+    public int read(byte[] data, int offset, int length) throws IOException {
+      int result = super.read(data, offset, length);
+      if (result != -1) {
+        statistics.incrementBytesRead(result);
+      }
+      return result;
+    }
+  }
+
   /*******************************************************
    * For open()'s FSInputStream
    *******************************************************/
@@ -73,7 +103,7 @@
     private long position;
 
     public LocalFSFileInputStream(Path f) throws IOException {
-      this.fis = new FileInputStream(pathToFile(f));
+      this.fis = new TrackingFileInputStream(pathToFile(f));
     }
     
     public void seek(long pos) throws IOException {
@@ -190,7 +220,8 @@
       throw new IOException("Mkdirs failed to create " + parent.toString());
     }
     return new FSDataOutputStream(
-        new BufferedOutputStream(new LocalFSFileOutputStream(f), bufferSize));
+        new BufferedOutputStream(new LocalFSFileOutputStream(f), bufferSize),
+        statistics);
   }
 
   /** {@inheritDoc} */
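
TrackingFileInputStream overrides all three read overloads because
FileInputStream's bulk reads go straight to native code rather than
delegating to read(); skipping any one of them would undercount. A rough
sketch of watching the counter through the local file system (/tmp/data is a
stand-in path, and the total will typically include checksum-file reads as
well, since LocalFileSystem layers on this class):

    FileSystem local = FileSystem.getLocal(new Configuration());
    FSDataInputStream in = local.open(new Path("/tmp/data"));
    in.read(new byte[128]);
    in.close();
    System.out.println(FileSystem.getStatistics(RawLocalFileSystem.class));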

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KFSImpl.java Thu Apr  3 20:31:29 2008
@@ -20,23 +20,27 @@
 package org.apache.hadoop.fs.kfs;
 
 import java.io.*;
-import java.net.*;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.Progressable;
 
 import org.kosmix.kosmosfs.access.KfsAccess;
 
 class KFSImpl implements IFSImpl {
     private KfsAccess kfsAccess = null;
+    private FileSystem.Statistics statistics;
 
-    public KFSImpl(String metaServerHost, int metaServerPort) throws IOException {
+    @Deprecated
+    public KFSImpl(String metaServerHost, int metaServerPort
+                   ) throws IOException {
+      this(metaServerHost, metaServerPort, null);
+    }
+
+    public KFSImpl(String metaServerHost, int metaServerPort, 
+                   FileSystem.Statistics stats) throws IOException {
         kfsAccess = new KfsAccess(metaServerHost, metaServerPort);
+        statistics = stats;
     }
 
     public boolean exists(String path) throws IOException {
@@ -95,10 +99,12 @@
     }
 
    public FSDataOutputStream create(String path, short replication, int bufferSize) throws IOException {
-        return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication));
+        return new FSDataOutputStream(new KFSOutputStream(kfsAccess, path, replication),
+                                      statistics);
     }
 
     public FSDataInputStream open(String path, int bufferSize) throws IOException {
-        return new FSDataInputStream(new KFSInputStream(kfsAccess, path));
+        return new FSDataInputStream(new KFSInputStream(kfsAccess, path, 
+                                                        statistics));
     }
-};
+}

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java Thu Apr  3 20:31:29 2008
@@ -21,28 +21,28 @@
 package org.apache.hadoop.fs.kfs;
 
 import java.io.*;
-import java.net.*;
-import java.util.*;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.util.Progressable;
 
 import org.kosmix.kosmosfs.access.KfsAccess;
 import org.kosmix.kosmosfs.access.KfsInputChannel;
 
 class KFSInputStream extends FSInputStream {
 
-    private String path;
     private KfsInputChannel kfsChannel;
-
+    private FileSystem.Statistics statistics;
     private long fsize;
 
+    @Deprecated
     public KFSInputStream(KfsAccess kfsAccess, String path) {
-        this.path = path;
+      this(kfsAccess, path, null);
+    }
 
+    public KFSInputStream(KfsAccess kfsAccess, String path,
+                            FileSystem.Statistics stats) {
+        this.statistics = stats;
         this.kfsChannel = kfsAccess.kfs_open(path);
         if (this.kfsChannel != null)
             this.fsize = kfsAccess.kfs_filesize(path);
@@ -81,8 +81,12 @@
         }
         byte b[] = new byte[4];
         int res = read(b, 0, 4);
-        if (res == 4)
-            return (b[0] + (b[1] << 8) + (b[2] << 16) + (b[3] << 24));
+        if (res == 4) {
+          if (statistics != null) {
+            statistics.incrementBytesRead(1);
+          }
+          return (b[0] + (b[1] << 8) + (b[2] << 16) + (b[3] << 24));
+        }
         return -1;
     }
 
@@ -96,6 +100,9 @@
 	// Use -1 to signify EOF
 	if (res == 0)
 	    return -1;
+	if (statistics != null) {
+	  statistics.incrementBytesRead(res);
+	}
 	return res;
     }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java Thu Apr  3 20:31:29 2008
@@ -63,7 +63,8 @@
         try {
             if (kfsImpl == null) {
                 kfsImpl = new KFSImpl(conf.get("fs.kfs.metaServerHost", ""),
-                                      conf.getInt("fs.kfs.metaServerPort", -1));
+                                      conf.getInt("fs.kfs.metaServerPort", -1),
+                                      statistics);
             }
             this.localFs = FileSystem.getLocal(conf);
             this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3FileSystem.java Thu Apr  3 20:31:29 2008
@@ -203,15 +203,17 @@
         }
       }      
     }
-    return new FSDataOutputStream(
-        new S3OutputStream(getConf(), store, makeAbsolute(file),
-                           blockSize, progress, bufferSize));
+    return new FSDataOutputStream
+        (new S3OutputStream(getConf(), store, makeAbsolute(file),
+                            blockSize, progress, bufferSize),
+         statistics);
   }
 
   @Override
   public FSDataInputStream open(Path path, int bufferSize) throws IOException {
     INode inode = checkFile(path);
-    return new FSDataInputStream(new S3InputStream(getConf(), store, inode));
+    return new FSDataInputStream(new S3InputStream(getConf(), store, inode,
+                                                   statistics));
   }
 
   @Override

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/fs/s3/S3InputStream.java Thu Apr  3 20:31:29 2008
@@ -25,6 +25,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
 
 class S3InputStream extends FSInputStream {
 
@@ -43,11 +44,20 @@
   private DataInputStream blockStream;
 
   private long blockEnd = -1;
+  
+  private FileSystem.Statistics stats;
 
+  @Deprecated
   public S3InputStream(Configuration conf, FileSystemStore store,
                        INode inode) {
+    this(conf, store, inode, null);
+  }
+
+  public S3InputStream(Configuration conf, FileSystemStore store,
+                       INode inode, FileSystem.Statistics stats) {
     
     this.store = store;
+    this.stats = stats;
     this.blocks = inode.getBlocks();
     for (Block block : blocks) {
       this.fileLength += block.getLength();
@@ -93,6 +103,9 @@
         pos++;
       }
     }
+    if (stats != null && result >= 0) {
+      stats.incrementBytesRead(1);
+    }
     return result;
   }
 
@@ -109,6 +122,9 @@
       int result = blockStream.read(buf, off, realLen);
       if (result >= 0) {
         pos += result;
+      }
+      if (stats != null && result > 0) {
+        stats.incrementBytesRead(result);
       }
       return result;
     }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Thu Apr  3 20:31:29 2008
@@ -22,6 +22,8 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.text.NumberFormat;
 
@@ -29,11 +31,15 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.DistributedFileSystem;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.kfs.KosmosFileSystem;
+import org.apache.hadoop.fs.s3.S3FileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
@@ -59,6 +65,16 @@
     REDUCE_INPUT_RECORDS,
     REDUCE_OUTPUT_RECORDS
   }
+  
+  /**
+   * Counters to measure the usage of the different file systems.
+   */
+  protected static enum FileSystemCounter {
+    LOCAL_READ, LOCAL_WRITE, 
+    HDFS_READ, HDFS_WRITE, 
+    S3_READ, S3_WRITE,
+    KFS_READ, KFS_WRITE
+  }
 
   ///////////////////////////////////////////////////////////
   // Helper methods to construct task-output paths
@@ -293,6 +309,7 @@
               
               if (sendProgress) {
                 // we need to send progress update
+                updateCounters();
                 taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), 
                         counters);
                 taskFound = umbilical.statusUpdate(taskId, taskStatus);
@@ -362,9 +379,81 @@
     setProgressFlag();
   }
 
+  /**
+   * An updater that tracks the last number reported for a given file
+   * system and only creates the counters when they are needed.
+   */
+  class FileSystemStatisticUpdater {
+    private long prevReadBytes = 0;
+    private long prevWriteBytes = 0;
+    private FileSystem.Statistics stats;
+    private Counters.Counter readCounter = null;
+    private Counters.Counter writeCounter = null;
+    private FileSystemCounter read;
+    private FileSystemCounter write;
+
+    FileSystemStatisticUpdater(FileSystemCounter read,
+                               FileSystemCounter write,
+                               Class<? extends FileSystem> cls) {
+      stats = FileSystem.getStatistics(cls);
+      this.read = read;
+      this.write = write;
+    }
+
+    void updateCounters() {
+      long newReadBytes = stats.getBytesRead();
+      long newWriteBytes = stats.getBytesWritten();
+      if (prevReadBytes != newReadBytes) {
+        if (readCounter == null) {
+          readCounter = counters.findCounter(read);
+        }
+        readCounter.increment(newReadBytes - prevReadBytes);
+        prevReadBytes = newReadBytes;
+      }
+      if (prevWriteBytes != newWriteBytes) {
+        if (writeCounter == null) {
+          writeCounter = counters.findCounter(write);
+        }
+        writeCounter.increment(newWriteBytes - prevWriteBytes);
+        prevWriteBytes = newWriteBytes;
+      }
+    }
+  }
+  
+  /**
+   * A list of all of the file systems that we want to report on.
+   */
+  private List<FileSystemStatisticUpdater> statisticUpdaters =
+     new ArrayList<FileSystemStatisticUpdater>();
+  {
+    statisticUpdaters.add
+      (new FileSystemStatisticUpdater(FileSystemCounter.LOCAL_READ,
+                                      FileSystemCounter.LOCAL_WRITE,
+                                      RawLocalFileSystem.class));
+    statisticUpdaters.add
+      (new FileSystemStatisticUpdater(FileSystemCounter.HDFS_READ,
+                                      FileSystemCounter.HDFS_WRITE,
+                                      DistributedFileSystem.class));
+    statisticUpdaters.add
+      (new FileSystemStatisticUpdater(FileSystemCounter.KFS_READ,
+                                      FileSystemCounter.KFS_WRITE,
+                                      KosmosFileSystem.class));
+    statisticUpdaters.add
+      (new FileSystemStatisticUpdater(FileSystemCounter.S3_READ,
+                                      FileSystemCounter.S3_WRITE,
+                                      S3FileSystem.class));
+  }
+
+  private synchronized void updateCounters() {
+    for(FileSystemStatisticUpdater updater: statisticUpdaters) {
+      updater.updateCounters();
+    }
+  }
+
   public void done(TaskUmbilicalProtocol umbilical) throws IOException {
     int retries = 10;
     boolean needProgress = true;
+    updateCounters();
     taskDone.set(true);
     while (true) {
       try {
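
Because Statistics totals are cumulative for the life of the task JVM,
FileSystemStatisticUpdater reports deltas: each updateCounters() call adds
only the bytes seen since the previous heartbeat, so repeated status updates
never double count. In sketch form:

    long newReadBytes = stats.getBytesRead();            // running total
    readCounter.increment(newReadBytes - prevReadBytes); // new bytes only
    prevReadBytes = newReadBytes;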

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task_FileSystemCounter.properties
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task_FileSystemCounter.properties?rev=644599&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task_FileSystemCounter.properties (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task_FileSystemCounter.properties Thu Apr  3 20:31:29 2008
@@ -0,0 +1,12 @@
+# ResourceBundle properties file for Map-Reduce counters
+
+CounterGroupName=              File Systems
+
+LOCAL_READ.name=               Local bytes read
+LOCAL_WRITE.name=              Local bytes written
+HDFS_READ.name=                HDFS bytes read
+HDFS_WRITE.name=               HDFS bytes written
+S3_READ.name=                  S3 bytes read
+S3_WRITE.name=                 S3 bytes written
+KFS_READ.name=                 KFS bytes read
+KFS_WRITE.name=                KFS bytes written
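
The bundle name follows the <enclosing class>_<enum class>.properties
convention, which is roughly how the counter framework resolves display
names for the new enum; an illustrative lookup:

    ResourceBundle bundle = ResourceBundle.getBundle(
        "org.apache.hadoop.mapred.Task_FileSystemCounter");
    String group = bundle.getString("CounterGroupName"); // "File Systems"
    String name  = bundle.getString("HDFS_READ.name");   // "HDFS bytes read"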

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=644599&r1=644598&r2=644599&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Thu Apr  3 20:31:29 2008
@@ -178,10 +178,8 @@
     TestResult result;
     final Path inDir = new Path("./wc/input");
     final Path outDir = new Path("./wc/output");
-    result = launchWordCount(jobConf, inDir, outDir,
-                             "The quick brown fox\nhas many silly\n" + 
-                             "red fox sox\n",
-                             3, 1);
+    String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+    result = launchWordCount(jobConf, inDir, outDir, input, 3, 1);
     assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
                  "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
     String jobid = result.job.getJobID();
@@ -189,8 +187,17 @@
     checkTaskDirectories(mr, new String[]{jobid}, new String[]{taskid});
     // test with maps=0
     jobConf = mr.createJobConf();
-    result = launchWordCount(jobConf, inDir, outDir, "owen is oom", 0, 1);
+    input = "owen is oom";
+    result = launchWordCount(jobConf, inDir, outDir, input, 0, 1);
     assertEquals("is\t1\noom\t1\nowen\t1\n", result.output);
+    Counters counters = result.job.getCounters();
+    long hdfsRead = 
+      counters.findCounter(Task.FileSystemCounter.HDFS_READ).getCounter();
+    long hdfsWrite = 
+      counters.findCounter(Task.FileSystemCounter.HDFS_WRITE).getCounter();
+    assertEquals(result.output.length(), hdfsWrite);
+    assertEquals(input.length(), hdfsRead);
+
     // Run a job with input and output going to localfs even though the 
     // default fs is hdfs.
     {
@@ -207,6 +214,7 @@
       assertEquals("all\t1\nbase\t1\nbelong\t1\nto\t1\nus\t1\nyour\t1\n", 
                    result.output);
       assertTrue("outputs on localfs", localfs.exists(localOut));
+
     }
   }
 


