hadoop-common-commits mailing list archives

From cutt...@apache.org
Subject svn commit: r550635 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java src/java/org/apache/hadoop/mapred/MultiFileSplit.java src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java
Date Mon, 25 Jun 2007 22:59:55 GMT
Author: cutting
Date: Mon Jun 25 15:59:54 2007
New Revision: 550635

URL: http://svn.apache.org/viewvc?view=rev&rev=550635
Log:
HADOOP-1515.  Add MultiFileInputFormat, which packs multiple files per split.  Contributed by Enis Soztutar.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileSplit.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=550635&r1=550634&r2=550635
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jun 25 15:59:54 2007
@@ -251,6 +251,9 @@
      to file properties to be through a new FileStatus interface.
      (Dhruba Borthakur via cutting)
 
+ 77. HADOOP-1515.  Add MultiFileInputFormat, which can pack multiple,
+     typically small, input files into each split.  (Enis Soztutar via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java?view=auto&rev=550635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java Mon Jun 25 15:59:54 2007
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * An abstract {@link InputFormat} that returns {@link MultiFileSplit}s
+ * from its {@link #getSplits(JobConf, int)} method. Splits are constructed
+ * from the files under the input paths, and each returned split contains a
+ * <i>nearly</i> equal total length of content. <br>
+ * Subclasses implement {@link #getRecordReader(InputSplit, JobConf, Reporter)}
+ * to construct <code>RecordReader</code>s for <code>MultiFileSplit</code>s.
+ * @see MultiFileSplit
+ */
+public abstract class MultiFileInputFormat extends FileInputFormat {
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) 
+    throws IOException {
+    
+    MultiFileSplit[] splits = new MultiFileSplit[numSplits];
+    Path[] paths = listPaths(job);
+    long[] lengths = new long[paths.length];
+    long totLength = 0;
+    for(int i=0; i<paths.length; i++) {
+      FileSystem fs = paths[i].getFileSystem(job);
+      lengths[i] = fs.getContentLength(paths[i]);
+      totLength += lengths[i];
+    }
+    float avgLengthPerSplit = ((float)totLength) / numSplits;
+    long cumulativeLength = 0;
+
+    int startIndex = 0;
+
+    for(int i=0; i<numSplits; i++) {
+      int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength
+          , startIndex, lengths);
+      Path[] splitPaths = new Path[splitSize];
+      long[] splitLengths = new long[splitSize];
+      System.arraycopy(paths, startIndex, splitPaths, 0, splitSize);
+      System.arraycopy(lengths, startIndex, splitLengths, 0, splitSize);
+      splits[i] = new MultiFileSplit(job, splitPaths, splitLengths);
+      startIndex += splitSize;
+      for(long l: splitLengths) {
+        cumulativeLength += l;
+      }
+    }
+    return splits;
+    
+  }
+
+  private int findSize(int splitIndex, float avgLengthPerSplit
+      , long cumulativeLength , int startIndex, long[] lengths) {
+    
+    if(splitIndex == lengths.length - 1)
+      return lengths.length - startIndex;
+    
+    long goalLength = (long)((splitIndex + 1) * avgLengthPerSplit);
+    long partialLength = 0; // long: summed file lengths can overflow an int
+    // accumulate until just past the goal length
+    for(int i = startIndex; i < lengths.length; i++) {
+      partialLength += lengths[i];
+      if(partialLength + cumulativeLength >= goalLength) {
+        return i - startIndex + 1;
+      }
+    }
+    return lengths.length - startIndex;
+  }
+  
+  public abstract RecordReader getRecordReader(InputSplit split,
+      JobConf job, Reporter reporter)
+      throws IOException;
+}
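
To illustrate how getSplits packs files: with input files of lengths 100, 200, 300 and 400 and numSplits = 2, avgLengthPerSplit is 500, so the first split greedily accumulates files until the cumulative length reaches 500 (taking the first three files, 600 bytes) and the second split gets the remainder. A minimal concrete subclass might look like the sketch below; the class name and the PathPerRecordReader it returns are hypothetical (the reader itself is sketched after MultiFileSplit.java):

    package org.apache.hadoop.mapred;

    import java.io.IOException;

    // Hypothetical subclass: all a subclass must supply is a RecordReader
    // for the MultiFileSplit's produced by getSplits().
    public class PathPerRecordInputFormat extends MultiFileInputFormat {
      public RecordReader getRecordReader(InputSplit split, JobConf job,
          Reporter reporter) throws IOException {
        // Emit one record per file in the split.
        return new PathPerRecordReader((MultiFileSplit) split);
      }
    }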

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileSplit.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileSplit.java?view=auto&rev=550635
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileSplit.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MultiFileSplit.java Mon Jun 25 15:59:54 2007
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+
+/**
+ * A sub-collection of input files. Unlike {@link FileSplit}, a
+ * MultiFileSplit does not represent a split of a single file, but a
+ * partition of the input files into smaller sets; the atomic unit of
+ * splitting is a file. <br>
+ * It can be used to implement a {@link RecordReader} that reads one record per file.
+ * @see FileSplit
+ * @see MultiFileInputFormat 
+ */
+public class MultiFileSplit implements InputSplit {
+
+  private Path[] paths;
+  private long[] lengths;
+  private long totLength;
+  private JobConf job;
+
+  MultiFileSplit() {}
+  
+  public MultiFileSplit(JobConf job, Path[] files, long[] lengths) {
+    this.job = job;
+    this.lengths = lengths;
+    this.paths = files;
+    this.totLength = 0;
+    for(long length : lengths) {
+      totLength += length;
+    }
+  }
+
+  public long getLength() {
+    return totLength;
+  }
+  
+  /** Returns an array containing the lengths of the files
+   *  in the split. */
+  public long[] getLengths() {
+    return lengths;
+  }
+  
+  /** Returns the length of the i<sup>th</sup> Path */
+  public long getLength(int i) {
+    return lengths[i];
+  }
+  
+  /** Returns the number of Paths in the split */
+  public int getNumPaths() {
+    return paths.length;
+  }
+
+  /** Returns the i<sup>th</sup> Path */
+  public Path getPath(int i) {
+    return paths[i];
+  }
+  
+  /** Returns all the Paths in the split */
+  public Path[] getPaths() {
+    return paths;
+  }
+
+  public String[] getLocations() throws IOException {
+    HashSet<String> hostSet = new HashSet<String>();
+    for (Path file : paths) {
+      FileSystem fs = file.getFileSystem(job);
+      String[][] hints = fs.getFileCacheHints(file, 0, fs.getLength(file));
+      if (hints != null && hints.length > 0) {
+        addToSet(hostSet, hints[0]);
+      }
+    }
+    return hostSet.toArray(new String[hostSet.size()]);
+  }
+
+  private void addToSet(Set<String> set, String[] array) {
+    for(String s:array)
+      set.add(s); 
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    int arrLength = in.readInt();
+    lengths = new long[arrLength];
+    totLength = 0;
+    for(int i=0; i<arrLength; i++) {
+      lengths[i] = in.readLong();
+      totLength += lengths[i];  // restore the total; it is not written separately
+    }
+    int filesLength = in.readInt();
+    paths = new Path[filesLength];
+    for(int i=0; i<filesLength;i++) {
+      paths[i] = new Path(Text.readString(in));
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(lengths.length);
+    for(long length : lengths)
+      out.writeLong(length);
+    out.writeInt(paths.length);
+    for(Path p : paths) {
+      Text.writeString(out, p.toString());
+    }
+  }
+}
+
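
The class javadoc above suggests readers that emit one record per file. A minimal sketch of such a reader, assuming the non-generic RecordReader interface of this era (next, createKey, createValue, getPos, getProgress, close); the class name is hypothetical, and for brevity it emits (path, length) pairs rather than file contents:

    package org.apache.hadoop.mapred;

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;

    public class PathPerRecordReader implements RecordReader {
      private final MultiFileSplit split;
      private int index = 0;   // next file to emit
      private long pos = 0;    // bytes accounted for so far

      public PathPerRecordReader(MultiFileSplit split) {
        this.split = split;
      }

      public boolean next(Writable key, Writable value) throws IOException {
        if (index >= split.getNumPaths())
          return false;
        // Key is the file's path, value its length; a real reader would
        // open the file here and read its contents into the value.
        ((Text) key).set(split.getPath(index).toString());
        ((LongWritable) value).set(split.getLength(index));
        pos += split.getLength(index);
        index++;
        return true;
      }

      public WritableComparable createKey() { return new Text(); }
      public Writable createValue() { return new LongWritable(); }
      public long getPos() { return pos; }
      public float getProgress() {
        return split.getLength() == 0 ? 1.0f : (float) pos / split.getLength();
      }
      public void close() { /* nothing held open between records */ }
    }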

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java?view=auto&rev=550635
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java Mon Jun 25 15:59:54 2007
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.HashMap;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+public class TestMultiFileInputFormat extends TestCase {
+
+  private static JobConf job = new JobConf();
+  private static final Log LOG = LogFactory.getLog(TestMultiFileInputFormat.class);
+  
+  private static final int MAX_SPLIT_COUNT  = 10000;
+  private static final int SPLIT_COUNT_INCR = 6000;
+  private static final int MAX_BYTES = 1024;
+  private static final int MAX_NUM_FILES = 10000;
+  private static final int NUM_FILES_INCR = 8000;
+  
+  private Random rand = new Random(System.currentTimeMillis());
+  private HashMap<String, Long> lengths = new HashMap<String, Long>();
+  
+  /** Dummy class to extend MultiFileInputFormat*/
+  private class DummyMultiFileInputFormat extends MultiFileInputFormat {
+    @Override
+    public RecordReader getRecordReader(InputSplit split, JobConf job
+        , Reporter reporter) throws IOException {
+      return null;
+    }
+  }
+  
+  private Path initFiles(FileSystem fs, int numFiles) throws IOException{
+    Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
+    Path multiFileDir = new Path(dir, "test.multifile");
+    fs.delete(multiFileDir);
+    fs.mkdirs(multiFileDir);
+    LOG.info("Creating " + numFiles + " file(s) in " + multiFileDir);
+    for(int i=0; i<numFiles; i++) {
+      Path path = new Path(multiFileDir, "file_" + i);
+      FSDataOutputStream out = fs.create(path);
+      int numBytes = rand.nextInt(MAX_BYTES);
+      for(int j=0; j<numBytes; j++) {
+        out.write(rand.nextInt());
+      }
+      out.close();
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Created file " + path + " with length " + numBytes);
+      }
+      lengths.put(path.getName(), new Long(numBytes));
+    }
+    job.setInputPath(multiFileDir);
+    return multiFileDir;
+  }
+  
+  public void testFormat() throws IOException {
+    if(LOG.isInfoEnabled()) {
+      LOG.info("Test started");
+      LOG.info("Max split count           = " + MAX_SPLIT_COUNT);
+      LOG.info("Split count increment     = " + SPLIT_COUNT_INCR);
+      LOG.info("Max bytes per file        = " + MAX_BYTES);
+      LOG.info("Max number of files       = " + MAX_NUM_FILES);
+      LOG.info("Number of files increment = " + NUM_FILES_INCR);
+    }
+    
+    MultiFileInputFormat format = new DummyMultiFileInputFormat();
+    FileSystem fs = FileSystem.getLocal(job);
+    
+    for(int numFiles = 1; numFiles < MAX_NUM_FILES;
+        numFiles += (NUM_FILES_INCR / 2) + rand.nextInt(NUM_FILES_INCR / 2)) {
+      
+      Path dir = initFiles(fs, numFiles);
+      BitSet bits = new BitSet(numFiles);
+      for(int i=1; i < MAX_SPLIT_COUNT; i += rand.nextInt(SPLIT_COUNT_INCR) + 1) {
+        LOG.info("Running for Num Files=" + numFiles + ", split count=" + i);
+        
+        MultiFileSplit[] splits = (MultiFileSplit[])format.getSplits(job, i);
+        bits.clear();
+        
+        for(MultiFileSplit split : splits) {
+          long splitLength = 0;
+          for(Path p : split.getPaths()) {
+            long length = fs.getContentLength(p);
+            assertEquals(length, lengths.get(p.getName()).longValue());
+            splitLength += length;
+            String name = p.getName();
+            int index = Integer.parseInt(
+                name.substring(name.lastIndexOf("file_") + 5));
+            assertFalse(bits.get(index));
+            bits.set(index);
+          }
+          assertEquals(splitLength, split.getLength());
+        }
+      }
+      assertEquals(bits.cardinality(), numFiles);
+      fs.delete(dir);
+    }
+    LOG.info("Test Finished");
+  }
+  
+  public static void main(String[] args) throws Exception{
+    TestMultiFileInputFormat test = new TestMultiFileInputFormat();
+    test.testFormat();
+  }
+}
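
As a sanity check of the wire format defined by MultiFileSplit's write/readFields (an int count followed by the file lengths, then an int count followed by the path strings), a hypothetical round-trip snippet using Hadoop's DataOutputBuffer and DataInputBuffer; it lives in org.apache.hadoop.mapred because the no-arg constructor is package-private:

    package org.apache.hadoop.mapred;

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class MultiFileSplitRoundTrip {
      public static void main(String[] args) throws IOException {
        Path[] paths = { new Path("file_0"), new Path("file_1") };
        long[] lengths = { 10L, 20L };
        MultiFileSplit split = new MultiFileSplit(new JobConf(), paths, lengths);

        // Serialize into a buffer, then deserialize a fresh copy.
        DataOutputBuffer out = new DataOutputBuffer();
        split.write(out);
        DataInputBuffer in = new DataInputBuffer();
        in.reset(out.getData(), out.getLength());
        MultiFileSplit copy = new MultiFileSplit();
        copy.readFields(in);

        // Paths and lengths round-trip; the JobConf does not (it is never
        // written), so location lookups are not possible on the copy.
        System.out.println(copy.getNumPaths() + " paths, total length "
            + copy.getLength());
      }
    }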


