hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tomwh...@apache.org
Subject svn commit: r677872 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Fri, 18 Jul 2008 10:27:38 GMT
Author: tomwhite
Date: Fri Jul 18 03:27:37 2008
New Revision: 677872

URL: http://svn.apache.org/viewvc?rev=677872&view=rev
Log:
HADOOP-372.  Add support for multiple input paths with a different InputFormat and Mapper
for each path.  Contributed by Chris Smith.

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=677872&r1=677871&r2=677872&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Jul 18 03:27:37 2008
@@ -29,6 +29,9 @@
 
     HADOOP-2325.  Require Java 6. (cutting)
 
+    HADOOP-372.  Add support for multiple input paths with a different
+    InputFormat and Mapper for each path.  (Chris Smith via tomwhite)
+
   NEW FEATURES
 
     HADOOP-3341. Allow streaming jobs to specify the field separator for map

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java?rev=677872&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingInputFormat.java Fri Jul 18 03:27:37 2008
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputFormat} that delegates behaviour of paths to multiple other
+ * InputFormats.
+ * 
+ * @see FileInputFormat#addInputPath(JobConf, Path, Class, Class)
+ */
+public class DelegatingInputFormat<K, V> implements InputFormat<K, V> {
+
+  @Deprecated
+  public void validateInput(JobConf conf) throws IOException {
+    JobConf confCopy = new JobConf(conf);
+    Map<Path, InputFormat> formatMap = FileInputFormat.getInputFormatMap(conf);
+    for (Entry<Path, InputFormat> entry : formatMap.entrySet()) {
+      Path path = entry.getKey();
+      InputFormat format = entry.getValue();
+      FileInputFormat.setInputPaths(confCopy, path);
+      format.validateInput(confCopy);
+    }
+  }
+
+  public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
+
+    JobConf confCopy = new JobConf(conf);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    Map<Path, InputFormat> formatMap = FileInputFormat.getInputFormatMap(conf);
+    Map<Path, Class<? extends Mapper>> mapperMap = FileInputFormat
+       .getMapperTypeMap(conf);
+    Map<Class<? extends InputFormat>, List<Path>> formatPaths
+        = new HashMap<Class<? extends InputFormat>, List<Path>>();
+
+    // First, build a map of InputFormats to Paths
+    for (Entry<Path, InputFormat> entry : formatMap.entrySet()) {
+      if (!formatPaths.containsKey(entry.getValue().getClass())) {
+       formatPaths.put(entry.getValue().getClass(), new LinkedList<Path>());
+      }
+
+      formatPaths.get(entry.getValue().getClass()).add(entry.getKey());
+    }
+
+    for (Entry<Class<? extends InputFormat>, List<Path>> formatEntry :
+        formatPaths.entrySet()) {
+      Class<? extends InputFormat> formatClass = formatEntry.getKey();
+      InputFormat format = (InputFormat) ReflectionUtils.newInstance(
+         formatClass, conf);
+      List<Path> paths = formatEntry.getValue();
+
+      Map<Class<? extends Mapper>, List<Path>> mapperPaths
+          = new HashMap<Class<? extends Mapper>, List<Path>>();
+
+      // Now, for each set of paths that have a common InputFormat, build
+      // a map of Mappers to the paths they're used for
+      for (Path path : paths) {
+       Class<? extends Mapper> mapperClass = mapperMap.get(path);
+       if (!mapperPaths.containsKey(mapperClass)) {
+         mapperPaths.put(mapperClass, new LinkedList<Path>());
+       }
+
+       mapperPaths.get(mapperClass).add(path);
+      }
+
+      // Now each set of paths that has a common InputFormat and Mapper can
+      // be added to the same job, and split together.
+      for (Entry<Class<? extends Mapper>, List<Path>> mapEntry : mapperPaths
+         .entrySet()) {
+       paths = mapEntry.getValue();
+       Class<? extends Mapper> mapperClass = mapEntry.getKey();
+
+       if (mapperClass == null) {
+         mapperClass = conf.getMapperClass();
+       }
+
+       FileInputFormat.setInputPaths(confCopy, paths.toArray(new Path[paths
+           .size()]));
+
+       // Get splits for each input path and tag with InputFormat
+       // and Mapper types by wrapping in a TaggedInputSplit.
+       InputSplit[] pathSplits = format.getSplits(confCopy, numSplits);
+       for (InputSplit pathSplit : pathSplits) {
+         splits.add(new TaggedInputSplit(pathSplit, conf, format.getClass(),
+             mapperClass));
+       }
+      }
+    }
+
+    return splits.toArray(new InputSplit[splits.size()]);
+  }
+
+  @SuppressWarnings("unchecked")
+  public RecordReader<K, V> getRecordReader(InputSplit split, JobConf conf,
+      Reporter reporter) throws IOException {
+
+    // Find the InputFormat and then the RecordReader from the
+    // TaggedInputSplit.
+
+    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
+       .newInstance(taggedInputSplit.getInputFormatClass(), conf);
+    return inputFormat.getRecordReader(taggedInputSplit.getInputSplit(), conf,
+       reporter);
+  }
+}

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java?rev=677872&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/DelegatingMapper.java Fri Jul 18 03:27:37 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link Mapper} that delegates behaviour of paths to multiple other
+ * mappers.
+ * 
+ * @see FileInputFormat#addInputPath(JobConf, Path, Class, Class)
+ */
+public class DelegatingMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2> {
+
+  private JobConf conf;
+
+  private Mapper<K1, V1, K2, V2> mapper;
+
+  @SuppressWarnings("unchecked")
+  public void map(K1 key, V1 value, OutputCollector<K2, V2> outputCollector,
+      Reporter reporter) throws IOException {
+
+    if (mapper == null) {
+      // Find the Mapper from the TaggedInputSplit.
+      TaggedInputSplit inputSplit = (TaggedInputSplit) reporter.getInputSplit();
+      mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
+         .getMapperClass(), conf);
+    }
+    mapper.map(key, value, outputCollector, reporter);
+  }
+
+  public void configure(JobConf conf) {
+    this.conf = conf;
+  }
+
+  public void close() throws IOException {
+    if (mapper != null) {
+      mapper.close();
+    }
+  }
+
+}

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java?rev=677872&r1=677871&r2=677872&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/FileInputFormat.java Fri Jul 18 03:27:37 2008
@@ -21,7 +21,10 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -398,8 +401,105 @@
     conf.set("mapred.input.dir", dirs == null ? dirStr :
       dirs + StringUtils.COMMA_STR + dirStr);
   }
