hadoop-mapreduce-commits mailing list archives

From: sha...@apache.org
Subject: svn commit: r798773 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/lib/ src/java/org/apache/hadoop/mapreduce/lib/input/ src/test/mapred/org/apache/hadoop/mapreduce/lib/input/
Date: Wed, 29 Jul 2009 04:02:16 GMT
Author: sharad
Date: Wed Jul 29 04:02:15 2009
New Revision: 798773

URL: http://svn.apache.org/viewvc?rev=798773&view=rev
Log:
MAPREDUCE-369. Change org.apache.hadoop.mapred.lib.MultipleInputs to use new mapreduce API.
Contributed by Amareshwari Sriramadasu.
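
For reference, a minimal driver against the new API added by this change. This sketch is not part of the commit; the input paths, output path, and mapper bodies are illustrative only:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MultipleInputsDriver {

      // Illustrative mappers; any new-API Mapper subclass works here.
      static class PlainMapper extends Mapper<LongWritable, Text, Text, Text> {
        protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
          context.write(new Text("plain"), value);
        }
      }

      static class KvMapper extends Mapper<Text, Text, Text, Text> {
        protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
          context.write(key, value);
        }
      }

      public static void main(String[] args) throws Exception {
        Job job = new Job();
        job.setJarByClass(MultipleInputsDriver.class);
        // Each path carries its own InputFormat and Mapper; these calls
        // install DelegatingInputFormat and DelegatingMapper on the job.
        MultipleInputs.addInputPath(job, new Path("/in/plain"),
            TextInputFormat.class, PlainMapper.class);
        MultipleInputs.addInputPath(job, new Path("/in/keyvalue"),
            KeyValueTextInputFormat.class, KvMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(0); // map-only for brevity
        FileOutputFormat.setOutputPath(job, new Path("/out"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }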

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/DelegatingInputFormat.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/DelegatingMapper.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleInputs.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=798773&r1=798772&r2=798773&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Jul 29 04:02:15 2009
@@ -144,6 +144,9 @@
     MAPREDUCE-782. Use PureJavaCrc32 in SpillRecord.  (Todd Lipcon via
     szetszwo)
 
+    MAPREDUCE-369. Change org.apache.hadoop.mapred.lib.MultipleInputs to 
+    use new api. (Amareshwari Sriramadasu via sharad)
+
   BUG FIXES
     MAPREDUCE-703. Sqoop requires dependency on hsqldb in ivy.
     (Aaron Kimball via matei)

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/DelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/DelegatingInputFormat.java?rev=798773&r1=798772&r2=798773&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/DelegatingInputFormat.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/DelegatingInputFormat.java Wed Jul 29 04:02:15 2009
@@ -41,7 +41,10 @@
  * InputFormats.
  * 
  * @see MultipleInputs#addInputPath(JobConf, Path, Class, Class)
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.DelegatingInputFormat} instead
  */
+@Deprecated
 public class DelegatingInputFormat<K, V> implements InputFormat<K, V> {
 
   public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/DelegatingMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/DelegatingMapper.java?rev=798773&r1=798772&r2=798773&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/DelegatingMapper.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/DelegatingMapper.java Wed Jul 29 04:02:15 2009
@@ -32,7 +32,10 @@
  * mappers.
  * 
  * @see MultipleInputs#addInputPath(JobConf, Path, Class, Class)
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.DelegatingMapper} instead
  */
+@Deprecated
 public class DelegatingMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2> {
 
   private JobConf conf;

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleInputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleInputs.java?rev=798773&r1=798772&r2=798773&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleInputs.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/MultipleInputs.java Wed Jul 29 04:02:15 2009
@@ -30,7 +30,10 @@
 /**
  * This class supports MapReduce jobs that have multiple input paths with
  * a different {@link InputFormat} and {@link Mapper} for each path 
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.MultipleInputs} instead
  */
+@Deprecated
 public class MultipleInputs {
   /**
    * Add a {@link Path} with a custom {@link InputFormat} to the list of

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java?rev=798773&r1=798772&r2=798773&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java Wed Jul 29 04:02:15 2009
@@ -31,9 +31,12 @@
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
- * An {@link InputSplit} that tags another InputSplit with extra data for use by
- * {@link DelegatingInputFormat}s and {@link DelegatingMapper}s.
+ * An {@link InputSplit} that tags another InputSplit with extra data for use
+ * by {@link DelegatingInputFormat}s and {@link DelegatingMapper}s.
+ * @deprecated Use 
+ * {@link org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit} instead
  */
+@Deprecated
 class TaggedInputSplit implements Configurable, InputSplit {
 
   private Class<? extends InputSplit> inputSplitClass;

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java?rev=798773&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingInputFormat.java Wed Jul 29 04:02:15 2009
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputFormat} that delegates behavior of paths to multiple other
+ * InputFormats.
+ * 
+ * @see MultipleInputs#addInputPath(Job, Path, Class, Class)
+ */
+public class DelegatingInputFormat<K, V> extends InputFormat<K, V> {
+
+  @SuppressWarnings("unchecked")
+  public List<InputSplit> getSplits(JobContext job) 
+      throws IOException, InterruptedException {
+    Configuration conf = job.getConfiguration();
+    Job jobCopy = new Job(conf);
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    Map<Path, InputFormat> formatMap = 
+      MultipleInputs.getInputFormatMap(job);
+    Map<Path, Class<? extends Mapper>> mapperMap = MultipleInputs
+       .getMapperTypeMap(job);
+    Map<Class<? extends InputFormat>, List<Path>> formatPaths
+        = new HashMap<Class<? extends InputFormat>, List<Path>>();
+
+    // First, build a map of InputFormats to Paths
+    for (Entry<Path, InputFormat> entry : formatMap.entrySet()) {
+      if (!formatPaths.containsKey(entry.getValue().getClass())) {
+       formatPaths.put(entry.getValue().getClass(), new LinkedList<Path>());
+      }
+
+      formatPaths.get(entry.getValue().getClass()).add(entry.getKey());
+    }
+
+    for (Entry<Class<? extends InputFormat>, List<Path>> formatEntry :
+        formatPaths.entrySet()) {
+      Class<? extends InputFormat> formatClass = formatEntry.getKey();
+      InputFormat format = (InputFormat) ReflectionUtils.newInstance(
+         formatClass, conf);
+      List<Path> paths = formatEntry.getValue();
+
+      Map<Class<? extends Mapper>, List<Path>> mapperPaths
+          = new HashMap<Class<? extends Mapper>, List<Path>>();
+
+      // Now, for each set of paths that have a common InputFormat, build
+      // a map of Mappers to the paths they're used for
+      for (Path path : paths) {
+       Class<? extends Mapper> mapperClass = mapperMap.get(path);
+       if (!mapperPaths.containsKey(mapperClass)) {
+         mapperPaths.put(mapperClass, new LinkedList<Path>());
+       }
+
+       mapperPaths.get(mapperClass).add(path);
+      }
+
+      // Now each set of paths that has a common InputFormat and Mapper can
+      // be added to the same job, and split together.
+      for (Entry<Class<? extends Mapper>, List<Path>> mapEntry :
+          mapperPaths.entrySet()) {
+       paths = mapEntry.getValue();
+       Class<? extends Mapper> mapperClass = mapEntry.getKey();
+
+       if (mapperClass == null) {
+         try {
+           mapperClass = job.getMapperClass();
+         } catch (ClassNotFoundException e) {
+           throw new IOException("Mapper class is not found", e);
+         }
+       }
+
+       FileInputFormat.setInputPaths(jobCopy, paths.toArray(new Path[paths
+           .size()]));
+
+       // Get splits for each input path and tag with InputFormat
+       // and Mapper types by wrapping in a TaggedInputSplit.
+       List<InputSplit> pathSplits = format.getSplits(jobCopy);
+       for (InputSplit pathSplit : pathSplits) {
+         splits.add(new TaggedInputSplit(pathSplit, conf, format.getClass(),
+             mapperClass));
+       }
+      }
+    }
+
+    return splits;
+  }
+
+  @SuppressWarnings("unchecked")
+  public RecordReader<K, V> createRecordReader(InputSplit split,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+
+    // Find the InputFormat and then the RecordReader from the
+    // TaggedInputSplit.
+    TaggedInputSplit taggedInputSplit = (TaggedInputSplit) split;
+    InputFormat<K, V> inputFormat = (InputFormat<K, V>) ReflectionUtils
+      .newInstance(taggedInputSplit.getInputFormatClass(),
+         context.getConfiguration());
+    return inputFormat.createRecordReader(taggedInputSplit.getInputSplit(),
+      context);
+  }
+}
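
The getSplits() implementation above buckets paths twice: first by InputFormat class, so each format is instantiated once and splits all of its paths together, then by Mapper class, so every resulting split can be tagged with a single format/mapper pair. A standalone sketch of that bucketing, with plain strings standing in for the classes and purely illustrative path names:

    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    public class GroupingSketch {
      public static void main(String[] args) {
        // path -> InputFormat, as configured via MultipleInputs.
        Map<String, String> formatByPath = new HashMap<String, String>();
        formatByPath.put("/foo/bar", "TextInputFormat");
        formatByPath.put("/foo/baz", "TextInputFormat");
        formatByPath.put("/bar/bar", "KeyValueTextInputFormat");

        // Invert to InputFormat -> paths: the first grouping pass.
        Map<String, List<String>> pathsByFormat =
            new HashMap<String, List<String>>();
        for (Map.Entry<String, String> e : formatByPath.entrySet()) {
          if (!pathsByFormat.containsKey(e.getValue())) {
            pathsByFormat.put(e.getValue(), new LinkedList<String>());
          }
          pathsByFormat.get(e.getValue()).add(e.getKey());
        }
        // {TextInputFormat=[/foo/bar, /foo/baz],
        //  KeyValueTextInputFormat=[/bar/bar]} (order may vary)
        System.out.println(pathsByFormat);
      }
    }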

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingMapper.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingMapper.java?rev=798773&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingMapper.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/DelegatingMapper.java Wed Jul 29 04:02:15 2009
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link Mapper} that delegates behavior of paths to multiple other
+ * mappers.
+ * 
+ * @see MultipleInputs#addInputPath(Job, Path, Class, Class)
+ */
+public class DelegatingMapper<K1, V1, K2, V2> extends Mapper<K1, V1, K2, V2> {
+
+  private Mapper<K1, V1, K2, V2> mapper;
+
+  @SuppressWarnings("unchecked")
+  protected void setup(Context context)
+      throws IOException, InterruptedException {
+    // Find the Mapper from the TaggedInputSplit.
+    TaggedInputSplit inputSplit = (TaggedInputSplit) context.getInputSplit();
+    mapper = (Mapper<K1, V1, K2, V2>) ReflectionUtils.newInstance(inputSplit
+       .getMapperClass(), context.getConfiguration());
+  }
+
+  @SuppressWarnings("unchecked")
+  public void run(Context context) 
+      throws IOException, InterruptedException {
+    setup(context);
+    mapper.run(context);
+    cleanup(context);
+  }
+}
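
DelegatingMapper itself never inspects K1..V2: setup() resolves the real mapper class recorded in the TaggedInputSplit, and run() forwards the whole context to it. Any ordinary new-API mapper can serve as the delegate; a hypothetical example of one that would be registered through MultipleInputs.addInputPath():

    import java.io.IOException;

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Hypothetical delegate: DelegatingMapper.setup() instantiates this
    // class reflectively, then DelegatingMapper.run() hands it the context.
    public class UpperCaseMapper extends Mapper<LongWritable, Text, Text, Text> {
      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        context.write(new Text("upper"),
            new Text(value.toString().toUpperCase()));
      }
    }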

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java?rev=798773&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/MultipleInputs.java Wed Jul 29 04:02:15 2009
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * This class supports MapReduce jobs that have multiple input paths with
+ * a different {@link InputFormat} and {@link Mapper} for each path 
+ */
+public class MultipleInputs {
+  /**
+   * Add a {@link Path} with a custom {@link InputFormat} to the list of
+   * inputs for the map-reduce job.
+   * 
+   * @param job The {@link Job}
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputFormatClass {@link InputFormat} class to use for this path
+   */
+  @SuppressWarnings("unchecked")
+  public static void addInputPath(Job job, Path path,
+      Class<? extends InputFormat> inputFormatClass) {
+    String inputFormatMapping = path.toString() + ";"
+       + inputFormatClass.getName();
+    Configuration conf = job.getConfiguration();
+    String inputFormats = conf.get("mapred.input.dir.formats");
+    conf.set("mapred.input.dir.formats",
+       inputFormats == null ? inputFormatMapping : inputFormats + ","
+           + inputFormatMapping);
+
+    job.setInputFormatClass(DelegatingInputFormat.class);
+  }
+
+  /**
+   * Add a {@link Path} with a custom {@link InputFormat} and
+   * {@link Mapper} to the list of inputs for the map-reduce job.
+   * 
+   * @param job The {@link Job}
+   * @param path {@link Path} to be added to the list of inputs for the job
+   * @param inputFormatClass {@link InputFormat} class to use for this path
+   * @param mapperClass {@link Mapper} class to use for this path
+   */
+  @SuppressWarnings("unchecked")
+  public static void addInputPath(Job job, Path path,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends Mapper> mapperClass) {
+
+    addInputPath(job, path, inputFormatClass);
+    Configuration conf = job.getConfiguration();
+    String mapperMapping = path.toString() + ";" + mapperClass.getName();
+    String mappers = conf.get("mapred.input.dir.mappers");
+    conf.set("mapred.input.dir.mappers", mappers == null ? mapperMapping
+       : mappers + "," + mapperMapping);
+
+    job.setMapperClass(DelegatingMapper.class);
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link InputFormat} class
+   * that should be used for them.
+   * 
+   * @param job The {@link JobContext}
+   * @see #addInputPath(Job, Path, Class)
+   * @return A map of paths to inputformats for the job
+   */
+  @SuppressWarnings("unchecked")
+  static Map<Path, InputFormat> getInputFormatMap(JobContext job) {
+    Map<Path, InputFormat> m = new HashMap<Path, InputFormat>();
+    Configuration conf = job.getConfiguration();
+    String[] pathMappings = conf.get("mapred.input.dir.formats").split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      InputFormat inputFormat;
+      try {
+       inputFormat = (InputFormat) ReflectionUtils.newInstance(conf
+           .getClassByName(split[1]), conf);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), inputFormat);
+    }
+    return m;
+  }
+
+  /**
+   * Retrieves a map of {@link Path}s to the {@link Mapper} class that
+   * should be used for them.
+   * 
+   * @param job The {@link JobContext}
+   * @see #addInputPath(Job, Path, Class, Class)
+   * @return A map of paths to mappers for the job
+   */
+  @SuppressWarnings("unchecked")
+  static Map<Path, Class<? extends Mapper>> 
+      getMapperTypeMap(JobContext job) {
+    Configuration conf = job.getConfiguration();
+    if (conf.get("mapred.input.dir.mappers") == null) {
+      return Collections.emptyMap();
+    }
+    Map<Path, Class<? extends Mapper>> m = 
+      new HashMap<Path, Class<? extends Mapper>>();
+    String[] pathMappings = conf.get("mapred.input.dir.mappers").split(",");
+    for (String pathMapping : pathMappings) {
+      String[] split = pathMapping.split(";");
+      Class<? extends Mapper> mapClass;
+      try {
+       mapClass = 
+         (Class<? extends Mapper>) conf.getClassByName(split[1]);
+      } catch (ClassNotFoundException e) {
+       throw new RuntimeException(e);
+      }
+      m.put(new Path(split[0]), mapClass);
+    }
+    return m;
+  }
+}
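
Both configuration keys share the same wire format: comma-separated "path;className" pairs, which implies paths containing ';' or ',' cannot be used (a limitation carried over from the mapred.lib original). A small sketch, not part of this commit, that prints what addInputPath() accumulates:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class ShowMultipleInputsConf {
      public static void main(String[] args) throws Exception {
        Job job = new Job();
        MultipleInputs.addInputPath(job, new Path("/foo"), TextInputFormat.class);
        MultipleInputs.addInputPath(job, new Path("/bar"),
            KeyValueTextInputFormat.class);
        // Prints the accumulated mapping, all on one line:
        // /foo;org.apache.hadoop.mapreduce.lib.input.TextInputFormat,
        // /bar;org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat
        System.out.println(job.getConfiguration().get("mapred.input.dir.formats"));
      }
    }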

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java?rev=798773&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java (added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java Wed Jul 29 04:02:15 2009
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.io.serializer.Serializer;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * An {@link InputSplit} that tags another InputSplit with extra data for use
+ * by {@link DelegatingInputFormat}s and {@link DelegatingMapper}s.
+ */
+class TaggedInputSplit extends InputSplit implements Configurable, Writable {
+
+  private Class<? extends InputSplit> inputSplitClass;
+
+  private InputSplit inputSplit;
+
+  @SuppressWarnings("unchecked")
+  private Class<? extends InputFormat> inputFormatClass;
+
+  @SuppressWarnings("unchecked")
+  private Class<? extends Mapper> mapperClass;
+
+  private Configuration conf;
+
+  public TaggedInputSplit() {
+    // Default constructor.
+  }
+
+  /**
+   * Creates a new TaggedInputSplit.
+   * 
+   * @param inputSplit The InputSplit to be tagged
+   * @param conf The configuration to use
+   * @param inputFormatClass The InputFormat class to use for this job
+   * @param mapperClass The Mapper class to use for this job
+   */
+  @SuppressWarnings("unchecked")
+  public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
+      Class<? extends InputFormat> inputFormatClass,
+      Class<? extends Mapper> mapperClass) {
+    this.inputSplitClass = inputSplit.getClass();
+    this.inputSplit = inputSplit;
+    this.conf = conf;
+    this.inputFormatClass = inputFormatClass;
+    this.mapperClass = mapperClass;
+  }
+
+  /**
+   * Retrieves the original InputSplit.
+   * 
+   * @return The InputSplit that was tagged
+   */
+  public InputSplit getInputSplit() {
+    return inputSplit;
+  }
+
+  /**
+   * Retrieves the InputFormat class to use for this split.
+   * 
+   * @return The InputFormat class to use
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return inputFormatClass;
+  }
+
+  /**
+   * Retrieves the Mapper class to use for this split.
+   * 
+   * @return The Mapper class to use
+   */
+  @SuppressWarnings("unchecked")
+  public Class<? extends Mapper> getMapperClass() {
+    return mapperClass;
+  }
+
+  public long getLength() throws IOException, InterruptedException {
+    return inputSplit.getLength();
+  }
+
+  public String[] getLocations() throws IOException, InterruptedException {
+    return inputSplit.getLocations();
+  }
+
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    inputSplitClass = (Class<? extends InputSplit>) readClass(in);
+    inputFormatClass = (Class<? extends InputFormat<?, ?>>) readClass(in);
+    mapperClass = (Class<? extends Mapper<?, ?, ?, ?>>) readClass(in);
+    inputSplit = (InputSplit) ReflectionUtils
+       .newInstance(inputSplitClass, conf);
+    SerializationFactory factory = new SerializationFactory(conf);
+    Deserializer deserializer = factory.getDeserializer(inputSplitClass);
+    deserializer.open((DataInputStream)in);
+    inputSplit = (InputSplit)deserializer.deserialize(inputSplit);
+    deserializer.close();
+  }
+
+  private Class<?> readClass(DataInput in) throws IOException {
+    String className = Text.readString(in);
+    try {
+      return conf.getClassByName(className);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("readObject can't find class", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public void write(DataOutput out) throws IOException {
+    Text.writeString(out, inputSplitClass.getName());
+    Text.writeString(out, inputFormatClass.getName());
+    Text.writeString(out, mapperClass.getName());
+    SerializationFactory factory = new SerializationFactory(conf);
+    Serializer serializer = 
+          factory.getSerializer(inputSplitClass);
+    serializer.open((DataOutputStream)out);
+    serializer.serialize(inputSplit);
+    serializer.close();
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}
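
Since write() casts its stream to DataOutputStream and readFields() needs a Configuration before it can resolve class names, a Writable round trip looks like the following hypothetical sketch. It must live in this package because TaggedInputSplit is package-private:

    package org.apache.hadoop.mapreduce.lib.input;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Mapper;

    public class TaggedInputSplitRoundTrip {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        FileSplit original = new FileSplit(new Path("/foo"), 0L, 512L,
            new String[] { "host0" });
        TaggedInputSplit tagged = new TaggedInputSplit(original, conf,
            TextInputFormat.class, Mapper.class);

        // write() records the three class names, then the wrapped split.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        tagged.write(new DataOutputStream(bytes));

        // readFields() needs conf set first: it resolves the class names
        // and rebuilds the wrapped split via the SerializationFactory.
        TaggedInputSplit copy = new TaggedInputSplit();
        copy.setConf(conf);
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getInputFormatClass()); // TextInputFormat
      }
    }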

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java?rev=798773&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestDelegatingInputFormat.java Wed Jul 29 04:02:15 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+
+public class TestDelegatingInputFormat extends TestCase {
+
+  @SuppressWarnings("unchecked")
+  public void testSplitting() throws Exception {
+    Job job = new Job();
+    MiniDFSCluster dfs = null;
+    try {
+      dfs = new MiniDFSCluster(job.getConfiguration(), 4, true, new String[] { "/rack0",
+         "/rack0", "/rack1", "/rack1" }, new String[] { "host0", "host1",
+         "host2", "host3" });
+      FileSystem fs = dfs.getFileSystem();
+
+      Path path = getPath("/foo/bar", fs);
+      Path path2 = getPath("/foo/baz", fs);
+      Path path3 = getPath("/bar/bar", fs);
+      Path path4 = getPath("/bar/baz", fs);
+
+      final int numSplits = 100;
+
+      FileInputFormat.setMaxInputSplitSize(job, 
+              fs.getFileStatus(path).getLen() / numSplits);
+      MultipleInputs.addInputPath(job, path, TextInputFormat.class,
+         MapClass.class);
+      MultipleInputs.addInputPath(job, path2, TextInputFormat.class,
+         MapClass2.class);
+      MultipleInputs.addInputPath(job, path3, KeyValueTextInputFormat.class,
+         MapClass.class);
+      MultipleInputs.addInputPath(job, path4, TextInputFormat.class,
+         MapClass2.class);
+      DelegatingInputFormat inFormat = new DelegatingInputFormat();
+
+      int[] bins = new int[3];
+      for (InputSplit split : (List<InputSplit>)inFormat.getSplits(job)) {
+       assertTrue(split instanceof TaggedInputSplit);
+       final TaggedInputSplit tis = (TaggedInputSplit) split;
+       int index = -1;
+
+       if (tis.getInputFormatClass().equals(KeyValueTextInputFormat.class)) {
+         // path3
+         index = 0;
+       } else if (tis.getMapperClass().equals(MapClass.class)) {
+         // path
+         index = 1;
+       } else {
+         // path2 and path4
+         index = 2;
+       }
+
+       bins[index]++;
+      }
+
+      assertEquals("count is not equal to num splits", numSplits, bins[0]);
+      assertEquals("count is not equal to num splits", numSplits, bins[1]);
+      assertEquals("count is not equal to 2 * num splits",
+        numSplits * 2, bins[2]);
+    } finally {
+      if (dfs != null) {
+       dfs.shutdown();
+      }
+    }
+  }
+
+  static Path getPath(final String location, final FileSystem fs)
+      throws IOException {
+    Path path = new Path(location);
+
+    // create a multi-block file on hdfs
+    DataOutputStream out = fs.create(path, true, 4096, (short) 2, 512, null);
+    for (int i = 0; i < 1000; ++i) {
+      out.writeChars("Hello\n");
+    }
+    out.close();
+
+    return path;
+  }
+
+  static class MapClass extends Mapper<String, String, String, String> {
+  }
+
+  static class MapClass2 extends MapClass {
+  }
+
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java?rev=798773&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/lib/input/TestMultipleInputs.java Wed Jul 29 04:02:15 2009
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.IOException;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+
+/**
+ * @see TestDelegatingInputFormat
+ */
+public class TestMultipleInputs extends TestCase {
+  @SuppressWarnings("unchecked")
+  public void testAddInputPathWithFormat() throws IOException {
+    final Job conf = new Job();
+    MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class);
+    MultipleInputs.addInputPath(conf, new Path("/bar"),
+        KeyValueTextInputFormat.class);
+    final Map<Path, InputFormat> inputs = MultipleInputs
+       .getInputFormatMap(conf);
+    assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
+    assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+       .getClass());
+  }
+
+  @SuppressWarnings("unchecked")
+  public void testAddInputPathWithMapper() throws IOException {
+    final Job conf = new Job();
+    MultipleInputs.addInputPath(conf, new Path("/foo"), TextInputFormat.class,
+       MapClass.class);
+    MultipleInputs.addInputPath(conf, new Path("/bar"),
+       KeyValueTextInputFormat.class, MapClass2.class);
+    final Map<Path, InputFormat> inputs = MultipleInputs
+       .getInputFormatMap(conf);
+    final Map<Path, Class<? extends Mapper>> maps = MultipleInputs
+       .getMapperTypeMap(conf);
+
+    assertEquals(TextInputFormat.class, inputs.get(new Path("/foo")).getClass());
+    assertEquals(KeyValueTextInputFormat.class, inputs.get(new Path("/bar"))
+       .getClass());
+    assertEquals(MapClass.class, maps.get(new Path("/foo")));
+    assertEquals(MapClass2.class, maps.get(new Path("/bar")));
+  }
+
+  static class MapClass extends Mapper<String, String, String, String> {
+  }
+
+  static class MapClass2 extends MapClass {
+  }
+}


