tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject git commit: TEZ-189. Add helper apis to generate splits for applications moving away from MR to Tez. (hitesh)
Date Fri, 07 Jun 2013 21:53:02 GMT
Updated Branches:
  refs/heads/master 5795b852b -> d8690f7e9


TEZ-189. Add helper apis to generate splits for applications moving away from MR to Tez. (hitesh)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d8690f7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d8690f7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d8690f7e

Branch: refs/heads/master
Commit: d8690f7e91d7b284f79e3dc1ea9abfe3674bb9f7
Parents: 5795b85
Author: Hitesh Shah <hitesh@apache.org>
Authored: Fri Jun 7 14:48:30 2013 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Fri Jun 7 14:50:13 2013 -0700

----------------------------------------------------------------------
 pom.xml                                            |    7 +
 .../org/apache/tez/dag/api/VertexLocationHint.java |   62 +++++
 tez-mapreduce/pom.xml                              |   11 +
 .../tez/mapreduce/hadoop/InputSplitInfo.java       |   83 ++++++
 .../org/apache/tez/mapreduce/hadoop/MRHelpers.java |  214 +++++++++++++++
 .../apache/tez/mapreduce/hadoop/TestMRHelpers.java |  211 ++++++++++++++
 6 files changed, 588 insertions(+), 0 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d8690f7e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4c5d0c5..46d25ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -399,6 +399,13 @@
                   <goal>clover</goal>
                 </goals>
               </execution>
+              <execution>
+                <id>site</id>
+                <phase>pre-site</phase>
+                <goals>
+                  <goal>aggregate</goal>
+                </goals>
+              </execution>
             </executions>
           </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d8690f7e/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
----------------------------------------------------------------------
diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
index 381b4ba..3941165 100644
--- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
+++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/VertexLocationHint.java
@@ -18,6 +18,8 @@
 
 package org.apache.tez.dag.api;
 