+  
+  /**
+   * Add a {@link Path} with a custom {@link InputFormat} to the list of
+   * inputs for the map-reduce job.
+   * 
+   * @param conf The configuration of the job
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputFormatClass {@link InputFormat} class to use for this path
+   */
+  public static void addInputPath(JobConf conf, Path path,
+      Class<? extends InputFormat> inputFormatClass) {
+
+    String inputFormatMapping = path.toString() + ";"
+       + inputFormatClass.getName();
+    String inputFormats = conf.get("mapred.input.dir.formats");
+    conf.set("mapred.input.dir.formats",
+       inputFormats == null ? inputFormatMapping : inputFormats + ","
+           + inputFormatMapping);
+
+    conf.setInputFormat(DelegatingInputFormat.class);
+  }
+
+  /**
+   * Add a {@link Path} with a custom {@link InputFormat} and
+   * {@link Mapper} to the list of inputs for the map-reduce job.
+   * 
+   * @param conf The configuration of the job
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputFormatClass {@link InputFormat} class to use for this path
+   * @param mapperClass {@link Mapper} class to use for this path
+   */
+  public static void addInputPath(JobConf conf, Path path,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends Mapper> mapperClass) {
+
+    addInputPath(conf, path, inputFormatClass);
+
+    String mapperMapping = path.toString() + ";" + mapperClass.getName();
+    String mappers = conf.get("mapred.input.dir.mappers");
+    conf.set("mapred.input.dir.mappers", mappers == null ? mapperMapping
+       : mappers + "," + mapperMapping);
+
+    conf.setMapperClass(DelegatingMapper.class);
+  }
 
