hadoop-common-commits mailing list archives

From d...@apache.org
Subject svn commit: r656585 - in /hadoop/core/trunk: CHANGES.txt conf/hadoop-default.xml src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java
Date Thu, 15 May 2008 11:31:20 GMT
Author: ddas
Date: Thu May 15 04:31:20 2008
New Revision: 656585

URL: http://svn.apache.org/viewvc?rev=656585&view=rev
Log:
HADOOP-3221. Adds org.apache.hadoop.mapred.lib.NLineInputFormat, which splits files into splits
of N lines each. N can be specified by the configuration property mapred.line.input.format.linespermap,
which defaults to 1. Contributed by Amareshwari Sriramadasu.
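
As a usage illustration (not part of this commit), a minimal driver sketch along these
lines would wire the new format into a job's JobConf; the driver class name and the
argument handling are placeholders:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.lib.NLineInputFormat;

    public class ParameterSweepDriver {  // hypothetical driver class
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(ParameterSweepDriver.class);
        // The control file (one parameter set per line) is the job's input path;
        // NLineInputFormat hands each map task N consecutive lines of it.
        job.setInputFormat(NLineInputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Mapper, reducer, output path and output types would be set here as usual.
        JobClient.runJob(job);
      }
    }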

Added:
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/conf/hadoop-default.xml

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=656585&r1=656584&r2=656585&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu May 15 04:31:20 2008
@@ -86,6 +86,11 @@
     be corrupt, retain all copies and mark the block as corrupt.
     (Lohit Vjayarenu via rangadi)
 
+    HADOOP-3221. Adds org.apache.hadoop.mapred.lib.NLineInputFormat, which 
+    splits files into splits of N lines each. N can be specified by the 
+    configuration property "mapred.line.input.format.linespermap", which
+    defaults to 1. (Amareshwari Sriramadasu via ddas) 
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=656585&r1=656584&r2=656585&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Thu May 15 04:31:20 2008
@@ -1057,6 +1057,13 @@
     </description>
   </property>
 
+  <property>
+    <name>mapred.line.input.format.linespermap</name>
+    <value>1</value>
+    <description> Number of lines per split in NLineInputFormat.
+    </description>
+  </property>
+
 <!-- ipc properties -->
 
 <property>
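
The entry above only supplies the cluster-wide default of 1; a job that wants larger
splits can override it on its JobConf (the value 10 below is purely illustrative):

    // Overrides the hadoop-default.xml default of 1 for this job only.
    job.setInt("mapred.line.input.format.linespermap", 10);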

Added: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java?rev=656585&view=auto
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java (added)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/lib/NLineInputFormat.java Thu May 15 04:31:20 2008
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.LineRecordReader.LineReader;
+
+/**
+ * NLineInputFormat splits N lines of input into one split.
+ *
+ * In many "pleasantly" parallel applications, each process/mapper 
+ * processes the same input file(s), but the computations are 
+ * controlled by different parameters (referred to as "parameter sweeps").
+ * One way to achieve this is to specify a set of parameters 
+ * (one set per line) as input in a control file 
+ * (which is the input path to the map-reduce application,
+ * whereas the input dataset is specified 
+ * via a config variable in JobConf).
+ * 
+ * NLineInputFormat can be used in such applications; it splits 
+ * the input file such that, by default, one line is fed as
+ * a value to one map task, and the key is the offset,
+ * i.e. (k,v) is (LongWritable, Text).
+ * The location hints will span the whole mapred cluster.
+ */
+
+public class NLineInputFormat extends FileInputFormat<LongWritable, Text> 
+                              implements JobConfigurable { 
+  private int N = 1;
+
+  public RecordReader<LongWritable, Text> getRecordReader(
+                                            InputSplit genericSplit,
+                                            JobConf job,
+                                            Reporter reporter) 
+  throws IOException {
+    reporter.setStatus(genericSplit.toString());
+    return new LineRecordReader(job, (FileSplit) genericSplit);
+  }
+
+  /** 
+   * Logically splits the set of input files for the job; groups N lines
+   * of the input into one split.
+   * 
+   * @see org.apache.hadoop.mapred.FileInputFormat#getSplits(JobConf, int)
+   */
+  public InputSplit[] getSplits(JobConf job, int numSplits)
+  throws IOException {
+    ArrayList<FileSplit> splits = new ArrayList<FileSplit>();
+    Path[] files = listPaths(job);
+    for (int i=0; i < files.length; i++) {
+      Path fileName = files[i];
+      FileSystem  fs = fileName.getFileSystem(job);
+      FileStatus status = fs.getFileStatus(fileName);
+      if (status.isDir() || !fs.exists(fileName)) {
+        throw new IOException("Not a file: " + fileName);
+      }
+      LineReader lr = null;
+      try {
+        FSDataInputStream in  = fs.open(fileName);
+        lr = new LineReader(in, job);
+        Text line = new Text();
+        int numLines = 0;
+        long begin = 0;
+        long length = 0;
+        int num = -1;
+        while ((num = lr.readLine(line)) > 0) {
+          numLines++;
+          length += num;
+          if (numLines == N) {
+            splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+            begin += length;
+            length = 0;
+            numLines = 0;
+          }
+        }
+        if (numLines != 0) {
+          splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+        }
+   
+      } finally {
+        if (lr != null) {
+          lr.close();
+        }
+      }
+    }
+    return splits.toArray(new FileSplit[splits.size()]);
+  }
+
+  public void configure(JobConf conf) {
+    N = conf.getInt("mapred.line.input.format.linespermap", 1);
+  }
+}
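
To make the parameter-sweep use case from the class javadoc concrete: each call to a
mapper's map() method receives one line of the control file, so a sweep mapper mostly
just parses its parameter line and runs the computation. A hypothetical mapper (not
part of this commit) might look roughly like this:

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class SweepMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {
      public void map(LongWritable offset, Text paramLine,
                      OutputCollector<Text, Text> output, Reporter reporter)
          throws IOException {
        // One invocation per line of the control file: a set of parameters
        // for one independent run of the computation.
        String[] params = paramLine.toString().split("\\s+");
        // ... run the actual computation with 'params' here ...
        // Emit the parameter line keyed by its first token as a stand-in result.
        output.collect(new Text(params.length > 0 ? params[0] : ""), paramLine);
      }
    }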

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java?rev=656585&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java Thu May 15 04:31:20 2008
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred.lib;
+
+import java.io.*;
+import java.util.*;
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+public class TestLineInputFormat extends TestCase {
+  private static int MAX_LENGTH = 200;
+  
+  private static JobConf defaultConf = new JobConf();
+  private static FileSystem localFs = null; 
+
+  static {
+    try {
+      localFs = FileSystem.getLocal(defaultConf);
+    } catch (IOException e) {
+      throw new RuntimeException("init failure", e);
+    }
+  }
+
+  private static Path workDir = 
+    new Path(new Path(System.getProperty("test.build.data", "."), "data"),
+             "TestLineInputFormat");
+  
+  public void testFormat() throws Exception {
+    JobConf job = new JobConf();
+    Path file = new Path(workDir, "test.txt");
+
+    int seed = new Random().nextInt();
+    Random random = new Random(seed);
+
+    localFs.delete(workDir, true);
+    FileInputFormat.setInputPaths(job, workDir);
+    int numLinesPerMap = 5;
+    job.setInt("mapred.line.input.format.linespermap", numLinesPerMap);
+
+    // for a variety of lengths
+    for (int length = 0; length < MAX_LENGTH;
+         length += random.nextInt(MAX_LENGTH/10) + 1) {
+      // create a file with length entries
+      Writer writer = new OutputStreamWriter(localFs.create(file));
+      try {
+        for (int i = 0; i < length; i++) {
+          writer.write(Integer.toString(i));
+          writer.write("\n");
+        }
+      } finally {
+        writer.close();
+      }
+      checkFormat(job, numLinesPerMap);
+    }
+  }
+
+  // A reporter that does nothing
+  private static final Reporter voidReporter = Reporter.NULL;
+  
+  void checkFormat(JobConf job, int expectedN) throws IOException{
+    NLineInputFormat format = new NLineInputFormat();
+    format.configure(job);
+    int ignoredNumSplits = 1;
+    InputSplit[] splits = format.getSplits(job, ignoredNumSplits);
+
+    // check all splits except last one
+    int count = 0;
+    for (int j = 0; j < splits.length -1; j++) {
+      assertEquals("There are no split locations", 0,
+                   splits[j].getLocations().length);
+      RecordReader<LongWritable, Text> reader =
+        format.getRecordReader(splits[j], job, voidReporter);
+      Class readerClass = reader.getClass();
+      assertEquals("reader class is LineRecordReader.",
+                   LineRecordReader.class, readerClass);        
+      LongWritable key = reader.createKey();
+      Class keyClass = key.getClass();
+      assertEquals("Key class is LongWritable.", LongWritable.class, keyClass);
+      Text value = reader.createValue();
+      Class valueClass = value.getClass();
+      assertEquals("Value class is Text.", Text.class, valueClass);
+         
+      try {
+        count = 0;
+        while (reader.next(key, value)) {
+          count++;
+        }
+      } finally {
+        reader.close();
+      }
+      assertEquals("number of lines in split is " + expectedN ,
+                   expectedN, count);
+    }
+  }
+  
+  public static void main(String[] args) throws Exception {
+    new TestLineInputFormat().testFormat();
+  }
+}
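
As a concrete illustration of what checkFormat verifies: with
mapred.line.input.format.linespermap set to 5, a 12-line input file produces three
splits of 5, 5 and 2 lines; the test asserts that every split except the last holds
exactly 5 records and reports no location hints. The main() method above also allows
the test to be run standalone.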