+import java.util.Arrays;
+
 public class VertexLocationHint  {
 
   private int numTasks;
@@ -49,6 +51,36 @@ public class VertexLocationHint  {
     this.taskLocationHints = taskLocationHints;
   }
 
+  /**
+   * Hash code consistent with {@link #equals(Object)}: combines numTasks with
+   * the contents of the taskLocationHints array.
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 7883;
+    int result = 1;
+    result = prime * result + numTasks;
+    result = prime * result + Arrays.hashCode(taskLocationHints);
+    return result;
+  }
+
+  /**
+   * Value equality: two hints are equal when they have the same numTasks and
+   * element-wise equal taskLocationHints arrays (relies on
+   * TaskLocationHint.equals for the elements).
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    // Strict class match (not instanceof) — subclasses never compare equal.
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    VertexLocationHint other = (VertexLocationHint) obj;
+    if (numTasks != other.numTasks) {
+      return false;
+    }
+    if (!Arrays.equals(taskLocationHints, other.taskLocationHints)) {
+      return false;
+    }
+    return true;
+  }
+
   public static class TaskLocationHint {
 
     // Host names if any to be used
@@ -77,5 +109,35 @@ public class VertexLocationHint  {
     public void setRacks(String[] racks) {
       this.racks = racks;
     }
+
+    /**
+     * Hash code consistent with {@link #equals(Object)}: combines the
+     * contents of the hosts and racks arrays.
+     */
+    @Override
+    public int hashCode() {
+      final int prime = 9397;
+      int result = 1;
+      result = prime * result + Arrays.hashCode(hosts);
+      result = prime * result + Arrays.hashCode(racks);
+      return result;
+    }
+
+    /**
+     * Value equality: two hints are equal when their hosts and racks arrays
+     * are element-wise equal (order-sensitive, per Arrays.equals).
+     */
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      // Strict class match (not instanceof) — subclasses never compare equal.
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      TaskLocationHint other = (TaskLocationHint) obj;
+      if (!Arrays.equals(hosts, other.hosts)) {
+        return false;
+      }
+      if (!Arrays.equals(racks, other.racks)) {
+        return false;
+      }
+      return true;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d8690f7e/tez-mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/tez-mapreduce/pom.xml b/tez-mapreduce/pom.xml
index e32b098..ea5016b 100644
--- a/tez-mapreduce/pom.xml
+++ b/tez-mapreduce/pom.xml
@@ -71,6 +71,17 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d8690f7e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
new file mode 100644
index 0000000..44013c6
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfo.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+
+/**
+ * Information obtained by using the InputFormat class to generate
+ * the required InputSplit information files that in turn can be used to
+ * setup a DAG.
+ *
+ * The splitsFile and splitsMetaInfoFile need to be provided as LocalResources
+ * to the vertex in question. The numTasks represents the parallelism for
+ * the vertex and the taskLocationHints define the possible nodes on which the
+ * tasks should be run based on the location of the splits that will be
+ * processed by each task.
+ */
+public class InputSplitInfo {
+
+  /// Splits file
+  private final Path splitsFile;
+  /// Meta info file for all the splits information
+  private final Path splitsMetaInfoFile;
+  /// Location hints to determine where to run the tasks
+  private final TaskLocationHint[] taskLocationHints;
+  /// The num of tasks - same as number of splits generated.
+  private final int numTasks;
+
+  /**
+   * @param splitsFile path to the generated job.split file
+   * @param splitsMetaInfoFile path to the generated job.splitmetainfo file
+   * @param numTasks number of splits generated; the vertex parallelism
+   * @param taskLocationHints per-split location hints, indexed by task
+   */
+  public InputSplitInfo(Path splitsFile, Path splitsMetaInfoFile, int numTasks,
+      TaskLocationHint[] taskLocationHints) {
+    this.splitsFile = splitsFile;
+    this.splitsMetaInfoFile = splitsMetaInfoFile;
+    // NOTE(review): the hints array is stored (and returned by the getter)
+    // without a defensive copy, so callers can mutate it — confirm intended.
+    this.taskLocationHints = taskLocationHints;
+    this.numTasks = numTasks;
+  }
+
+  /**
+   * Get the TaskLocationHints for each task
+   */
+  public TaskLocationHint[] getTaskLocationHints() {
+    return taskLocationHints;
+  }
+
+  /**
+   * Get the path to the splits meta info file
+   */
+  public Path getSplitsMetaInfoFile() {
+    return splitsMetaInfoFile;
+  }
+
+  /**
+   * Get the path to the splits file
+   */
+  public Path getSplitsFile() {
+    return splitsFile;
+  }
+
+  /**
+   * Get the number of splits that were generated. Same as number of tasks that
+   * should be run for the vertex processing these set of splits.
+   */
+  public int getNumTasks() {
+    return numTasks;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d8690f7e/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
new file mode 100644
index 0000000..a8a77db
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRHelpers.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+
+public class MRHelpers {
+
+  private static final Log LOG = LogFactory.getLog(MRHelpers.class);
+
+  /**
+   * Comparator for org.apache.hadoop.mapreduce.InputSplit.
+   * Orders splits by descending length so the biggest splits are scheduled
+   * first. Checked exceptions from getLength() are wrapped in
+   * RuntimeException because Comparator.compare cannot throw them.
+   */
+  private static class InputSplitComparator
+      implements Comparator<org.apache.hadoop.mapreduce.InputSplit> {
+    @Override
+    public int compare(org.apache.hadoop.mapreduce.InputSplit o1,
+        org.apache.hadoop.mapreduce.InputSplit o2) {
+      try {
+        long len1 = o1.getLength();
+        long len2 = o2.getLength();
+        // Reversed comparison on purpose: larger length sorts earlier.
+        if (len1 < len2) {
+          return 1;
+        } else if (len1 == len2) {
+          return 0;
+        } else {
+          return -1;
+        }
+      } catch (IOException ie) {
+        throw new RuntimeException("exception in InputSplit compare", ie);
+      } catch (InterruptedException ie) {
+        // NOTE(review): the thread's interrupt status is not restored here
+        // (Thread.currentThread().interrupt()) before wrapping — confirm
+        // this is acceptable for callers that rely on interruption.
+        throw new RuntimeException("exception in InputSplit compare", ie);
+      }
+    }
+  }
+
+  /**
+   * Comparator for org.apache.hadoop.mapred.InputSplit.
+   * Same descending-by-length ordering as InputSplitComparator, for the
+   * old (mapred) API, whose getLength() only throws IOException.
+   */
+  private static class OldInputSplitComparator
+      implements Comparator<org.apache.hadoop.mapred.InputSplit> {
+    @Override
+    public int compare(org.apache.hadoop.mapred.InputSplit o1,
+        org.apache.hadoop.mapred.InputSplit o2) {
+      try {
+        long len1 = o1.getLength();
+        long len2 = o2.getLength();
+        // Reversed comparison on purpose: larger length sorts earlier.
+        if (len1 < len2) {
+          return 1;
+        } else if (len1 == len2) {
+          return 0;
+        } else {
+          return -1;
+        }
+      } catch (IOException ie) {
+        throw new RuntimeException("Problem getting input split size", ie);
+      }
+    }
+  }
+
+  /**
+   * Generate new-api mapreduce InputFormat splits
+   * @param jobContext JobContext required by InputFormat
+   * @param inputSplitDir Directory in which to generate splits information
+   *
+   * @return InputSplitInfo containing the split files' information and the
+   * location hints for each split generated, used to determine the
+   * parallelism of the map stage.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException if the configured InputFormat class
+   * cannot be loaded
+   */
+  private static InputSplitInfo writeNewSplits(JobContext jobContext,
+      Path inputSplitDir) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    Configuration conf = jobContext.getConfiguration();
+    InputFormat<?, ?> input =
+        ReflectionUtils.newInstance(jobContext.getInputFormatClass(), conf);
+
+    List<org.apache.hadoop.mapreduce.InputSplit> array =
+        input.getSplits(jobContext);
+    org.apache.hadoop.mapreduce.InputSplit[] splits =
+        (org.apache.hadoop.mapreduce.InputSplit[])
+        array.toArray(
+            new org.apache.hadoop.mapreduce.InputSplit[array.size()]);
+
+    // sort the splits into order based on size, so that the biggest
+    // go first
+    Arrays.sort(splits, new InputSplitComparator());
+
+    // Writes job.split and job.splitmetainfo into inputSplitDir,
+    // overwriting any existing files there.
+    JobSplitWriter.createSplitFiles(inputSplitDir, conf,
+        inputSplitDir.getFileSystem(conf), splits);
+
+    // Location hints are index-aligned with the sorted splits, so task i
+    // is hinted to run near the data of split i.
+    TaskLocationHint[] locationHints =
+        new TaskLocationHint[splits.length];
+    for (int i = 0; i < splits.length; ++i) {
+      locationHints[i] = new TaskLocationHint(splits[i].getLocations(), null);
+    }
+
+    return new InputSplitInfo(
+        JobSubmissionFiles.getJobSplitFile(inputSplitDir),
+        JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
+        splits.length, locationHints);
+  }
+
+  /**
+   * Generate old-api mapred InputFormat splits
+   * @param jobConf JobConf required by InputFormat class
+   * @param inputSplitDir Directory in which to generate splits information
+   *
+   * @return InputSplitInfo containing the split files' information and the
+   * number of splits generated, used to determine the parallelism of
+   * the map stage.
+   *
+   * @throws IOException
+   */
+  private static InputSplitInfo writeOldSplits(JobConf jobConf,
+      Path inputSplitDir) throws IOException {
+    // getNumMapTasks() is only a hint to the InputFormat; the actual split
+    // count comes from the returned array.
+    org.apache.hadoop.mapred.InputSplit[] splits =
+        jobConf.getInputFormat().getSplits(jobConf, jobConf.getNumMapTasks());
+    // sort the splits into order based on size, so that the biggest
+    // go first
+    Arrays.sort(splits, new OldInputSplitComparator());
+    JobSplitWriter.createSplitFiles(inputSplitDir, jobConf,
+        inputSplitDir.getFileSystem(jobConf), splits);
+
+    // Location hints are index-aligned with the sorted splits.
+    TaskLocationHint[] locationHints =
+        new TaskLocationHint[splits.length];
+    for (int i = 0; i < splits.length; ++i) {
+      locationHints[i] = new TaskLocationHint(splits[i].getLocations(), null);
+    }
+
+    return new InputSplitInfo(
+        JobSubmissionFiles.getJobSplitFile(inputSplitDir),
+        JobSubmissionFiles.getJobSplitMetaFile(inputSplitDir),
+        splits.length, locationHints);
+  }
+
+  /**
+   * Helper api to generate splits
+   * @param conf Configuration with all necessary information set to generate
+   * splits. The following are required at a minimum:
+   *
+   *   - mapred.mapper.new-api: determine whether mapred.InputFormat or
+   *     mapreduce.InputFormat is to be used
+   *   - mapred.input.format.class or mapreduce.job.inputformat.class:
+   *     determines the InputFormat class to be used
+   *
+   * In addition to this, all the configs needed by the InputFormat class also
+   * have to be set. For example, FileInputFormat needs the input directory
+   * paths to be set in the config.
+   *
+   * @param inputSplitsDir Directory in which the splits file and meta info file
+   * will be generated. job.split and job.splitmetainfo files in this directory
+   * will be overwritten.
+   *
+   * @return InputSplitInfo containing the split files' information and the
+   * number of splits generated, used to determine the parallelism of
+   * the map stage.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws ClassNotFoundException
+   */
+  public static InputSplitInfo generateInputSplits(Configuration conf,
+      Path inputSplitsDir) throws IOException, InterruptedException,
+      ClassNotFoundException {
+    // Job and JobConf each wrap copies of conf; the new-api path uses the
+    // Job, the old-api path uses the JobConf.
+    Job job = Job.getInstance(conf);
+    JobConf jobConf = new JobConf(conf);
+    if (jobConf.getUseNewMapper()) {
+      LOG.info("Generating new input splits"
+          + ", splitsDir=" + inputSplitsDir.toString());
+      return writeNewSplits(job, inputSplitsDir);
+    } else {
+      LOG.info("Generating old input splits"
+          + ", splitsDir=" + inputSplitsDir.toString());
+      return writeOldSplits(jobConf, inputSplitsDir);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d8690f7e/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
new file mode 100644
index 0000000..477be9b
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/hadoop/TestMRHelpers.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestMRHelpers {
+
+  protected static MiniDFSCluster dfsCluster;
+
+  private static Configuration conf = new Configuration();
+  private static FileSystem remoteFs;
+  // Path of the copied test input file on the mini DFS cluster.
+  private static Path testFilePath;
+  // Target directories on DFS for old-api and new-api split generation.
+  private static Path oldSplitsDir;
+  private static Path newSplitsDir;
+
+  private static String TEST_ROOT_DIR = "target"
+      + Path.SEPARATOR + TestMRHelpers.class.getName() + "-tmpDir";
+
+  /**
+   * Starts a 2-datanode MiniDFSCluster and seeds it with a non-empty test
+   * file (a dump of the YARN configuration) used as split-generation input.
+   */
+  // NOTE(review): dfsCluster is never shut down — there is no @AfterClass
+  // calling dfsCluster.shutdown(); confirm whether cleanup is needed.
+  @BeforeClass
+  public static void setup() throws IOException {
+    try {
+      conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
+      dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+        .format(true).racks(null).build();
+      remoteFs = dfsCluster.getFileSystem();
+    } catch (IOException io) {
+      throw new RuntimeException("problem starting mini dfs cluster", io);
+    }
+
+    Configuration testConf = new YarnConfiguration(
+        dfsCluster.getFileSystem().getConf());
+
+    File testConfFile = new File(TEST_ROOT_DIR, "test.xml");
+    try {
+      testConfFile.createNewFile();
+      testConf.writeXml(new FileOutputStream(testConfFile));
+      testConfFile.deleteOnExit();
+    } catch (IOException e) {
+      // NOTE(review): printStackTrace is redundant since the exception is
+      // rethrown (and chained) immediately below.
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+
+    remoteFs.mkdirs(new Path("/tmp/input/"));
+    remoteFs.mkdirs(new Path("/tmp/splitsDirNew/"));
+    remoteFs.mkdirs(new Path("/tmp/splitsDirOld/"));
+    testFilePath = remoteFs.makeQualified(new Path("/tmp/input/test.xml"));
+    remoteFs.copyFromLocalFile(new Path(testConfFile.getAbsolutePath()),
+        testFilePath);
+    // Sanity check: split generation needs a non-empty input file.
+    FileStatus fsStatus = remoteFs.getFileStatus(testFilePath);
+    Assert.assertTrue(fsStatus.getLen() > 0);
+
+    oldSplitsDir = remoteFs.makeQualified(new Path("/tmp/splitsDirOld/"));
+    newSplitsDir = remoteFs.makeQualified(new Path("/tmp/splitsDirNew/"));
+  }
+
+  /**
+   * Re-reads the split meta info written to inputSplitsDir and checks that
+   * the location hints reconstructed from it match the hints returned by
+   * MRHelpers (relies on TaskLocationHint.equals).
+   */
+  private void verifyLocationHints(Path inputSplitsDir,
+      TaskLocationHint[] actual) throws Exception {
+    JobID jobId = new JobID("dummy", 1);
+    TaskSplitMetaInfo[] splitsInfo =
+        SplitMetaInfoReader.readSplitMetaInfo(jobId , remoteFs,
+            conf, inputSplitsDir);
+    int splitsCount = splitsInfo.length;
+    TaskLocationHint[] locationHints =
+        new TaskLocationHint[splitsCount];
+    for (int i = 0; i < splitsCount; ++i) {
+      TaskLocationHint locationHint =
+          new TaskLocationHint(splitsInfo[i].getLocations(), null);
+      locationHints[i] = locationHint;
+    }
+
+    Assert.assertArrayEquals(locationHints, actual);
+  }
+
+  /**
+   * Runs MRHelpers.generateInputSplits through the new (mapreduce) API path
+   * against the seeded test file.
+   */
+  private InputSplitInfo generateNewSplits(Path inputSplitsDir)
+      throws Exception {
+    JobConf jobConf = new JobConf();
+    jobConf.setUseNewMapper(true);
+    jobConf.setClass(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class,
+        InputFormat.class);
+    jobConf.set(TextInputFormat.INPUT_DIR, testFilePath.toString());
+
+    return MRHelpers.generateInputSplits(jobConf, inputSplitsDir);
+  }
+
+  /**
+   * New-api path: verifies the returned file paths, that exactly the two
+   * expected non-empty files exist in the splits dir, and that the location
+   * hints round-trip through the written meta info.
+   */
+  @Test
+  public void testNewSplitsGen() throws Exception {
+    InputSplitInfo info = generateNewSplits(newSplitsDir);
+
+    Assert.assertEquals(new Path(newSplitsDir, "job.split"),
+        info.getSplitsFile());
+    Assert.assertEquals(new Path(newSplitsDir, "job.splitmetainfo"),
+        info.getSplitsMetaInfoFile());
+
+    RemoteIterator<LocatedFileStatus> files =
+        remoteFs.listFiles(newSplitsDir, false);
+
+    boolean foundSplitsFile = false;
+    boolean foundMetaFile = false;
+    int totalFilesFound = 0;
+
+    while (files.hasNext()) {
+      LocatedFileStatus status = files.next();
+      String fName = status.getPath().getName();
+      totalFilesFound++;
+      if (fName.equals("job.split")) {
+        foundSplitsFile = true;
+      } else if (fName.equals("job.splitmetainfo")) {
+        foundMetaFile = true;
+      } else {
+        Assert.fail("Found invalid file in splits dir, filename=" + fName);
+      }
+      Assert.assertTrue(status.getLen() > 0);
+    }
+
+    Assert.assertEquals(2, totalFilesFound);
+    Assert.assertTrue(foundSplitsFile);
+    Assert.assertTrue(foundMetaFile);
+
+    verifyLocationHints(newSplitsDir, info.getTaskLocationHints());
+  }
+
+  /**
+   * Runs MRHelpers.generateInputSplits through the old (mapred) API path
+   * against the seeded test file.
+   */
+  private InputSplitInfo generateOldSplits(Path inputSplitsDir)
+      throws Exception {
+    JobConf jobConf = new JobConf();
+    jobConf.setUseNewMapper(false);
+    jobConf.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class);
+    jobConf.set(TextInputFormat.INPUT_DIR, testFilePath.toString());
+
+    return MRHelpers.generateInputSplits(jobConf, inputSplitsDir);
+  }
+
+  /**
+   * Old-api path: same verification as testNewSplitsGen but driven through
+   * the mapred InputFormat.
+   */
+  @Test
+  public void testOldSplitsGen() throws Exception {
+    InputSplitInfo info = generateOldSplits(oldSplitsDir);
+    Assert.assertEquals(new Path(oldSplitsDir, "job.split"),
+        info.getSplitsFile());
+    Assert.assertEquals(new Path(oldSplitsDir, "job.splitmetainfo"),
+        info.getSplitsMetaInfoFile());
+
+    RemoteIterator<LocatedFileStatus> files =
+        remoteFs.listFiles(oldSplitsDir, false);
+
+    boolean foundSplitsFile = false;
+    boolean foundMetaFile = false;
+    int totalFilesFound = 0;
+
+    while (files.hasNext()) {
+      LocatedFileStatus status = files.next();
+      String fName = status.getPath().getName();
+      totalFilesFound++;
+      if (fName.equals("job.split")) {
+        foundSplitsFile = true;
+      } else if (fName.equals("job.splitmetainfo")) {
+        foundMetaFile = true;
+      } else {
+        Assert.fail("Found invalid file in splits dir, filename=" + fName);
+      }
+      Assert.assertTrue(status.getLen() > 0);
+    }
+
+    Assert.assertEquals(2, totalFilesFound);
+    Assert.assertTrue(foundSplitsFile);
+    Assert.assertTrue(foundMetaFile);
+
+    verifyLocationHints(oldSplitsDir, info.getTaskLocationHints());
+  }
+
+}


Mime
View raw message