-  // This method escapes commas in the glob pattern of the given paths. 
+  /**
+   * Retrieves a map of {@link Path}s to the {@link InputFormat} class
+   * that should be used for them.
+   * 
+   * @param conf The configuration of the job
+   * @see #addInputPath(JobConf, Path, Class)
+   * @return A map of paths to inputformats for the job
+   */
+  static Map<Path, InputFormat> getInputFormatMap(JobConf conf) {
+    Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
+    String[] pathMappings = conf.get("mapred.input.dir.formats").split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      InputFormat inputFormat;
+      try {
+       inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
+           .getClassByName(split[1]), conf);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), inputFormat);
+    }
+    return m;
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link Mapper} class that
+   * should be used for them.
+   * 
+   * @param conf The configuration of the job
+   * @see #addInputPath(JobConf, Path, Class, Class)
+   * @return A map of paths to mappers for the job
+   */
+  @SuppressWarnings("unchecked")
+  static Map<Path, Class<? extends Mapper>> getMapperTypeMap(JobConf conf) {
+    if (conf.get("mapred.input.dir.mappers") == null) {
+      return Collections.emptyMap();
+    }
+    Map<Path, Class<? extends Mapper>> m = new HashMap<Path, Class<? extends Mapper>>();
+    String[] pathMappings = conf.get("mapred.input.dir.mappers").split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      Class<? extends Mapper> mapClass;
+      try {
+       mapClass = (Class<? extends Mapper>) conf.getClassByName(split[1]);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), mapClass);
+    }
+    return m;
+  }
+         
+  // This method escapes commas in the glob pattern of the given paths.
   private static String[] getPathStrings(String commaSeparatedPaths) {
     int length = commaSeparatedPaths.length();
     int curlyOpen = 0;

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java?rev=677872&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaggedInputSplit.java Fri Jul 18 03:27:37 2008
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputSplit} that tags another InputSplit with extra data for use by
+ * {@link DelegatingInputFormat}s and {@link DelegatingMapper}s.
+ */
+public class TaggedInputSplit implements Configurable, InputSplit {
+
+  private Class<? extends InputSplit> inputSplitClass;
+
+  private InputSplit inputSplit;
+
+  private Class<? extends InputFormat> inputFormatClass;
+
+  private Class<? extends Mapper> mapperClass;
+
+  private Configuration conf;
+
+  public TaggedInputSplit() {
+    // Default constructor.
+  }
+
+  /**
+   * Creates a new TaggedInputSplit.
+   * 
+   * @param inputSplit The InputSplit to be tagged
+   * @param conf The configuration to use
+   * @param inputFormatClass The InputFormat class to use for this job
+   * @param mapperClass The Mapper class to use for this job
+   */
+  public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends Mapper> mapperClass) {
+    this.inputSplitClass = inputSplit.getClass();
+    this.inputSplit = inputSplit;
+    this.conf = conf;
+    this.inputFormatClass = inputFormatClass;
+    this.mapperClass = mapperClass;
+  }
+
+  /**
+   * Retrieves the original InputSplit.
+   * 
+   * @return The InputSplit that was tagged
+   */
+  public InputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  /**
+   * Retrieves the InputFormat class to use for this split.
+   * 
+   * @return The InputFormat class to use
+   */
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  /**
+   * Retrieves the Mapper class to use for this split.
+   * 
+   * @return The Mapper class to use
+   */
+  public Class<? extends Mapper> getMapperClass() {
+    return mapperClass;
+  }
+
+  public long getLength() throws IOException {
+    return inputSplit.getLength();
+  }
+
+  public String[] getLocations() throws IOException {
+    return inputSplit.getLocations();
+  }
+
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    inputSplitClass = (Class<? extends InputSplit>) readClass(in);
+    inputSplit = (InputSplit) ReflectionUtils
+       .newInstance(inputSplitClass, conf);
+    inputSplit.readFields(in);
+    inputFormatClass = (Class<? extends InputFormat>) readClass(in);
+    mapperClass = (Class<? extends Mapper>) readClass(in);
+  }
+
+  private Class<?> readClass(DataInput in) throws IOException {
+    String className = Text.readString(in);
+    try {
+      return conf.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("readObject can't find class", e);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, inputSplitClass.getName());
+    inputSplit.write(out);
+    Text.writeString(out, inputFormatClass.getName());
+    Text.writeString(out, mapperClass.getName());
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}

Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java?rev=677872&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestDelegatingInputFormat.java Fri Jul 18 03:27:37 2008
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import junit.framework.TestCase;
+
+public class TestDelegatingInputFormat extends TestCase {
+
+  public void testSplitting() throws Exception {
+    JobConf conf = new JobConf();
+    conf.set("fs.hdfs.impl",
+       "org.apache.hadoop.hdfs.ChecksumDistributedFileSystem");
+    MiniDFSCluster dfs = null;
+    try {
+      dfs = new MiniDFSCluster(conf, 4, true, new String[] { "/rack0",
+         "/rack0", "/rack1", "/rack1" }, new String[] { "host0", "host1",
+         "host2", "host3" });
+      FileSystem fs = dfs.getFileSystem();
+
+      Path path = getPath("/foo/bar", fs);
+      Path path2 = getPath("/foo/baz", fs);
+      Path path3 = getPath("/bar/bar", fs);
+      Path path4 = getPath("/bar/baz", fs);
+
+      final int numSplits = 100;
+
+      FileInputFormat.addInputPath(conf, path, TextInputFormat.class,
+         MapClass.class);
+      FileInputFormat.addInputPath(conf, path2, TextInputFormat.class,
+         MapClass2.class);
+      FileInputFormat.addInputPath(conf, path3, KeyValueTextInputFormat.class,
+         MapClass.class);
+      FileInputFormat.addInputPath(conf, path4, TextInputFormat.class,
+         MapClass2.class);
+      DelegatingInputFormat inFormat = new DelegatingInputFormat();
+      InputSplit[] splits = inFormat.getSplits(conf, numSplits);
+
+      int[] bins = new int[3];
+      for (InputSplit split : splits) {
+       assertTrue(split instanceof TaggedInputSplit);
+       final TaggedInputSplit tis = (TaggedInputSplit) split;
+       int index = -1;
+
+       if (tis.getInputFormatClass().equals(KeyValueTextInputFormat.class)) {
+         // path3
+         index = 0;
+       } else if (tis.getMapperClass().equals(MapClass.class)) {
+         // path
+         index = 1;
+       } else {
+         // path2 and path4
+         index = 2;
+       }
+
+       bins[index]++;
+      }
+
+      // Each bin is a unique combination of a Mapper and InputFormat, and
+      // DelegatingInputFormat should split each bin into numSplits splits,
+      // regardless of the number of paths that use that Mapper/InputFormat
+      for (int count : bins) {
+       assertEquals(numSplits, count);
+      }
+
+      assertTrue(true);
+    } finally {
+      if (dfs != null) {
+       dfs.shutdown();
+      }
+    }
+  }
+
+  static Path getPath(final String location, final FileSystem fs)
+      throws IOException {
+    Path path = new Path(location);
+
+    // create a multi-block file on hdfs
+    DataOutputStream out = fs.create(path, true, 4096, (short) 2, 512, null);
+    for (int i = 0; i < 1000; ++i) {
+      out.writeChars("Hello\n");
+    }
+    out.close();
+
+    return path;
+  }
+
+  static class MapClass implements Mapper<String, String, String, String> {
+
+    public void map(String key, String value,
+       OutputCollector<String, String> output, Reporter reporter)
+       throws IOException {
+    }
+
+    public void configure(JobConf job) {
+    }
+
+    public void close() throws IOException {
+    }
+  }
+
+  static class MapClass2 extends MapClass {
+  }
+
+}

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java?rev=677872&r1=677871&r2=677872&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileInputFormat.java Fri Jul 18 03:27:37 2008
@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.BlockLocation;
@@ -83,4 +85,52 @@
       }
     }
   }
