hadoop-mapreduce-commits mailing list archives

From cdoug...@apache.org
Subject svn commit: r931358 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/lib/input/ src/test/mapred/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapreduce/lib/input/
Date Tue, 06 Apr 2010 22:43:32 GMT
Author: cdouglas
Date: Tue Apr  6 22:43:32 2010
New Revision: 931358

URL: http://svn.apache.org/viewvc?rev=931358&view=rev
Log:
MAPREDUCE-1466. Record number of files processed in FileInputFormat in the
Configuration for offline analysis. Contributed by Luke Lu and Arun Murthy
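
The value is stored under the new-API key NUM_INPUT_FILES ("mapreduce.input.fileinputformat.numinputfiles", defined in the patch below) once getSplits() has run, so an offline tool can read it back from a job's saved Configuration. A minimal sketch, assuming a Configuration loaded from such a job (the helper itself is illustrative, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    // Returns the recorded input-file count, or -1 for jobs that predate this change.
    static long recordedInputFiles(Configuration jobConf) {
      return jobConf.getLong(FileInputFormat.NUM_INPUT_FILES, -1L);
    }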

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=931358&r1=931357&r2=931358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Apr  6 22:43:32 2010
@@ -243,6 +243,9 @@ Trunk (unreleased changes)
 
     MAPREDUCE-1656. JobStory should provide queue info. (hong via mahadev)
 
+    MAPREDUCE-1466. Record number of files processed in FileInputFormat in the
+    Configuration for offline analysis. (Luke Lu and Arun Murthy via cdouglas)
+
   OPTIMIZATIONS
 
     MAPREDUCE-270. Fix the tasktracker to optionally send an out-of-band

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=931358&r1=931357&r2=931358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/FileInputFormat.java Tue Apr  6 22:43:32 2010
@@ -61,6 +61,9 @@ public abstract class FileInputFormat<K,
   public static final Log LOG =
     LogFactory.getLog(FileInputFormat.class);
 
+  public static final String NUM_INPUT_FILES =
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;
+
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
   private long minSplitSize = 1;
@@ -246,6 +249,8 @@ public abstract class FileInputFormat<K,
     throws IOException {
     FileStatus[] files = listStatus(job);
     
+    // Save the number of input files for metrics/loadgen
+    job.setLong(NUM_INPUT_FILES, files.length);
     long totalSize = 0;                           // compute total size
     for (FileStatus file: files) {                // check we have valid files
       if (file.isDir()) {
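
In the old API the count lands directly on the JobConf, since getSplits() above now calls job.setLong(NUM_INPUT_FILES, files.length). A sketch of how a caller can observe it after split computation, mirroring the updated TestFileInputFormat below (the input path is a placeholder):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    static long countInputFiles(JobConf job, Path inputDir) throws IOException {
      TextInputFormat.addInputPath(job, inputDir);   // e.g. new Path("/foo")
      TextInputFormat in = new TextInputFormat();
      in.configure(job);
      in.getSplits(job, 1);                          // side effect: records the count
      return job.getLong(FileInputFormat.NUM_INPUT_FILES, 0);
    }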

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java?rev=931358&r1=931357&r2=931358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java Tue Apr  6 22:43:32 2010
@@ -61,6 +61,8 @@ public abstract class FileInputFormat<K,
     "mapreduce.input.fileinputformat.split.minsize";
   public static final String PATHFILTER_CLASS = 
     "mapreduce.input.pathFilter.class";
+  public static final String NUM_INPUT_FILES =
+    "mapreduce.input.fileinputformat.numinputfiles";
 
   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 
@@ -253,42 +255,48 @@ public abstract class FileInputFormat<K,
 
   /** 
    * Generate the list of files and make them into FileSplits.
-   */ 
-  public List<InputSplit> getSplits(JobContext job
-                                    ) throws IOException {
+   * @param job the job context
+   * @throws IOException
+   */
+  public List<InputSplit> getSplits(JobContext job) throws IOException {
     long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
     long maxSize = getMaxSplitSize(job);
 
     // generate splits
     List<InputSplit> splits = new ArrayList<InputSplit>();
-    for (FileStatus file: listStatus(job)) {
+    List<FileStatus> files = listStatus(job);
+    for (FileStatus file: files) {
       Path path = file.getPath();
-      FileSystem fs = path.getFileSystem(job.getConfiguration());
       long length = file.getLen();
-      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-      if ((length != 0) && isSplitable(job, path)) { 
-        long blockSize = file.getBlockSize();
-        long splitSize = computeSplitSize(blockSize, minSize, maxSize);
-
-        long bytesRemaining = length;
-        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
-          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
-          splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
-                                   blkLocations[blkIndex].getHosts()));
-          bytesRemaining -= splitSize;
-        }
-        
-        if (bytesRemaining != 0) {
-          splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, 
-                     blkLocations[blkLocations.length-1].getHosts()));
+      if (length != 0) {
+        FileSystem fs = path.getFileSystem(job.getConfiguration());
+        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+        if (isSplitable(job, path)) {
+          long blockSize = file.getBlockSize();
+          long splitSize = computeSplitSize(blockSize, minSize, maxSize);
+
+          long bytesRemaining = length;
+          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
+            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
+                                     blkLocations[blkIndex].getHosts()));
+            bytesRemaining -= splitSize;
+          }
+
+          if (bytesRemaining != 0) {
+            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
+                       blkLocations[blkLocations.length-1].getHosts()));
+          }
+        } else { // not splitable
+          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
         }
-      } else if (length != 0) {
-        splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
       } else { 
         //Create empty hosts array for zero length files
         splits.add(makeSplit(path, 0, length, new String[0]));
       }
     }
+    // Save the number of input files for metrics/loadgen
+    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
     LOG.debug("Total # of splits: " + splits.size());
     return splits;
   }
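
The restructured loop keeps the existing sizing rules: splitSize is computeSplitSize(blockSize, minSize, maxSize), which in this class resolves to max(minSize, min(maxSize, blockSize)), and a split is cut only while the remaining bytes exceed SPLIT_SLOP (1.1) times the split size, so the final split may be up to 10% larger than the rest. A standalone sketch of that arithmetic (a re-statement for illustration, not code from the patch):

    // Split size: clamp the block size between the configured min and max.
    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
      return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Number of splits produced for one non-empty, splitable file of the given length.
    static int numSplits(long length, long splitSize) {
      final double SPLIT_SLOP = 1.1;   // last split may absorb up to 10% extra
      int n = 0;
      long bytesRemaining = length;
      while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
        ++n;
        bytesRemaining -= splitSize;
      }
      return (bytesRemaining != 0) ? n + 1 : n;
    }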

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=931358&r1=931357&r2=931358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestFileInputFormat.java Tue Apr  6 22:43:32 2010
@@ -32,68 +32,99 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Text;
 
-import com.sun.org.apache.commons.logging.Log;
-import com.sun.org.apache.commons.logging.LogFactory;
-
 public class TestFileInputFormat extends TestCase {
 
+  Configuration conf = new Configuration();
+  MiniDFSCluster dfs = null;
+
+  private MiniDFSCluster newDFSCluster(JobConf conf) throws Exception {
+    return new MiniDFSCluster(conf, 4, true,
+                         new String[]{"/rack0", "/rack0",
+                                      "/rack1", "/rack1"},
+                         new String[]{"host0", "host1",
+                                      "host2", "host3"});
+  }
+
   public void testLocality() throws Exception {
-    JobConf conf = new JobConf();
-    MiniDFSCluster dfs = null;
-    try {
-      dfs = new MiniDFSCluster(conf, 4, true,
-                               new String[]{"/rack0", "/rack0", 
-                                             "/rack1", "/rack1"},
-                               new String[]{"host0", "host1", 
-                                            "host2", "host3"});
-      FileSystem fs = dfs.getFileSystem();
-      System.out.println("FileSystem " + fs.getUri());
-      Path path = new Path("/foo/bar");
-      // create a multi-block file on hdfs
-      DataOutputStream out = fs.create(path, true, 4096, 
-                                       (short) 2, 512, null);
-      for(int i=0; i < 1000; ++i) {
-        out.writeChars("Hello\n");
-      }
-      out.close();
-      System.out.println("Wrote file");
+    JobConf job = new JobConf(conf);
+    dfs = newDFSCluster(job);
+    FileSystem fs = dfs.getFileSystem();
+    System.out.println("FileSystem " + fs.getUri());
+
+    Path inputDir = new Path("/foo/");
+    String fileName = "part-0000";
+    createInputs(fs, inputDir, fileName);
+
+    // split it using a file input format
+    TextInputFormat.addInputPath(job, inputDir);
+    TextInputFormat inFormat = new TextInputFormat();
+    inFormat.configure(job);
+    InputSplit[] splits = inFormat.getSplits(job, 1);
+    FileStatus fileStatus = fs.getFileStatus(new Path(inputDir, fileName));
+    BlockLocation[] locations =
+      fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
+    System.out.println("Made splits");
+
+    // make sure that each split is a block and the locations match
+    for(int i=0; i < splits.length; ++i) {
+      FileSplit fileSplit = (FileSplit) splits[i];
+      System.out.println("File split: " + fileSplit);
+      for (String h: fileSplit.getLocations()) {
+        System.out.println("Location: " + h);
+      }
+      System.out.println("Block: " + locations[i]);
+      assertEquals(locations[i].getOffset(), fileSplit.getStart());
+      assertEquals(locations[i].getLength(), fileSplit.getLength());
+      String[] blockLocs = locations[i].getHosts();
+      String[] splitLocs = fileSplit.getLocations();
+      assertEquals(2, blockLocs.length);
+      assertEquals(2, splitLocs.length);
+      assertTrue((blockLocs[0].equals(splitLocs[0]) &&
+                  blockLocs[1].equals(splitLocs[1])) ||
+                 (blockLocs[1].equals(splitLocs[0]) &&
+                  blockLocs[0].equals(splitLocs[1])));
+    }
 
-      // split it using a file input format
-      TextInputFormat.addInputPath(conf, path);
-      TextInputFormat inFormat = new TextInputFormat();
-      inFormat.configure(conf);
-      InputSplit[] splits = inFormat.getSplits(conf, 1);
-      FileStatus fileStatus = fs.getFileStatus(path);
-      BlockLocation[] locations = 
-        fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
-      System.out.println("Made splits");
-
-      // make sure that each split is a block and the locations match
-      for(int i=0; i < splits.length; ++i) {
-        FileSplit fileSplit = (FileSplit) splits[i];
-        System.out.println("File split: " + fileSplit);
-        for (String h: fileSplit.getLocations()) {
-          System.out.println("Location: " + h);
-        }
-        System.out.println("Block: " + locations[i]);
-        assertEquals(locations[i].getOffset(), fileSplit.getStart());
-        assertEquals(locations[i].getLength(), fileSplit.getLength());
-        String[] blockLocs = locations[i].getHosts();
-        String[] splitLocs = fileSplit.getLocations();
-        assertEquals(2, blockLocs.length);
-        assertEquals(2, splitLocs.length);
-        assertTrue((blockLocs[0].equals(splitLocs[0]) && 
-                    blockLocs[1].equals(splitLocs[1])) ||
-                   (blockLocs[1].equals(splitLocs[0]) &&
-                    blockLocs[0].equals(splitLocs[1])));
-      }
-    } finally {
-      if (dfs != null) {
-        dfs.shutdown();
-      }
+    assertEquals("Expected value of " + FileInputFormat.NUM_INPUT_FILES,
+                 1, job.getLong(FileInputFormat.NUM_INPUT_FILES, 0));
+  }
+
+  private void createInputs(FileSystem fs, Path inDir, String fileName)
+  throws IOException {
+    // create a multi-block file on hdfs
+    DataOutputStream out = fs.create(new Path(inDir, fileName), true, 4096,
+                                     (short) 2, 512, null);
+    for(int i=0; i < 1000; ++i) {
+      out.writeChars("Hello\n");
     }
+    out.close();
+    System.out.println("Wrote file");
   }
 
+  public void testNumInputs() throws Exception {
+    JobConf job = new JobConf(conf);
+    dfs = newDFSCluster(job);
+    FileSystem fs = dfs.getFileSystem();
+    System.out.println("FileSystem " + fs.getUri());
+
+    Path inputDir = new Path("/foo/");
+    final int numFiles = 10;
+    String fileNameBase = "part-0000";
+    for (int i=0; i < numFiles; ++i) {
+      createInputs(fs, inputDir, fileNameBase + String.valueOf(i));
+    }
+    createInputs(fs, inputDir, "_meta");
+    createInputs(fs, inputDir, "_temp");
+
+    // split it using a file input format
+    TextInputFormat.addInputPath(job, inputDir);
+    TextInputFormat inFormat = new TextInputFormat();
+    inFormat.configure(job);
+    InputSplit[] splits = inFormat.getSplits(job, 1);
+
+    assertEquals("Expected value of " + FileInputFormat.NUM_INPUT_FILES,
+                 numFiles, job.getLong(FileInputFormat.NUM_INPUT_FILES, 0));
+  }
   
   final Path root = new Path("/TestFileInputFormat");
   final Path file1 = new Path(root, "file1");
@@ -103,8 +134,6 @@ public class TestFileInputFormat extends
   static final int BLOCKSIZE = 1024;
   static final byte[] databuf = new byte[BLOCKSIZE];
 
-  private static final Log LOG = LogFactory.getLog(TestFileInputFormat.class);
-  
   private static final String rack1[] = new String[] {
     "/r1"
   };
@@ -112,7 +141,6 @@ public class TestFileInputFormat extends
     "host1.rack1.com"
   };
   
-  /** Dummy class to extend CombineFileInputFormat*/
   private class DummyFileInputFormat extends FileInputFormat<Text, Text> {
     @Override
     public RecordReader<Text, Text> getRecordReader(InputSplit split,
@@ -122,50 +150,40 @@ public class TestFileInputFormat extends
   }
 
   public void testMultiLevelInput() throws IOException {
-    String namenode = null;
-    MiniDFSCluster dfs = null;
-    FileSystem fileSys = null;
-    try {
-      JobConf conf = new JobConf();
-      
-      conf.setBoolean("dfs.replication.considerLoad", false);
-      dfs = new MiniDFSCluster(conf, 1, true, rack1, hosts1);
-      dfs.waitActive();
-
-      namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
-                 (dfs.getFileSystem()).getUri().getPort();
-
-      fileSys = dfs.getFileSystem();
-      if (!fileSys.mkdirs(dir1)) {
-        throw new IOException("Mkdirs failed to create " + root.toString());
-      }
-      writeFile(conf, file1, (short)1, 1);
-      writeFile(conf, file2, (short)1, 1);
+    JobConf job = new JobConf(conf);
 
-      // split it using a CombinedFile input format
-      DummyFileInputFormat inFormat = new DummyFileInputFormat();
-      inFormat.setInputPaths(conf, root);
-
-      // By default, we don't allow multi-level/recursive inputs
-      boolean exceptionThrown = false;
-      try {
-        InputSplit[] splits = inFormat.getSplits(conf, 1);
-      } catch (Exception e) {
-        exceptionThrown = true;
-      }
-      assertTrue("Exception should be thrown by default for scanning a "
-          + "directory with directories inside.", exceptionThrown);
+    job.setBoolean("dfs.replication.considerLoad", false);
+    dfs = new MiniDFSCluster(job, 1, true, rack1, hosts1);
+    dfs.waitActive();
+
+    String namenode = (dfs.getFileSystem()).getUri().getHost() + ":" +
+                      (dfs.getFileSystem()).getUri().getPort();
+
+    FileSystem fileSys = dfs.getFileSystem();
+    if (!fileSys.mkdirs(dir1)) {
+      throw new IOException("Mkdirs failed to create " + root.toString());
+    }
+    writeFile(job, file1, (short)1, 1);
+    writeFile(job, file2, (short)1, 1);
 
-      // Enable multi-level/recursive inputs
-      conf.setBoolean("mapred.input.dir.recursive", true);
-      InputSplit[] splits = inFormat.getSplits(conf, 1);
-      assertEquals(splits.length, 2);
-      
-    } finally {
-      if (dfs != null) {
-        dfs.shutdown();
-      }
+    // split it using a CombinedFile input format
+    DummyFileInputFormat inFormat = new DummyFileInputFormat();
+    inFormat.setInputPaths(job, root);
+
+    // By default, we don't allow multi-level/recursive inputs
+    boolean exceptionThrown = false;
+    try {
+      InputSplit[] splits = inFormat.getSplits(job, 1);
+    } catch (Exception e) {
+      exceptionThrown = true;
     }
+    assertTrue("Exception should be thrown by default for scanning a "
+        + "directory with directories inside.", exceptionThrown);
+
+    // Enable multi-level/recursive inputs
+    job.setBoolean("mapred.input.dir.recursive", true);
+    InputSplit[] splits = inFormat.getSplits(job, 1);
+    assertEquals(splits.length, 2);
   }
 
   static void writeFile(Configuration conf, Path name,
@@ -182,5 +200,11 @@ public class TestFileInputFormat extends
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
 
-  
+  @Override
+  public void tearDown() throws Exception {
+    if (dfs != null) {
+      dfs.shutdown();
+      dfs = null;
+    }
+  }
 }
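
Note that testNumInputs writes ten part-0000* files plus _meta and _temp yet expects NUM_INPUT_FILES to be 10: the default FileInputFormat path filter skips names beginning with "_" or ".". A rough equivalent of that filter, for illustration (the real one is a private member of FileInputFormat):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;

    PathFilter hiddenFileFilter = new PathFilter() {
      public boolean accept(Path p) {
        String name = p.getName();
        return !name.startsWith("_") && !name.startsWith(".");
      }
    };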

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java?rev=931358&r1=931357&r2=931358&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java Tue Apr  6 22:43:32 2010
@@ -24,7 +24,11 @@ import java.util.Arrays;
 import org.junit.Test;
 import static org.junit.Assert.*;
 
+import static org.mockito.Mockito.*;
+import static org.apache.hadoop.test.MockitoMaker.*;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
@@ -65,4 +69,15 @@ public class TestFileInputFormat {
     }
   }
 
+  @Test
+  public void testNumInputFiles() throws Exception {
+    Configuration conf = spy(new Configuration());
+    Job job = make(stub(Job.class).returning(conf).from.getConfiguration());
+    FileStatus stat = make(stub(FileStatus.class).returning(0L).from.getLen());
+    TextInputFormat ispy = spy(new TextInputFormat());
+    doReturn(Arrays.asList(stat)).when(ispy).listStatus(job);
+
+    ispy.getSplits(job);
+    verify(conf).setLong(FileInputFormat.NUM_INPUT_FILES, 1);
+  }
 }
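
The new-API test stubs its collaborators with Hadoop's MockitoMaker helpers; make(stub(Job.class).returning(conf).from.getConfiguration()) behaves roughly like the plain-Mockito sketch below (an assumed equivalence, for readers unfamiliar with MockitoMaker):

    import static org.mockito.Mockito.*;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    Configuration conf = spy(new Configuration());
    Job job = mock(Job.class);
    when(job.getConfiguration()).thenReturn(conf);
    // getSplits() on a format whose listStatus() returns one FileStatus should then
    // record exactly one input file on the spied Configuration:
    //   verify(conf).setLong(FileInputFormat.NUM_INPUT_FILES, 1);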