+  
+  public void testAddInputPathWithFormat() {
+    final JobConf conf = new JobConf();
+    FileInputFormat.addInputPath(conf, new Path("/foo"), TextInputFormat.class);
+    FileInputFormat.addInputPath(conf, new Path("/bar"),
+        KeyValueTextInputFormat.class);
+    final Map<Path, InputFormat> inputs = FileInputFormat
+       .getInputFormatMap(conf);
+    assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
+    assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+       .getClass());
+  }
+
+  public void testAddInputPathWithMapper() {
+    final JobConf conf = new JobConf();
+    FileInputFormat.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
+       MapClass.class);
+    FileInputFormat.addInputPath(conf, new Path("/bar"),
+       KeyValueTextInputFormat.class, MapClass2.class);
+    final Map<Path, InputFormat> inputs = FileInputFormat
+       .getInputFormatMap(conf);
+    final Map<Path, Class<? extends Mapper>> maps = FileInputFormat
+       .getMapperTypeMap(conf);
+
+    assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
+    assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+       .getClass());
+    assertEquals(MapClass.class, maps.get(new Path("/foo")));
+    assertEquals(MapClass2.class, maps.get(new Path("/bar")));
+  }
+
+  static class MapClass implements Mapper<String, String, String, String> {
+
+    public void map(String key, String value,
+       OutputCollector<String, String> output, Reporter reporter)
+       throws IOException {
+    }
+
+    public void configure(JobConf job) {
+    }
+
+    public void close() throws IOException {
+    }
+  }
+
+  static class MapClass2 extends MapClass {
+  }
+
 }



Mime
View raw message